diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs index 4340524..8eacb5b 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs @@ -114,21 +114,18 @@ namespace JiShe.CollectBus.Kafka.Consumer consumer!.Subscribe(topics); - _ = Task.Run(async () => + await Task.Run(async () => { while (!cts.IsCancellationRequested) { try { + //_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息...."); + var result = consumer.Consume(cts.Token); if (result == null || result.Message==null || result.Message.Value == null) - { -#if DEBUG - _logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL"); -#endif - //consumer.Commit(result); // 手动提交 continue; - } + if (result.IsPartitionEOF) { _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); @@ -188,22 +185,17 @@ namespace JiShe.CollectBus.Kafka.Consumer consumer!.Subscribe(topics); - _ = Task.Run(async () => + await Task.Run(async () => { while (!cts.IsCancellationRequested) { try { + //_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息...."); var result = consumer.Consume(cts.Token); if (result == null || result.Message==null || result.Message.Value == null) - { -#if DEBUG - _logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL"); -#endif - //consumer.Commit(result); // 手动提交 - consumer.StoreOffset(result); continue; - } + if (result.IsPartitionEOF) { _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); diff --git a/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs b/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs index b09b3be..f94d296 100644 --- a/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs +++ b/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs @@ -12,6 +12,7 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Newtonsoft.Json; +using System.Collections.Generic; using System.Reflection; using System.Threading.Tasks; @@ -40,7 +41,7 @@ namespace JiShe.CollectBus.Kafka kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult(); } - lifetime.ApplicationStarted.Register(() => + lifetime.ApplicationStarted.Register(async() => { var logger = provider.GetRequiredService>(); int threadCount = 0; @@ -65,6 +66,7 @@ namespace JiShe.CollectBus.Kafka !type.IsAbstract && !type.IsInterface).ToList(); ; if (subscribeTypes.Count == 0) continue; + foreach (var subscribeType in subscribeTypes) { var subscribes = provider.GetServices(subscribeType).ToList(); @@ -80,7 +82,6 @@ namespace JiShe.CollectBus.Kafka } } logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程"); - }); } @@ -100,7 +101,7 @@ namespace JiShe.CollectBus.Kafka kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult(); } - lifetime.ApplicationStarted.Register(() => + lifetime.ApplicationStarted.Register(async () => { var logger = provider.GetRequiredService>(); int threadCount = 0; @@ -140,6 +141,7 @@ namespace JiShe.CollectBus.Kafka .ToArray(); //var configuration = provider.GetRequiredService(); int threadCount = 0; + List tasks = new List(); foreach (var sub in subscribedMethods) { int partitionCount = 3;// kafkaOptionConfig.NumPartitions; @@ -156,7 +158,7 @@ namespace JiShe.CollectBus.Kafka threadCount++; } } - return Tuple.Create(threadCount, subscribedMethods.Length); + return await Task.FromResult(Tuple.Create(threadCount, subscribedMethods.Length)); } /// @@ -169,42 +171,48 @@ namespace JiShe.CollectBus.Kafka /// private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe, ILogger logger) { - var consumerService = provider.GetRequiredService(); + await Task.Run(async () => + { + var consumerService = provider.GetRequiredService(); - if (attr.EnableBatch) - { - await consumerService.SubscribeBatchAsync(attr.Topic, async (message) => + if (attr.EnableBatch) { - try + await consumerService.SubscribeBatchAsync(attr.Topic, async (message) => { - // 处理消息 - return await ProcessMessageAsync(message.ToList(), method, subscribe); - } - catch (ConsumeException ex) - { - // 处理消费错误 - logger.LogError($"kafka批量消费异常:{ex.Message}"); - } - return await Task.FromResult(false); - }, attr.GroupId, attr.BatchSize,attr.BatchTimeout); - } - else - { - await consumerService.SubscribeAsync(attr.Topic, async (message) => + try + { + logger.LogInformation($"kafka批量消费消息:{message}"); + // 处理消息 + return await ProcessMessageAsync(message.ToList(), method, subscribe); + } + catch (ConsumeException ex) + { + // 处理消费错误 + logger.LogError($"kafka批量消费异常:{ex.Message}"); + } + return await Task.FromResult(false); + }, attr.GroupId, attr.BatchSize, attr.BatchTimeout); + } + else { - try + await consumerService.SubscribeAsync(attr.Topic, async (message) => { - // 处理消息 - return await ProcessMessageAsync(new List() { message }, method, subscribe); - } - catch (ConsumeException ex) - { - // 处理消费错误 - logger.LogError($"kafka消费异常:{ex.Message}"); - } - return await Task.FromResult(false); - }, attr.GroupId); - } + try + { + logger.LogInformation($"kafka消费消息:{message}"); + // 处理消息 + return await ProcessMessageAsync(new List() { message }, method, subscribe); + } + catch (ConsumeException ex) + { + // 处理消费错误 + logger.LogError($"kafka消费异常:{ex.Message}"); + } + return await Task.FromResult(false); + }, attr.GroupId); + } + }); + } diff --git a/shared/JiShe.CollectBus.Common/Consts/ProtocolConst.cs b/shared/JiShe.CollectBus.Common/Consts/ProtocolConst.cs index 91a41c5..6b65535 100644 --- a/shared/JiShe.CollectBus.Common/Consts/ProtocolConst.cs +++ b/shared/JiShe.CollectBus.Common/Consts/ProtocolConst.cs @@ -12,11 +12,11 @@ namespace JiShe.CollectBus.Common.Consts /// /// 心跳下行消息主题 /// - public const string SubscriberHeartbeatIssuedEventName = "issued.heartbeat.event2"; + public const string SubscriberHeartbeatIssuedEventName = "issued.heartbeat.event"; /// /// 登录下行消息主题 /// - public const string SubscriberLoginIssuedEventName = "issued.login.event2"; + public const string SubscriberLoginIssuedEventName = "issued.login.event"; /// /// 上行消息主题,测试使用 @@ -26,11 +26,11 @@ namespace JiShe.CollectBus.Common.Consts /// /// 心跳上行消息主题 /// - public const string SubscriberHeartbeatReceivedEventName = "received.heartbeat.event2"; + public const string SubscriberHeartbeatReceivedEventName = "received.heartbeat.event"; /// /// 登录上行消息主题 /// - public const string SubscriberLoginReceivedEventName = "received.login.event2"; + public const string SubscriberLoginReceivedEventName = "received.login.event"; #region 电表消息主题 ///