diff --git a/JiShe.CollectBus.sln b/JiShe.CollectBus.sln index 62bfe8b..674eca4 100644 --- a/JiShe.CollectBus.sln +++ b/JiShe.CollectBus.sln @@ -31,8 +31,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.FreeSql", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.FreeRedisProvider", "src\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj", "{C06C4082-638F-2996-5FED-7784475766C1}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka", "src\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj", "{919F4CDB-5C82-4371-B209-403B408DA248}" -EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -91,10 +89,6 @@ Global {C06C4082-638F-2996-5FED-7784475766C1}.Debug|Any CPU.Build.0 = Debug|Any CPU {C06C4082-638F-2996-5FED-7784475766C1}.Release|Any CPU.ActiveCfg = Release|Any CPU {C06C4082-638F-2996-5FED-7784475766C1}.Release|Any CPU.Build.0 = Release|Any CPU - {919F4CDB-5C82-4371-B209-403B408DA248}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {919F4CDB-5C82-4371-B209-403B408DA248}.Debug|Any CPU.Build.0 = Debug|Any CPU - {919F4CDB-5C82-4371-B209-403B408DA248}.Release|Any CPU.ActiveCfg = Release|Any CPU - {919F4CDB-5C82-4371-B209-403B408DA248}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -113,7 +107,6 @@ Global {8BA01C3D-297D-42DF-BD63-EF07202A0A67} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {FE0457D9-4038-4A17-8808-DCAD06CFC0A0} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {C06C4082-638F-2996-5FED-7784475766C1} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} - {919F4CDB-5C82-4371-B209-403B408DA248} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD} diff --git a/src/JiShe.CollectBus.Application.Contracts/Samples/SampleDto.cs b/src/JiShe.CollectBus.Application.Contracts/Samples/SampleDto.cs index 02a9f19..6211273 100644 --- a/src/JiShe.CollectBus.Application.Contracts/Samples/SampleDto.cs +++ b/src/JiShe.CollectBus.Application.Contracts/Samples/SampleDto.cs @@ -1,6 +1,18 @@ -namespace JiShe.CollectBus.Samples; +using JiShe.CollectBus.Common.Attributes; +using Volo.Abp.EventBus; +namespace JiShe.CollectBus.Samples; + +[EventName("Sample.Kafka.Test")] +[TopicName("Test1")] public class SampleDto { public int Value { get; set; } } + +[EventName("Sample.Kafka.Test2")] +[TopicName("Test2")] +public class SampleDto2 +{ + public int Value { get; set; } +} diff --git a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index daebf74..1176dd5 100644 --- a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -16,8 +16,11 @@ using JiShe.CollectBus.Workers; using Volo.Abp.BackgroundWorkers.Hangfire; using JiShe.CollectBus.MongoDB; using JiShe.CollectBus.ScheduledMeterReading; -using AutoMapper.Configuration.Annotations; -using JiShe.CollectBus.Common.Attributes; +using Volo.Abp.EventBus.Kafka; +using Volo.Abp.Kafka; +using Microsoft.Extensions.Configuration; +using Volo.Abp.EventBus; +using Confluent.Kafka; namespace JiShe.CollectBus; @@ -28,35 +31,65 @@ namespace JiShe.CollectBus; typeof(AbpAutoMapperModule), typeof(AbpBackgroundWorkersHangfireModule), typeof(CollectBusFreeRedisModule), - typeof(CollectBusFreeSqlModule) + typeof(CollectBusFreeSqlModule), + typeof(AbpEventBusModule), + typeof(AbpKafkaModule) )] public class CollectBusApplicationModule : AbpModule { public override void ConfigureServices(ServiceConfigurationContext context) { + var configuration = context.Services.GetConfiguration(); + context.Services.AddAutoMapperObjectMapper(); Configure(options => { options.AddMaps(validate: true); }); - //Configure(options => - //{ - // options.TokenCookie.Expiration = TimeSpan.FromDays(365); - // options.AutoValidateIgnoredHttpMethods.Add("POST"); - //}); + Configure(configuration.GetSection("Kafka")); + Configure(configuration.GetSection("Kafka:EventBus")); + + Configure(options => + { + options.ConfigureConsumer = config => + { + config.GroupId = configuration.GetValue("Kafka:Consumer:GroupId"); + config.EnableAutoCommit = configuration.GetValue("Kafka:Consumer:EnableAutoCommit"); + }; + }); + + Configure(options => + { + options.ConfigureProducer = config => + { + config.MessageTimeoutMs = configuration.GetValue("Kafka:Producer:MessageTimeoutMs"); + config.Acks = (Acks)configuration.GetValue("Kafka:Producer:Acks"); + }; + }); + + Configure(options => + { + options.ConfigureTopic = specification => + { + specification.ReplicationFactor = configuration.GetValue("Kafka:Topic:ReplicationFactor"); + specification.NumPartitions = configuration.GetValue("Kafka:Topic:NumPartitions"); + }; + }); } public override void OnApplicationInitialization( ApplicationInitializationContext context) { + context.ServiceProvider.GetRequiredService().Initialize(); + var assembly = Assembly.GetExecutingAssembly(); var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface).ToList(); foreach (var type in types) { context.AddBackgroundWorkerAsync(type); } - + var dbContext = context.ServiceProvider.GetRequiredService(); //默认初始化表计信息 diff --git a/src/JiShe.CollectBus.Application/CustomKafkaDistributedEventBus.cs b/src/JiShe.CollectBus.Application/CustomKafkaDistributedEventBus.cs new file mode 100644 index 0000000..aeca6fe --- /dev/null +++ b/src/JiShe.CollectBus.Application/CustomKafkaDistributedEventBus.cs @@ -0,0 +1,250 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Threading.Tasks; +using Confluent.Kafka; +using JiShe.CollectBus.Common.Attributes; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using Volo.Abp; +using Volo.Abp.DependencyInjection; +using Volo.Abp.EventBus; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.EventBus.Kafka; +using Volo.Abp.EventBus.Local; +using Volo.Abp.Guids; +using Volo.Abp.Kafka; +using Volo.Abp.MultiTenancy; +using Volo.Abp.Timing; +using Volo.Abp.Tracing; +using Volo.Abp.Uow; +using Volo.Abp.Threading; + + +namespace JiShe.CollectBus; + +[Dependency(ReplaceServices = true)] +[ExposeServices(typeof(IDistributedEventBus), typeof(CustomKafkaDistributedEventBus))] +public class CustomKafkaDistributedEventBus : DistributedEventBusBase, ISingletonDependency +{ + + protected AbpKafkaEventBusOptions AbpKafkaEventBusOptions { get; } + protected IKafkaMessageConsumerFactory MessageConsumerFactory { get; } + protected IKafkaSerializer Serializer { get; } + protected IProducerPool ProducerPool { get; } + protected ConcurrentDictionary> HandlerFactories { get; } + protected ConcurrentDictionary EventTypes { get; } + protected IKafkaMessageConsumer Consumer { get; private set; } = default!; + + + public CustomKafkaDistributedEventBus(IServiceScopeFactory serviceScopeFactory, + ICurrentTenant currentTenant, + IUnitOfWorkManager unitOfWorkManager, + IOptions abpDistributedEventBusOptions, + IGuidGenerator guidGenerator, + IClock clock, + IEventHandlerInvoker eventHandlerInvoker, + ILocalEventBus localEventBus, + ICorrelationIdProvider correlationIdProvider, + IOptions abpKafkaEventBusOptions, + IKafkaMessageConsumerFactory messageConsumerFactory, + IKafkaSerializer serializer, + IProducerPool producerPool) + : base(serviceScopeFactory, currentTenant, unitOfWorkManager, abpDistributedEventBusOptions, guidGenerator, clock, eventHandlerInvoker, localEventBus, correlationIdProvider) + { + AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value; + MessageConsumerFactory = messageConsumerFactory; + Serializer = serializer; + ProducerPool = producerPool; + HandlerFactories = new ConcurrentDictionary>(); + EventTypes = new ConcurrentDictionary(); + } + + + public void Initialize() + { + SubscribeHandlers(AbpDistributedEventBusOptions.Handlers); + } + + public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) + { + var handlerFactories = GetOrCreateHandlerFactories(eventType); + + if (factory.IsInFactories(handlerFactories)) + { + return NullDisposable.Instance; + } + + handlerFactories.Add(factory); + + return new EventHandlerFactoryUnregistrar(this, eventType, factory); + } + + public override void Unsubscribe(Func action) + { + Check.NotNull(action, nameof(action)); + + GetOrCreateHandlerFactories(typeof(TEvent)) + .Locking(factories => + { + factories.RemoveAll( + factory => + { + var singleInstanceFactory = factory as SingleInstanceHandlerFactory; + if (singleInstanceFactory == null) + { + return false; + } + + var actionHandler = singleInstanceFactory.HandlerInstance as ActionEventHandler; + if (actionHandler == null) + { + return false; + } + + return actionHandler.Action == action; + }); + }); + } + + public override void Unsubscribe(Type eventType, IEventHandler handler) + { + GetOrCreateHandlerFactories(eventType) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory handlerFactory && + handlerFactory.HandlerInstance == handler + ); + }); + } + + public override void Unsubscribe(Type eventType, IEventHandlerFactory factory) + { + GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory)); + } + + public override void UnsubscribeAll(Type eventType) + { + GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); + } + + protected override async Task PublishToEventBusAsync(Type eventType, object eventData) + { + var headers = new Headers + { + { "messageId", System.Text.Encoding.UTF8.GetBytes(Guid.NewGuid().ToString("N")) } + }; + + if (CorrelationIdProvider.Get() != null) + { + headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(CorrelationIdProvider.Get()!)); + } + + var topicAttribute = eventType.GetCustomAttribute(); + + await PublishAsync( + topicAttribute==null?AbpKafkaEventBusOptions.TopicName:topicAttribute.Name, + eventType, + eventData, + headers + ); + } + + protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) + { + unitOfWork.AddOrReplaceDistributedEvent(eventRecord); + } + + protected override IEnumerable GetHandlerFactories(Type eventType) + { + var handlerFactoryList = new List(); + + foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))) + { + handlerFactoryList.Add( + new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); + } + + return handlerFactoryList.ToArray(); + } + + public override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) + { + throw new NotImplementedException(); + } + + public override Task PublishManyFromOutboxAsync(IEnumerable outgoingEvents, OutboxConfig outboxConfig) + { + throw new NotImplementedException(); + } + + public override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) + { + throw new NotImplementedException(); + } + + protected override byte[] Serialize(object eventData) + { + return Serializer.Serialize(eventData); + } + + private List GetOrCreateHandlerFactories(Type eventType) + { + return HandlerFactories.GetOrAdd( + eventType, + type => + { + var eventName = EventNameAttribute.GetNameOrDefault(type); + EventTypes.GetOrAdd(eventName, eventType); + return new List(); + } + ); + } + + private Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers) + { + var eventName = EventNameAttribute.GetNameOrDefault(eventType); + var body = Serializer.Serialize(eventData); + + return PublishAsync(topicName, eventName, body, headers); + } + + private Task> PublishAsync( + string topicName, + string eventName, + byte[] body, + Headers headers) + { + var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName); + + return producer.ProduceAsync( + topicName, + new Message + { + Key = eventName, + Value = body, + Headers = headers + }); + } + + private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) + { + //Should trigger same type + if (handlerEventType == targetEventType) + { + return true; + } + + //Should trigger for inherited types + if (handlerEventType.IsAssignableFrom(targetEventType)) + { + return true; + } + + return false; + } +} diff --git a/src/JiShe.CollectBus.Application/Handlers/SampleHandler.cs b/src/JiShe.CollectBus.Application/Handlers/SampleHandler.cs new file mode 100644 index 0000000..292c8d6 --- /dev/null +++ b/src/JiShe.CollectBus.Application/Handlers/SampleHandler.cs @@ -0,0 +1,46 @@ +using JiShe.CollectBus.Samples; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Volo.Abp.DependencyInjection; +using Volo.Abp.EventBus.Distributed; + +namespace JiShe.CollectBus.Handlers +{ + public class SampleHandler : IDistributedEventHandler, + ITransientDependency + { + + private readonly ILogger _logger; + + public SampleHandler(ILogger logger) + { + _logger = logger; + } + + public async Task HandleEventAsync(SampleDto eventData) + { + _logger.LogWarning($"topic Test1 message: {eventData.Value.ToString()}"); + } + } + + public class SampleHandler2 : IDistributedEventHandler, + ITransientDependency + { + + private readonly ILogger _logger; + + public SampleHandler2(ILogger logger) + { + _logger = logger; + } + + public async Task HandleEventAsync(SampleDto2 eventData) + { + _logger.LogWarning($"topic Test2 message: {eventData.Value.ToString()}"); + } + } +} diff --git a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj index 94c4faf..efdeed7 100644 --- a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj +++ b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj @@ -23,6 +23,7 @@ + diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 6235a55..c45767e 100644 --- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -3,11 +3,21 @@ using System.Threading.Tasks; using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.IotSystems.PrepayModel; using Microsoft.AspNetCore.Authorization; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.EventBus.Kafka; namespace JiShe.CollectBus.Samples; public class SampleAppService : CollectBusAppService, ISampleAppService { + + private readonly IDistributedEventBus _distributedEventBus; + public SampleAppService(IDistributedEventBus distributedEventBus) + { + _distributedEventBus = distributedEventBus; + } + + public Task GetAsync() { return Task.FromResult( @@ -36,4 +46,22 @@ public class SampleAppService : CollectBusAppService, ISampleAppService var ammeterList = await SqlProvider.Instance.Change(DbEnum.PrepayDB).Select().Where(d => d.TB_CustomerID == 5).Take(10).ToListAsync(); return ammeterList; } + + [AllowAnonymous] + public async Task KafkaSendTest() + { + await _distributedEventBus.PublishAsync( + new SampleDto + { + Value = 123456, + } + ); + + await _distributedEventBus.PublishAsync( + new SampleDto2 + { + Value = 456789, + } + ); + } } diff --git a/src/JiShe.CollectBus.Common/Attributes/TopicNameAttribute.cs b/src/JiShe.CollectBus.Common/Attributes/TopicNameAttribute.cs new file mode 100644 index 0000000..7e404a5 --- /dev/null +++ b/src/JiShe.CollectBus.Common/Attributes/TopicNameAttribute.cs @@ -0,0 +1,23 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Volo.Abp.EventBus; +using Volo.Abp; + +namespace JiShe.CollectBus.Common.Attributes +{ + [AttributeUsage(AttributeTargets.Class, Inherited = false)] + public class TopicNameAttribute : Attribute + { + public virtual string Name { get; } + + public TopicNameAttribute(string name) + { + this.Name = Check.NotNullOrWhiteSpace(name, nameof(name)); + } + + public string GetName(Type eventType) => this.Name; + } +} diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index 0936310..28eedf6 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -41,7 +41,8 @@ "EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False" }, "Redis": { - "Configuration": "118.190.144.92:6379,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", + //"Configuration": "118.190.144.92:6379,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", + "Configuration": "127.0.0.1:6379,password=123456@qwer,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", "DefaultDB": "14", "HangfireDB": "15" @@ -83,11 +84,41 @@ "Port": 5672 } }, + //"Kafka": { + // "BootstrapServers": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092", + // "EnableAuthorization": false, + // "SecurityProtocol": "SASL_PLAINTEXT", + // "SaslMechanism": "PLAIN", + // "SaslUserName": "lixiao", + // "SaslPassword": "lixiao1980", + // "Topic": { + // "ReplicationFactor": 3, + // "NumPartitions": 1000 + // } "Kafka": { - "EnableAuthorization": false, - "SecurityProtocol": "SASL_PLAINTEXT", - "SaslMechanism": "PLAIN", - "SaslUserName": "lixiao", - "SaslPassword": "lixiao1980" + "Connections": { + "Default": { + "BootstrapServers": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092" + // "SecurityProtocol": "SASL_PLAINTEXT", + // "SaslMechanism": "PLAIN", + // "SaslUserName": "lixiao", + // "SaslPassword": "lixiao1980", + } + }, + "Consumer": { + "GroupId": "JiShe.CollectBus" + }, + "Producer": { + "MessageTimeoutMs": 6000, + "Acks": -1 + }, + "Topic": { + "ReplicationFactor": 3, + "NumPartitions": 1000 + }, + "EventBus": { + "GroupId": "JiShe.CollectBus", + "TopicName": "DefaultTopicName" + } } } \ No newline at end of file diff --git a/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs b/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs new file mode 100644 index 0000000..5b85ebf --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs @@ -0,0 +1,111 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Configuration; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Confluent.Kafka.Admin; +using Microsoft.Extensions.Logging; +using Volo.Abp.DependencyInjection; + +namespace JiShe.CollectBus.Kafka.AdminClient +{ + public class AdminClientService : IAdminClientService, ISingletonDependency + { + + private readonly ILogger _logger; + + /// + /// Initializes a new instance of the class. + /// + /// The configuration. + /// The logger. + public AdminClientService(IConfiguration configuration, ILogger logger) + { + _logger = logger; + GetInstance(configuration); + } + + /// + /// Gets or sets the instance. + /// + /// + /// The instance. + /// + public IAdminClient Instance { get; set; } = default; + + /// + /// Gets the instance. + /// + /// The configuration. + /// + public IAdminClient GetInstance(IConfiguration configuration) + { + var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]); + var adminClientConfig = new AdminClientConfig() + { + BootstrapServers = configuration["Kafka:BootstrapServers"], + }; + if (enableAuthorization) + { + adminClientConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext; + adminClientConfig.SaslMechanism = SaslMechanism.Plain; + adminClientConfig.SaslUsername = configuration["Kafka:SaslUserName"]; + adminClientConfig.SaslPassword = configuration["Kafka:SaslPassword"]; + } + Instance = new AdminClientBuilder(adminClientConfig).Build(); + return Instance; + } + + /// + /// Checks the topic asynchronous. + /// + /// The topic. + /// + public async Task CheckTopicAsync(string topic) + { + var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(5)); + return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic)); + } + + /// + /// Creates the topic if not exist asynchronous. + /// + /// Name of the topic. + /// The factor number. + /// The partition number. + public async Task CreateTopicIfNotExistAsync(string topicName, short factorNum, int partitionNum) + { + try + { + if (await CheckTopicAsync(topicName)) return; + + await Instance.CreateTopicsAsync(new TopicSpecification[] + { + new TopicSpecification { Name = topicName, ReplicationFactor = factorNum, NumPartitions = partitionNum } + }); + } + catch (CreateTopicsException e) + { + _logger.LogError($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}"); + } + } + + /// + /// Deletes the topic asynchronous. + /// + /// Name of the topic. + public async Task DeleteTopicAsync(List topicName) + { + try + { + await Instance.DeleteTopicsAsync(topicName, null); + } + catch (DeleteTopicsException e) + { + _logger.LogError($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}"); + } + } + } +} diff --git a/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs b/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs new file mode 100644 index 0000000..bdb2d07 --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Kafka.AdminClient +{ + public interface IAdminClientService + { + /// + /// Checks the topic asynchronous. + /// + /// The topic. + /// + Task CheckTopicAsync(string topic); + + /// + /// Creates the topic if not exist asynchronous. + /// + /// Name of the topic. + /// The factor number. + /// The partition number. + /// + Task CreateTopicIfNotExistAsync(string topicName, short factorNum, int partitionNum); + + /// + /// Deletes the topic asynchronous. + /// + /// Name of the topic. + /// + Task DeleteTopicAsync(List topicName); + } +} diff --git a/src/JiShe.CollectBus.KafkaProducer/Attributes/TopicAttribute.cs b/src/JiShe.CollectBus.KafkaProducer/Attributes/TopicAttribute.cs new file mode 100644 index 0000000..4cb2fff --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/Attributes/TopicAttribute.cs @@ -0,0 +1,29 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Kafka.Attributes +{ + [AttributeUsage(AttributeTargets.Class, Inherited = false)] + public class TopicAttribute: Attribute + { + /// + /// Initializes a new instance of the class. + /// + /// The name. + public TopicAttribute(string name = "Default") + { + Name = name; + } + + /// + /// Gets or sets the name. + /// + /// + /// The name. + /// + public string Name { get; set; } + } +} diff --git a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs index 6f7bc9e..44ddcee 100644 --- a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs +++ b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs @@ -1,4 +1,8 @@ -using Volo.Abp.Modularity; +using Confluent.Kafka; +using JiShe.CollectBus.Kafka.Consumer; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.Modularity; namespace JiShe.CollectBus.Kafka { @@ -6,6 +10,16 @@ namespace JiShe.CollectBus.Kafka { public override void ConfigureServices(ServiceConfigurationContext context) { + var configuration = context.Services.GetConfiguration(); + + // 注册 Kafka 生产者 + context.Services.AddSingleton>(sp => + new ProducerBuilder( + configuration.GetSection("Kafka:ProducerConfig").Get() + ) + .Build()); + // 注册后台服务 + context.Services.AddHostedService(); } } } diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerBackgroundService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerBackgroundService.cs new file mode 100644 index 0000000..e49a09e --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerBackgroundService.cs @@ -0,0 +1,54 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Kafka.Consumer +{ + public class ConsumerBackgroundService : BackgroundService + { + private readonly ConsumerService _consumerService; + private readonly ILogger _logger; + + public ConsumerBackgroundService( + ConsumerService consumerService, + ILogger logger) + { + _consumerService = consumerService; + _logger = logger; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _consumerService.Subscribe("abp-kafka-topic"); + + while (!stoppingToken.IsCancellationRequested) + { + try + { + var result = _consumerService.Consume(stoppingToken); + await ProcessMessageAsync(result.Message.Value); + _consumerService.Commit(result); + } + catch (ConsumeException ex) + { + _logger.LogError(ex, $"Message consume error: {ex.Error.Reason}"); + } + } + } + + private async Task ProcessMessageAsync(string message) + { + // 使用 ABP 的异步处理机制 + await Task.Run(() => + { + _logger.LogInformation($"Processing message: {message}"); + // 这里可以触发 ABP 的领域事件 + }); + } + } +} diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs new file mode 100644 index 0000000..391a7fc --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs @@ -0,0 +1,58 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using JiShe.CollectBus.Kafka.Attributes; +using Volo.Abp.DependencyInjection; +using JiShe.CollectBus.Kafka.AdminClient; + +namespace JiShe.CollectBus.Kafka.Consumer +{ + public abstract class ConsumerService : IConsumerService, IDisposable, ISingletonDependency + { + private readonly ILogger _logger; + private readonly IConsumer _consumer; + + public ConsumerService( + ILogger logger, + IConfiguration configuration) + { + _logger = logger; + + var consumerConfig = configuration.GetSection("Kafka:ConsumerConfig") + .Get(); + + _consumer = new ConsumerBuilder(consumerConfig) + .SetErrorHandler(OnConsumeError) + .Build(); + } + + public void Subscribe(string topic) + { + _consumer.Subscribe(topic); + _logger.LogInformation($"Subscribed to topic: {topic}"); + } + + public ConsumeResult Consume(CancellationToken cancellationToken) + { + return _consumer.Consume(cancellationToken); + } + + public void Commit(ConsumeResult result) + { + _consumer.Commit(result); + _logger.LogDebug($"Committed offset: {result.TopicPartitionOffset}"); + } + + private void OnConsumeError(IConsumer consumer, Error error) + { + _logger.LogError($"Kafka consumer error: {error.Reason}"); + } + + public void Dispose() + { + _consumer?.Close(); + _consumer?.Dispose(); + } + } +} diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs new file mode 100644 index 0000000..1319957 --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Kafka.Consumer +{ + public interface IConsumerService + { + } +} diff --git a/src/JiShe.CollectBus.KafkaProducer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/ConsumerService.cs deleted file mode 100644 index 3d1d4e0..0000000 --- a/src/JiShe.CollectBus.KafkaProducer/ConsumerService.cs +++ /dev/null @@ -1,58 +0,0 @@ -using Confluent.Kafka; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace JiShe.CollectBus.Kafka -{ - public abstract class ConsumerService : BackgroundService - { - private readonly IConsumer _consumer; - - private readonly ILogger> _logger; - - /// - /// Initializes a new instance of the class. - /// - /// The configuration. - /// The consumer. - /// The logger. - protected ConsumerService(IConfiguration configuration, IConsumer consumer, ILogger> logger) - { - _consumer = consumer; - _logger = logger; - var consumerConfig = new ConsumerConfig - { - BootstrapServers = configuration["Kafka:BootstrapServers"], - GroupId = "InventoryConsumerGroup", - AutoOffsetReset = AutoOffsetReset.Earliest - }; - - _consumer = new ConsumerBuilder(consumerConfig).Build(); - } - - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - _consumer.Subscribe("InventoryUpdates"); - - while (!stoppingToken.IsCancellationRequested) - { - - var consumeResult = _consumer.Consume(stoppingToken); - - var message = consumeResult.Message.Value; - - await ProcessMessageAsync(consumeResult); - } - - _consumer.Close(); - await Task.CompletedTask; - } - protected abstract Task ProcessMessageAsync(ConsumeResult consumer); - } -} diff --git a/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj b/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj index 72b2660..3175346 100644 --- a/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj +++ b/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj @@ -8,7 +8,7 @@ - + diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs new file mode 100644 index 0000000..2c46bf4 --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs @@ -0,0 +1,14 @@ +using Confluent.Kafka; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Kafka.Producer +{ + public interface IProducerService + { + Task ProduceAsync(string topic, string message); + } +} diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerBaseService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerBaseService.cs new file mode 100644 index 0000000..cbe56a2 --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerBaseService.cs @@ -0,0 +1,64 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Configuration; +using Volo.Abp.DependencyInjection; + +namespace JiShe.CollectBus.Kafka.Producer +{ + public class ProducerBaseService + { + /// + /// Initializes a new instance of the class. + /// + /// The configuration. + public ProducerBaseService(IConfiguration configuration) + { + GetInstance(configuration); + } + + /// + /// Gets or sets the instance. + /// + /// + /// The instance. + /// + public IProducer Instance { get; set; } = default; + + /// + /// Gets the instance. + /// + /// The configuration. + /// + public IProducer GetInstance(IConfiguration configuration) + { + var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]); + var producerConfig = new ProducerConfig + { + BootstrapServers = configuration["Kafka:BootstrapServers"], + AllowAutoCreateTopics = true, + }; + + if (enableAuthorization) + { + producerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext; + producerConfig.SaslMechanism = SaslMechanism.Plain; + producerConfig.SaslUsername = configuration["Kafka:SaslUserName"]; + producerConfig.SaslPassword = configuration["Kafka:SaslPassword"]; + } + Instance = new ProducerBuilder(producerConfig).Build(); + return Instance; + } + + /// + /// Produces the asynchronous. + /// + /// The topic. + /// The message. + /// The cancellation token. + public async Task ProduceAsync(string topic, Message message, CancellationToken cancellationToken = default) + { + await Instance.ProduceAsync(topic, message, cancellationToken); + } + + + } +} diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs new file mode 100644 index 0000000..95c9a15 --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Confluent.Kafka; +using Microsoft.Extensions.Configuration; +using Volo.Abp.DependencyInjection; + +namespace JiShe.CollectBus.Kafka.Producer +{ + public class ProducerService: IProducerService,ITransientDependency + { + + private readonly IProducer _producer; + + public ProducerService(IProducer producer) + { + _producer = producer; + } + + public async Task ProduceAsync(string topic, string message) + { + await _producer.ProduceAsync(topic, new Message + { + Key = null, + Value = message + }); + } + } +} diff --git a/src/JiShe.CollectBus.KafkaProducer/ProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/ProducerService.cs deleted file mode 100644 index 6651b59..0000000 --- a/src/JiShe.CollectBus.KafkaProducer/ProducerService.cs +++ /dev/null @@ -1,28 +0,0 @@ -using Confluent.Kafka; -using Microsoft.Extensions.Configuration; -using Volo.Abp.DependencyInjection; - -namespace JiShe.CollectBus.Kafka -{ - public class ProducerService : ITransientDependency - { - private readonly IProducer _producer; - - public ProducerService(IConfiguration configuration) - { - var producerConfig = new ProducerConfig - { - BootstrapServers = configuration["Kafka:BootstrapServers"] - }; - - _producer = new ProducerBuilder(producerConfig).Build(); - } - - public async Task ProduceAsync(string topic, T message) - { - var msg = new Message { Value = message }; - - await _producer.ProduceAsync(topic, msg); - } - } -}