Compare commits
No commits in common. "691cc2e3fe823a090c6b14d7ca587f8dca8d447c" and "7e0aa8169bd47838555335f6b7e4f385bac0b841" have entirely different histories.
691cc2e3fe
...
7e0aa8169b
@ -31,7 +31,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.FreeSql",
|
|||||||
EndProject
|
EndProject
|
||||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.FreeRedisProvider", "src\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj", "{C06C4082-638F-2996-5FED-7784475766C1}"
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.FreeRedisProvider", "src\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj", "{C06C4082-638F-2996-5FED-7784475766C1}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka", "src\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj", "{F0288175-F0EC-48BD-945F-CF1512850943}"
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka", "src\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj", "{919F4CDB-5C82-4371-B209-403B408DA248}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.IoTDBProvider", "src\JiShe.CollectBus.IoTDBProvider\JiShe.CollectBus.IoTDBProvider.csproj", "{A3F3C092-0A25-450B-BF6A-5983163CBEF5}"
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.IoTDBProvider", "src\JiShe.CollectBus.IoTDBProvider\JiShe.CollectBus.IoTDBProvider.csproj", "{A3F3C092-0A25-450B-BF6A-5983163CBEF5}"
|
||||||
EndProject
|
EndProject
|
||||||
@ -101,10 +101,6 @@ Global
|
|||||||
{A3F3C092-0A25-450B-BF6A-5983163CBEF5}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
{A3F3C092-0A25-450B-BF6A-5983163CBEF5}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
{A3F3C092-0A25-450B-BF6A-5983163CBEF5}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
{A3F3C092-0A25-450B-BF6A-5983163CBEF5}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
{A3F3C092-0A25-450B-BF6A-5983163CBEF5}.Release|Any CPU.Build.0 = Release|Any CPU
|
{A3F3C092-0A25-450B-BF6A-5983163CBEF5}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
{F0288175-F0EC-48BD-945F-CF1512850943}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
|
||||||
{F0288175-F0EC-48BD-945F-CF1512850943}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
|
||||||
{F0288175-F0EC-48BD-945F-CF1512850943}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
|
||||||
{F0288175-F0EC-48BD-945F-CF1512850943}.Release|Any CPU.Build.0 = Release|Any CPU
|
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
GlobalSection(SolutionProperties) = preSolution
|
GlobalSection(SolutionProperties) = preSolution
|
||||||
HideSolutionNode = FALSE
|
HideSolutionNode = FALSE
|
||||||
@ -125,7 +121,6 @@ Global
|
|||||||
{C06C4082-638F-2996-5FED-7784475766C1} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{C06C4082-638F-2996-5FED-7784475766C1} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{919F4CDB-5C82-4371-B209-403B408DA248} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{919F4CDB-5C82-4371-B209-403B408DA248} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{F0288175-F0EC-48BD-945F-CF1512850943} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
GlobalSection(ExtensibilityGlobals) = postSolution
|
GlobalSection(ExtensibilityGlobals) = postSolution
|
||||||
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}
|
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}
|
||||||
|
|||||||
@ -1,18 +1,6 @@
|
|||||||
using JiShe.CollectBus.Common.Attributes;
|
namespace JiShe.CollectBus.Samples;
|
||||||
using Volo.Abp.EventBus;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Samples;
|
|
||||||
|
|
||||||
[EventName("Sample.Kafka.Test")]
|
|
||||||
[TopicName("Test1")]
|
|
||||||
public class SampleDto
|
public class SampleDto
|
||||||
{
|
{
|
||||||
public int Value { get; set; }
|
public int Value { get; set; }
|
||||||
}
|
}
|
||||||
|
|
||||||
[EventName("Sample.Kafka.Test2")]
|
|
||||||
[TopicName("Test2")]
|
|
||||||
public class SampleDto2
|
|
||||||
{
|
|
||||||
public int Value { get; set; }
|
|
||||||
}
|
|
||||||
|
|||||||
@ -20,15 +20,12 @@ using AutoMapper.Configuration.Annotations;
|
|||||||
using JiShe.CollectBus.Common.Attributes;
|
using JiShe.CollectBus.Common.Attributes;
|
||||||
using JiShe.CollectBus.IoTDBProvider;
|
using JiShe.CollectBus.IoTDBProvider;
|
||||||
using Confluent.Kafka.Admin;
|
using Confluent.Kafka.Admin;
|
||||||
|
using Confluent.Kafka;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
using JiShe.CollectBus.Protocol.Contracts;
|
using JiShe.CollectBus.Protocol.Contracts;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using Thrift;
|
using Thrift;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
using Volo.Abp.EventBus.Kafka;
|
|
||||||
using Volo.Abp.Kafka;
|
|
||||||
using Volo.Abp.EventBus;
|
|
||||||
using Confluent.Kafka;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus;
|
namespace JiShe.CollectBus;
|
||||||
|
|
||||||
@ -39,66 +36,36 @@ namespace JiShe.CollectBus;
|
|||||||
typeof(AbpAutoMapperModule),
|
typeof(AbpAutoMapperModule),
|
||||||
typeof(AbpBackgroundWorkersHangfireModule),
|
typeof(AbpBackgroundWorkersHangfireModule),
|
||||||
typeof(CollectBusFreeRedisModule),
|
typeof(CollectBusFreeRedisModule),
|
||||||
typeof(CollectBusFreeSqlModule),
|
typeof(CollectBusIoTDBModule),
|
||||||
typeof(AbpEventBusModule),
|
typeof(CollectBusFreeSqlModule)
|
||||||
typeof(AbpKafkaModule),
|
|
||||||
typeof(CollectBusIoTDBModule)
|
|
||||||
)]
|
)]
|
||||||
public class CollectBusApplicationModule : AbpModule
|
public class CollectBusApplicationModule : AbpModule
|
||||||
{
|
{
|
||||||
public override void ConfigureServices(ServiceConfigurationContext context)
|
public override void ConfigureServices(ServiceConfigurationContext context)
|
||||||
{
|
{
|
||||||
var configuration = context.Services.GetConfiguration();
|
|
||||||
|
|
||||||
context.Services.AddAutoMapperObjectMapper<CollectBusApplicationModule>();
|
context.Services.AddAutoMapperObjectMapper<CollectBusApplicationModule>();
|
||||||
Configure<AbpAutoMapperOptions>(options =>
|
Configure<AbpAutoMapperOptions>(options =>
|
||||||
{
|
{
|
||||||
options.AddMaps<CollectBusApplicationModule>(validate: true);
|
options.AddMaps<CollectBusApplicationModule>(validate: true);
|
||||||
});
|
});
|
||||||
|
|
||||||
Configure<AbpKafkaOptions>(configuration.GetSection("Kafka"));
|
//Configure<AbpAntiForgeryOptions>(options =>
|
||||||
Configure<AbpKafkaEventBusOptions>(configuration.GetSection("Kafka:EventBus"));
|
//{
|
||||||
|
// options.TokenCookie.Expiration = TimeSpan.FromDays(365);
|
||||||
Configure<AbpKafkaOptions>(options =>
|
// options.AutoValidateIgnoredHttpMethods.Add("POST");
|
||||||
{
|
//});
|
||||||
options.ConfigureConsumer = config =>
|
|
||||||
{
|
|
||||||
config.GroupId = configuration.GetValue<string>("Kafka:Consumer:GroupId");
|
|
||||||
config.EnableAutoCommit = configuration.GetValue<bool>("Kafka:Consumer:EnableAutoCommit");
|
|
||||||
};
|
|
||||||
});
|
|
||||||
|
|
||||||
Configure<AbpKafkaOptions>(options =>
|
|
||||||
{
|
|
||||||
options.ConfigureProducer = config =>
|
|
||||||
{
|
|
||||||
config.MessageTimeoutMs = configuration.GetValue<int>("Kafka:Producer:MessageTimeoutMs");
|
|
||||||
config.Acks = (Acks)configuration.GetValue<int>("Kafka:Producer:Acks");
|
|
||||||
};
|
|
||||||
});
|
|
||||||
|
|
||||||
Configure<AbpKafkaOptions>(options =>
|
|
||||||
{
|
|
||||||
options.ConfigureTopic = specification =>
|
|
||||||
{
|
|
||||||
specification.ReplicationFactor = configuration.GetValue<short>("Kafka:Topic:ReplicationFactor");
|
|
||||||
specification.NumPartitions = configuration.GetValue<int>("Kafka:Topic:NumPartitions");
|
|
||||||
};
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public override void OnApplicationInitialization(
|
public override void OnApplicationInitialization(
|
||||||
ApplicationInitializationContext context)
|
ApplicationInitializationContext context)
|
||||||
{
|
{
|
||||||
context.ServiceProvider.GetRequiredService<CustomKafkaDistributedEventBus>().Initialize();
|
|
||||||
|
|
||||||
var assembly = Assembly.GetExecutingAssembly();
|
var assembly = Assembly.GetExecutingAssembly();
|
||||||
var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface).ToList();
|
var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface).ToList();
|
||||||
foreach (var type in types)
|
foreach (var type in types)
|
||||||
{
|
{
|
||||||
context.AddBackgroundWorkerAsync(type);
|
context.AddBackgroundWorkerAsync(type);
|
||||||
}
|
}
|
||||||
|
|
||||||
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
|
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
|
||||||
|
|
||||||
//默认初始化表计信息
|
//默认初始化表计信息
|
||||||
|
|||||||
@ -1,251 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Concurrent;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Reflection;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using Confluent.Kafka;
|
|
||||||
using JiShe.CollectBus.Common.Attributes;
|
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
|
||||||
using Microsoft.Extensions.Options;
|
|
||||||
using Volo.Abp;
|
|
||||||
using Volo.Abp.DependencyInjection;
|
|
||||||
using Volo.Abp.EventBus;
|
|
||||||
using Volo.Abp.EventBus.Distributed;
|
|
||||||
using Volo.Abp.EventBus.Kafka;
|
|
||||||
using Volo.Abp.EventBus.Local;
|
|
||||||
using Volo.Abp.Guids;
|
|
||||||
using Volo.Abp.Kafka;
|
|
||||||
using Volo.Abp.MultiTenancy;
|
|
||||||
using Volo.Abp.Timing;
|
|
||||||
using Volo.Abp.Tracing;
|
|
||||||
using Volo.Abp.Uow;
|
|
||||||
using Volo.Abp.Threading;
|
|
||||||
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus;
|
|
||||||
|
|
||||||
[Dependency(ReplaceServices = true)]
|
|
||||||
[ExposeServices(typeof(IDistributedEventBus), typeof(CustomKafkaDistributedEventBus))]
|
|
||||||
public class CustomKafkaDistributedEventBus : DistributedEventBusBase, ISingletonDependency
|
|
||||||
{
|
|
||||||
|
|
||||||
protected AbpKafkaEventBusOptions AbpKafkaEventBusOptions { get; }
|
|
||||||
protected IKafkaMessageConsumerFactory MessageConsumerFactory { get; }
|
|
||||||
protected IKafkaSerializer Serializer { get; }
|
|
||||||
protected IProducerPool ProducerPool { get; }
|
|
||||||
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
|
|
||||||
protected ConcurrentDictionary<string, Type> EventTypes { get; }
|
|
||||||
protected IKafkaMessageConsumer Consumer { get; private set; } = default!;
|
|
||||||
|
|
||||||
|
|
||||||
public CustomKafkaDistributedEventBus(IServiceScopeFactory serviceScopeFactory,
|
|
||||||
ICurrentTenant currentTenant,
|
|
||||||
IUnitOfWorkManager unitOfWorkManager,
|
|
||||||
IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
|
|
||||||
IGuidGenerator guidGenerator,
|
|
||||||
IClock clock,
|
|
||||||
IEventHandlerInvoker eventHandlerInvoker,
|
|
||||||
ILocalEventBus localEventBus,
|
|
||||||
ICorrelationIdProvider correlationIdProvider,
|
|
||||||
IOptions<AbpKafkaEventBusOptions> abpKafkaEventBusOptions,
|
|
||||||
IKafkaMessageConsumerFactory messageConsumerFactory,
|
|
||||||
IKafkaSerializer serializer,
|
|
||||||
IProducerPool producerPool)
|
|
||||||
: base(serviceScopeFactory, currentTenant, unitOfWorkManager, abpDistributedEventBusOptions, guidGenerator, clock, eventHandlerInvoker, localEventBus, correlationIdProvider)
|
|
||||||
{
|
|
||||||
AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value;
|
|
||||||
MessageConsumerFactory = messageConsumerFactory;
|
|
||||||
Serializer = serializer;
|
|
||||||
ProducerPool = producerPool;
|
|
||||||
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
|
|
||||||
EventTypes = new ConcurrentDictionary<string, Type>();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public void Initialize()
|
|
||||||
{
|
|
||||||
|
|
||||||
SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
|
|
||||||
}
|
|
||||||
|
|
||||||
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
|
|
||||||
{
|
|
||||||
var handlerFactories = GetOrCreateHandlerFactories(eventType);
|
|
||||||
|
|
||||||
if (factory.IsInFactories(handlerFactories))
|
|
||||||
{
|
|
||||||
return NullDisposable.Instance;
|
|
||||||
}
|
|
||||||
|
|
||||||
handlerFactories.Add(factory);
|
|
||||||
|
|
||||||
return new EventHandlerFactoryUnregistrar(this, eventType, factory);
|
|
||||||
}
|
|
||||||
|
|
||||||
public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
|
|
||||||
{
|
|
||||||
Check.NotNull(action, nameof(action));
|
|
||||||
|
|
||||||
GetOrCreateHandlerFactories(typeof(TEvent))
|
|
||||||
.Locking(factories =>
|
|
||||||
{
|
|
||||||
factories.RemoveAll(
|
|
||||||
factory =>
|
|
||||||
{
|
|
||||||
var singleInstanceFactory = factory as SingleInstanceHandlerFactory;
|
|
||||||
if (singleInstanceFactory == null)
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
var actionHandler = singleInstanceFactory.HandlerInstance as ActionEventHandler<TEvent>;
|
|
||||||
if (actionHandler == null)
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return actionHandler.Action == action;
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public override void Unsubscribe(Type eventType, IEventHandler handler)
|
|
||||||
{
|
|
||||||
GetOrCreateHandlerFactories(eventType)
|
|
||||||
.Locking(factories =>
|
|
||||||
{
|
|
||||||
factories.RemoveAll(
|
|
||||||
factory =>
|
|
||||||
factory is SingleInstanceHandlerFactory handlerFactory &&
|
|
||||||
handlerFactory.HandlerInstance == handler
|
|
||||||
);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public override void Unsubscribe(Type eventType, IEventHandlerFactory factory)
|
|
||||||
{
|
|
||||||
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory));
|
|
||||||
}
|
|
||||||
|
|
||||||
public override void UnsubscribeAll(Type eventType)
|
|
||||||
{
|
|
||||||
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear());
|
|
||||||
}
|
|
||||||
|
|
||||||
protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
|
|
||||||
{
|
|
||||||
var headers = new Headers
|
|
||||||
{
|
|
||||||
{ "messageId", System.Text.Encoding.UTF8.GetBytes(Guid.NewGuid().ToString("N")) }
|
|
||||||
};
|
|
||||||
|
|
||||||
if (CorrelationIdProvider.Get() != null)
|
|
||||||
{
|
|
||||||
headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(CorrelationIdProvider.Get()!));
|
|
||||||
}
|
|
||||||
|
|
||||||
var topicAttribute = eventType.GetCustomAttribute<TopicNameAttribute>();
|
|
||||||
|
|
||||||
await PublishAsync(
|
|
||||||
topicAttribute==null?AbpKafkaEventBusOptions.TopicName:topicAttribute.Name,
|
|
||||||
eventType,
|
|
||||||
eventData,
|
|
||||||
headers
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
|
|
||||||
{
|
|
||||||
unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
|
|
||||||
{
|
|
||||||
var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>();
|
|
||||||
|
|
||||||
foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)))
|
|
||||||
{
|
|
||||||
handlerFactoryList.Add(
|
|
||||||
new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
|
|
||||||
}
|
|
||||||
|
|
||||||
return handlerFactoryList.ToArray();
|
|
||||||
}
|
|
||||||
|
|
||||||
public override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig)
|
|
||||||
{
|
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
public override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
|
|
||||||
{
|
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
public override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig)
|
|
||||||
{
|
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected override byte[] Serialize(object eventData)
|
|
||||||
{
|
|
||||||
return Serializer.Serialize(eventData);
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
|
|
||||||
{
|
|
||||||
return HandlerFactories.GetOrAdd(
|
|
||||||
eventType,
|
|
||||||
type =>
|
|
||||||
{
|
|
||||||
var eventName = EventNameAttribute.GetNameOrDefault(type);
|
|
||||||
EventTypes.GetOrAdd(eventName, eventType);
|
|
||||||
return new List<IEventHandlerFactory>();
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers)
|
|
||||||
{
|
|
||||||
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
|
|
||||||
var body = Serializer.Serialize(eventData);
|
|
||||||
|
|
||||||
return PublishAsync(topicName, eventName, body, headers);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Task<DeliveryResult<string, byte[]>> PublishAsync(
|
|
||||||
string topicName,
|
|
||||||
string eventName,
|
|
||||||
byte[] body,
|
|
||||||
Headers headers)
|
|
||||||
{
|
|
||||||
var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName);
|
|
||||||
|
|
||||||
return producer.ProduceAsync(
|
|
||||||
topicName,
|
|
||||||
new Message<string, byte[]>
|
|
||||||
{
|
|
||||||
Key = eventName,
|
|
||||||
Value = body,
|
|
||||||
Headers = headers
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
|
|
||||||
{
|
|
||||||
//Should trigger same type
|
|
||||||
if (handlerEventType == targetEventType)
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
//Should trigger for inherited types
|
|
||||||
if (handlerEventType.IsAssignableFrom(targetEventType))
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,46 +0,0 @@
|
|||||||
using JiShe.CollectBus.Samples;
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using Microsoft.Extensions.Logging;
|
|
||||||
using Volo.Abp.DependencyInjection;
|
|
||||||
using Volo.Abp.EventBus.Distributed;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Handlers
|
|
||||||
{
|
|
||||||
public class SampleHandler : IDistributedEventHandler<SampleDto>,
|
|
||||||
ITransientDependency
|
|
||||||
{
|
|
||||||
|
|
||||||
private readonly ILogger<SampleHandler> _logger;
|
|
||||||
|
|
||||||
public SampleHandler(ILogger<SampleHandler> logger)
|
|
||||||
{
|
|
||||||
_logger = logger;
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task HandleEventAsync(SampleDto eventData)
|
|
||||||
{
|
|
||||||
_logger.LogWarning($"topic Test1 message: {eventData.Value.ToString()}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public class SampleHandler2 : IDistributedEventHandler<SampleDto2>,
|
|
||||||
ITransientDependency
|
|
||||||
{
|
|
||||||
|
|
||||||
private readonly ILogger<SampleHandler2> _logger;
|
|
||||||
|
|
||||||
public SampleHandler2(ILogger<SampleHandler2> logger)
|
|
||||||
{
|
|
||||||
_logger = logger;
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task HandleEventAsync(SampleDto2 eventData)
|
|
||||||
{
|
|
||||||
_logger.LogWarning($"topic Test2 message: {eventData.Value.ToString()}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -24,7 +24,6 @@
|
|||||||
<PackageReference Include="TouchSocket" Version="3.0.19" />
|
<PackageReference Include="TouchSocket" Version="3.0.19" />
|
||||||
<PackageReference Include="TouchSocket.Hosting" Version="3.0.19" />
|
<PackageReference Include="TouchSocket.Hosting" Version="3.0.19" />
|
||||||
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
|
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
|
||||||
<PackageReference Include="Volo.Abp.EventBus.Kafka" Version="8.3.3" />
|
|
||||||
|
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />
|
||||||
|
|||||||
@ -1,23 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using Volo.Abp.EventBus;
|
|
||||||
using Volo.Abp;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Common.Attributes
|
|
||||||
{
|
|
||||||
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
|
|
||||||
public class TopicNameAttribute : Attribute
|
|
||||||
{
|
|
||||||
public virtual string Name { get; }
|
|
||||||
|
|
||||||
public TopicNameAttribute(string name)
|
|
||||||
{
|
|
||||||
this.Name = Check.NotNullOrWhiteSpace(name, nameof(name));
|
|
||||||
}
|
|
||||||
|
|
||||||
public string GetName(Type eventType) => this.Name;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,111 +0,0 @@
|
|||||||
using Confluent.Kafka;
|
|
||||||
using Microsoft.Extensions.Configuration;
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using Confluent.Kafka.Admin;
|
|
||||||
using Microsoft.Extensions.Logging;
|
|
||||||
using Volo.Abp.DependencyInjection;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.AdminClient
|
|
||||||
{
|
|
||||||
public class AdminClientService : IAdminClientService, ISingletonDependency
|
|
||||||
{
|
|
||||||
|
|
||||||
private readonly ILogger<AdminClientService> _logger;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Initializes a new instance of the <see cref="AdminClientService"/> class.
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="configuration">The configuration.</param>
|
|
||||||
/// <param name="logger">The logger.</param>
|
|
||||||
public AdminClientService(IConfiguration configuration, ILogger<AdminClientService> logger)
|
|
||||||
{
|
|
||||||
_logger = logger;
|
|
||||||
GetInstance(configuration);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Gets or sets the instance.
|
|
||||||
/// </summary>
|
|
||||||
/// <value>
|
|
||||||
/// The instance.
|
|
||||||
/// </value>
|
|
||||||
public IAdminClient Instance { get; set; } = default;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Gets the instance.
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="configuration">The configuration.</param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public IAdminClient GetInstance(IConfiguration configuration)
|
|
||||||
{
|
|
||||||
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
|
|
||||||
var adminClientConfig = new AdminClientConfig()
|
|
||||||
{
|
|
||||||
BootstrapServers = configuration["Kafka:BootstrapServers"],
|
|
||||||
};
|
|
||||||
if (enableAuthorization)
|
|
||||||
{
|
|
||||||
adminClientConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
|
|
||||||
adminClientConfig.SaslMechanism = SaslMechanism.Plain;
|
|
||||||
adminClientConfig.SaslUsername = configuration["Kafka:SaslUserName"];
|
|
||||||
adminClientConfig.SaslPassword = configuration["Kafka:SaslPassword"];
|
|
||||||
}
|
|
||||||
Instance = new AdminClientBuilder(adminClientConfig).Build();
|
|
||||||
return Instance;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Checks the topic asynchronous.
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="topic">The topic.</param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public async Task<bool> CheckTopicAsync(string topic)
|
|
||||||
{
|
|
||||||
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(5));
|
|
||||||
return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Creates the topic if not exist asynchronous.
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="topicName">Name of the topic.</param>
|
|
||||||
/// <param name="factorNum">The factor number.</param>
|
|
||||||
/// <param name="partitionNum">The partition number.</param>
|
|
||||||
public async Task CreateTopicIfNotExistAsync(string topicName, short factorNum, int partitionNum)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
if (await CheckTopicAsync(topicName)) return;
|
|
||||||
|
|
||||||
await Instance.CreateTopicsAsync(new[]
|
|
||||||
{
|
|
||||||
new TopicSpecification { Name = topicName, ReplicationFactor = factorNum, NumPartitions = partitionNum }
|
|
||||||
});
|
|
||||||
}
|
|
||||||
catch (CreateTopicsException e)
|
|
||||||
{
|
|
||||||
_logger.LogError($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Deletes the topic asynchronous.
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="topicName">Name of the topic.</param>
|
|
||||||
public async Task DeleteTopicAsync(List<string> topicName)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
await Instance.DeleteTopicsAsync(topicName, null);
|
|
||||||
}
|
|
||||||
catch (DeleteTopicsException e)
|
|
||||||
{
|
|
||||||
_logger.LogError($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,34 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.AdminClient
|
|
||||||
{
|
|
||||||
public interface IAdminClientService
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// Checks the topic asynchronous.
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="topic">The topic.</param>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task<bool> CheckTopicAsync(string topic);
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Creates the topic if not exist asynchronous.
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="topicName">Name of the topic.</param>
|
|
||||||
/// <param name="factorNum">The factor number.</param>
|
|
||||||
/// <param name="partitionNum">The partition number.</param>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task CreateTopicIfNotExistAsync(string topicName, short factorNum, int partitionNum);
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Deletes the topic asynchronous.
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="topicName">Name of the topic.</param>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task DeleteTopicAsync(List<string> topicName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,29 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.Attributes
|
|
||||||
{
|
|
||||||
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
|
|
||||||
public class TopicAttribute: Attribute
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// Initializes a new instance of the <see cref="TopicAttribute"/> class.
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="name">The name.</param>
|
|
||||||
public TopicAttribute(string name = "Default")
|
|
||||||
{
|
|
||||||
Name = name;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Gets or sets the name.
|
|
||||||
/// </summary>
|
|
||||||
/// <value>
|
|
||||||
/// The name.
|
|
||||||
/// </value>
|
|
||||||
public string Name { get; set; }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,8 +1,4 @@
|
|||||||
using Confluent.Kafka;
|
using Volo.Abp.Modularity;
|
||||||
using JiShe.CollectBus.Kafka.Consumer;
|
|
||||||
using Microsoft.Extensions.Configuration;
|
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
|
||||||
using Volo.Abp.Modularity;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka
|
namespace JiShe.CollectBus.Kafka
|
||||||
{
|
{
|
||||||
@ -10,16 +6,6 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
{
|
{
|
||||||
public override void ConfigureServices(ServiceConfigurationContext context)
|
public override void ConfigureServices(ServiceConfigurationContext context)
|
||||||
{
|
{
|
||||||
var configuration = context.Services.GetConfiguration();
|
|
||||||
|
|
||||||
// 注册 Kafka 生产者
|
|
||||||
context.Services.AddSingleton<IProducer<string, string>>(sp =>
|
|
||||||
new ProducerBuilder<string, string>(
|
|
||||||
configuration.GetSection("Kafka:ProducerConfig").Get<ProducerConfig>()
|
|
||||||
)
|
|
||||||
.Build());
|
|
||||||
// 注册后台服务
|
|
||||||
context.Services.AddHostedService<ConsumerBackgroundService>();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,54 +0,0 @@
|
|||||||
using Confluent.Kafka;
|
|
||||||
using Microsoft.Extensions.Hosting;
|
|
||||||
using Microsoft.Extensions.Logging;
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.Consumer
|
|
||||||
{
|
|
||||||
public class ConsumerBackgroundService : BackgroundService
|
|
||||||
{
|
|
||||||
private readonly ConsumerService _consumerService;
|
|
||||||
private readonly ILogger<ConsumerBackgroundService> _logger;
|
|
||||||
|
|
||||||
public ConsumerBackgroundService(
|
|
||||||
ConsumerService consumerService,
|
|
||||||
ILogger<ConsumerBackgroundService> logger)
|
|
||||||
{
|
|
||||||
_consumerService = consumerService;
|
|
||||||
_logger = logger;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
|
||||||
{
|
|
||||||
_consumerService.Subscribe("abp-kafka-topic");
|
|
||||||
|
|
||||||
while (!stoppingToken.IsCancellationRequested)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var result = _consumerService.Consume(stoppingToken);
|
|
||||||
await ProcessMessageAsync(result.Message.Value);
|
|
||||||
_consumerService.Commit(result);
|
|
||||||
}
|
|
||||||
catch (ConsumeException ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, $"Message consume error: {ex.Error.Reason}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task ProcessMessageAsync(string message)
|
|
||||||
{
|
|
||||||
// 使用 ABP 的异步处理机制
|
|
||||||
await Task.Run(() =>
|
|
||||||
{
|
|
||||||
_logger.LogInformation($"Processing message: {message}");
|
|
||||||
// 这里可以触发 ABP 的领域事件
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,58 +0,0 @@
|
|||||||
using Confluent.Kafka;
|
|
||||||
using Microsoft.Extensions.Configuration;
|
|
||||||
using Microsoft.Extensions.Hosting;
|
|
||||||
using Microsoft.Extensions.Logging;
|
|
||||||
using JiShe.CollectBus.Kafka.Attributes;
|
|
||||||
using Volo.Abp.DependencyInjection;
|
|
||||||
using JiShe.CollectBus.Kafka.AdminClient;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.Consumer
|
|
||||||
{
|
|
||||||
public abstract class ConsumerService : IConsumerService, IDisposable, ISingletonDependency
|
|
||||||
{
|
|
||||||
private readonly ILogger<ConsumerService> _logger;
|
|
||||||
private readonly IConsumer<string, string> _consumer;
|
|
||||||
|
|
||||||
public ConsumerService(
|
|
||||||
ILogger<ConsumerService> logger,
|
|
||||||
IConfiguration configuration)
|
|
||||||
{
|
|
||||||
_logger = logger;
|
|
||||||
|
|
||||||
var consumerConfig = configuration.GetSection("Kafka:ConsumerConfig")
|
|
||||||
.Get<ConsumerConfig>();
|
|
||||||
|
|
||||||
_consumer = new ConsumerBuilder<string, string>(consumerConfig)
|
|
||||||
.SetErrorHandler(OnConsumeError)
|
|
||||||
.Build();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void Subscribe(string topic)
|
|
||||||
{
|
|
||||||
_consumer.Subscribe(topic);
|
|
||||||
_logger.LogInformation($"Subscribed to topic: {topic}");
|
|
||||||
}
|
|
||||||
|
|
||||||
public ConsumeResult<string, string> Consume(CancellationToken cancellationToken)
|
|
||||||
{
|
|
||||||
return _consumer.Consume(cancellationToken);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void Commit(ConsumeResult<string, string> result)
|
|
||||||
{
|
|
||||||
_consumer.Commit(result);
|
|
||||||
_logger.LogDebug($"Committed offset: {result.TopicPartitionOffset}");
|
|
||||||
}
|
|
||||||
|
|
||||||
private void OnConsumeError(IConsumer<string, string> consumer, Error error)
|
|
||||||
{
|
|
||||||
_logger.LogError($"Kafka consumer error: {error.Reason}");
|
|
||||||
}
|
|
||||||
|
|
||||||
public void Dispose()
|
|
||||||
{
|
|
||||||
_consumer?.Close();
|
|
||||||
_consumer?.Dispose();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,12 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.Consumer
|
|
||||||
{
|
|
||||||
public interface IConsumerService
|
|
||||||
{
|
|
||||||
}
|
|
||||||
}
|
|
||||||
58
src/JiShe.CollectBus.KafkaProducer/ConsumerService.cs
Normal file
58
src/JiShe.CollectBus.KafkaProducer/ConsumerService.cs
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
using Confluent.Kafka;
|
||||||
|
using Microsoft.Extensions.Configuration;
|
||||||
|
using Microsoft.Extensions.Hosting;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace JiShe.CollectBus.Kafka
|
||||||
|
{
|
||||||
|
public abstract class ConsumerService<T> : BackgroundService
|
||||||
|
{
|
||||||
|
private readonly IConsumer<Ignore, T> _consumer;
|
||||||
|
|
||||||
|
private readonly ILogger<ConsumerService<T>> _logger;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Initializes a new instance of the <see cref="ConsumerService{T}"/> class.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="configuration">The configuration.</param>
|
||||||
|
/// <param name="consumer">The consumer.</param>
|
||||||
|
/// <param name="logger">The logger.</param>
|
||||||
|
protected ConsumerService(IConfiguration configuration, IConsumer<Ignore, T> consumer, ILogger<ConsumerService<T>> logger)
|
||||||
|
{
|
||||||
|
_consumer = consumer;
|
||||||
|
_logger = logger;
|
||||||
|
var consumerConfig = new ConsumerConfig
|
||||||
|
{
|
||||||
|
BootstrapServers = configuration["Kafka:BootstrapServers"],
|
||||||
|
GroupId = "InventoryConsumerGroup",
|
||||||
|
AutoOffsetReset = AutoOffsetReset.Earliest
|
||||||
|
};
|
||||||
|
|
||||||
|
_consumer = new ConsumerBuilder<Ignore, T>(consumerConfig).Build();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||||
|
{
|
||||||
|
_consumer.Subscribe("InventoryUpdates");
|
||||||
|
|
||||||
|
while (!stoppingToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
|
||||||
|
var consumeResult = _consumer.Consume(stoppingToken);
|
||||||
|
|
||||||
|
var message = consumeResult.Message.Value;
|
||||||
|
|
||||||
|
await ProcessMessageAsync(consumeResult);
|
||||||
|
}
|
||||||
|
|
||||||
|
_consumer.Close();
|
||||||
|
await Task.CompletedTask;
|
||||||
|
}
|
||||||
|
protected abstract Task ProcessMessageAsync(ConsumeResult<Ignore, T> consumer);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -8,7 +8,7 @@
|
|||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="Confluent.Kafka" Version="2.9.0" />
|
<PackageReference Include="Confluent.Kafka" Version="2.9.0" />
|
||||||
<PackageReference Include="Volo.Abp" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
|
|||||||
@ -1,14 +0,0 @@
|
|||||||
using Confluent.Kafka;
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.Producer
|
|
||||||
{
|
|
||||||
public interface IProducerService
|
|
||||||
{
|
|
||||||
Task ProduceAsync(string topic, string message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,64 +0,0 @@
|
|||||||
using Confluent.Kafka;
|
|
||||||
using Microsoft.Extensions.Configuration;
|
|
||||||
using Volo.Abp.DependencyInjection;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.Producer
|
|
||||||
{
|
|
||||||
public class ProducerBaseService<TKey, TValue>
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// Initializes a new instance of the <see cref="ProducerBaseService{TKey, TValue}"/> class.
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="configuration">The configuration.</param>
|
|
||||||
public ProducerBaseService(IConfiguration configuration)
|
|
||||||
{
|
|
||||||
GetInstance(configuration);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Gets or sets the instance.
|
|
||||||
/// </summary>
|
|
||||||
/// <value>
|
|
||||||
/// The instance.
|
|
||||||
/// </value>
|
|
||||||
public IProducer<TKey, TValue> Instance { get; set; } = default;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Gets the instance.
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="configuration">The configuration.</param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public IProducer<TKey, TValue> GetInstance(IConfiguration configuration)
|
|
||||||
{
|
|
||||||
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
|
|
||||||
var producerConfig = new ProducerConfig
|
|
||||||
{
|
|
||||||
BootstrapServers = configuration["Kafka:BootstrapServers"],
|
|
||||||
AllowAutoCreateTopics = true,
|
|
||||||
};
|
|
||||||
|
|
||||||
if (enableAuthorization)
|
|
||||||
{
|
|
||||||
producerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
|
|
||||||
producerConfig.SaslMechanism = SaslMechanism.Plain;
|
|
||||||
producerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
|
|
||||||
producerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
|
|
||||||
}
|
|
||||||
Instance = new ProducerBuilder<TKey, TValue>(producerConfig).Build();
|
|
||||||
return Instance;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Produces the asynchronous.
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="topic">The topic.</param>
|
|
||||||
/// <param name="message">The message.</param>
|
|
||||||
/// <param name="cancellationToken">The cancellation token.</param>
|
|
||||||
public async Task ProduceAsync(string topic, Message<TKey, TValue> message, CancellationToken cancellationToken = default)
|
|
||||||
{
|
|
||||||
await Instance.ProduceAsync(topic, message, cancellationToken);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,31 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using Confluent.Kafka;
|
|
||||||
using Microsoft.Extensions.Configuration;
|
|
||||||
using Volo.Abp.DependencyInjection;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.Producer
|
|
||||||
{
|
|
||||||
public class ProducerService: IProducerService,ITransientDependency
|
|
||||||
{
|
|
||||||
|
|
||||||
private readonly IProducer<string, string> _producer;
|
|
||||||
|
|
||||||
public ProducerService(IProducer<string, string> producer)
|
|
||||||
{
|
|
||||||
_producer = producer;
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task ProduceAsync(string topic, string message)
|
|
||||||
{
|
|
||||||
await _producer.ProduceAsync(topic, new Message<string, string>
|
|
||||||
{
|
|
||||||
Key = null,
|
|
||||||
Value = message
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
28
src/JiShe.CollectBus.KafkaProducer/ProducerService.cs
Normal file
28
src/JiShe.CollectBus.KafkaProducer/ProducerService.cs
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
using Confluent.Kafka;
|
||||||
|
using Microsoft.Extensions.Configuration;
|
||||||
|
using Volo.Abp.DependencyInjection;
|
||||||
|
|
||||||
|
namespace JiShe.CollectBus.Kafka
|
||||||
|
{
|
||||||
|
public class ProducerService<T> : ITransientDependency
|
||||||
|
{
|
||||||
|
private readonly IProducer<Ignore, T> _producer;
|
||||||
|
|
||||||
|
public ProducerService(IConfiguration configuration)
|
||||||
|
{
|
||||||
|
var producerConfig = new ProducerConfig
|
||||||
|
{
|
||||||
|
BootstrapServers = configuration["Kafka:BootstrapServers"]
|
||||||
|
};
|
||||||
|
|
||||||
|
_producer = new ProducerBuilder<Ignore, T>(producerConfig).Build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task ProduceAsync(string topic, T message)
|
||||||
|
{
|
||||||
|
var msg = new Message<Ignore, T> { Value = message };
|
||||||
|
|
||||||
|
await _producer.ProduceAsync(topic, msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user