From b064ff3d146af5d573eb35c0de627cad3a8d7b6b Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Thu, 17 Apr 2025 18:08:27 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../BasicScheduledMeterReadingService.cs | 4 +- .../Consts/ProtocolConst.cs | 8 +- src/JiShe.CollectBus.Host/appsettings.json | 4 +- .../CollectBusKafkaModule.cs | 6 +- .../Consumer/ConsumerService.cs | 33 ++++--- .../KafkaSubcribesExtensions.cs | 86 ++++++++++++++----- .../Producer/ProducerService.cs | 2 + 7 files changed, 99 insertions(+), 44 deletions(-) diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 9184a62..4cd968c 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -304,8 +304,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading timer1.Stop(); _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); - //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); - //return; + DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); + return; #else var meterInfos = await GetAmmeterInfoList(gatherCode); #endif diff --git a/src/JiShe.CollectBus.Common/Consts/ProtocolConst.cs b/src/JiShe.CollectBus.Common/Consts/ProtocolConst.cs index 6b65535..91a41c5 100644 --- a/src/JiShe.CollectBus.Common/Consts/ProtocolConst.cs +++ b/src/JiShe.CollectBus.Common/Consts/ProtocolConst.cs @@ -12,11 +12,11 @@ namespace JiShe.CollectBus.Common.Consts /// /// 心跳下行消息主题 /// - public const string SubscriberHeartbeatIssuedEventName = "issued.heartbeat.event"; + public const string SubscriberHeartbeatIssuedEventName = "issued.heartbeat.event2"; /// /// 登录下行消息主题 /// - public const string SubscriberLoginIssuedEventName = "issued.login.event"; + public const string SubscriberLoginIssuedEventName = "issued.login.event2"; /// /// 上行消息主题,测试使用 @@ -26,11 +26,11 @@ namespace JiShe.CollectBus.Common.Consts /// /// 心跳上行消息主题 /// - public const string SubscriberHeartbeatReceivedEventName = "received.heartbeat.event"; + public const string SubscriberHeartbeatReceivedEventName = "received.heartbeat.event2"; /// /// 登录上行消息主题 /// - public const string SubscriberLoginReceivedEventName = "received.login.event"; + public const string SubscriberLoginReceivedEventName = "received.login.event2"; #region 电表消息主题 /// diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index 501a2a0..affc34d 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -91,8 +91,8 @@ "SaslUserName": "lixiao", "SaslPassword": "lixiao1980", "KafkaReplicationFactor": 3, - "NumPartitions": 30, - "ServerTagName": "JiSheCollectBus3" + "NumPartitions": 1, + "ServerTagName": "JiSheCollectBus2" //"Topic": { // "ReplicationFactor": 3, // "NumPartitions": 1000 diff --git a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs index 57e0a3e..39b6444 100644 --- a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs +++ b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs @@ -44,10 +44,10 @@ namespace JiShe.CollectBus.Kafka public override void OnApplicationInitialization(ApplicationInitializationContext context) { var app = context.GetApplicationBuilder(); - + // 注册Subscriber - app.ApplicationServices.UseKafkaSubscribers(); - + app.ApplicationServices.UseKafkaSubscribe(); + // 获取程序集 //app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application")); } diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs index fd11df4..ea67b74 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs @@ -1,4 +1,5 @@ using Confluent.Kafka; +using JiShe.CollectBus.Common.Consts; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -50,7 +51,7 @@ namespace JiShe.CollectBus.Kafka.Consumer AutoOffsetReset = AutoOffsetReset.Earliest, EnableAutoCommit = false, // 禁止AutoCommit EnablePartitionEof = true, // 启用分区末尾标记 - //AllowAutoCreateTopics= true, // 启用自动创建 + //AllowAutoCreateTopics = true, // 启用自动创建 FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小(50MB) }; @@ -122,7 +123,9 @@ namespace JiShe.CollectBus.Kafka.Consumer var result = consumer.Consume(cts.Token); if (result == null || result.Message==null || result.Message.Value == null) { +#if DEBUG _logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL"); +#endif //consumer.Commit(result); // 手动提交 continue; } @@ -151,7 +154,7 @@ namespace JiShe.CollectBus.Kafka.Consumer } catch (ConsumeException ex) { - _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); + _logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}"); } } }); @@ -170,14 +173,17 @@ namespace JiShe.CollectBus.Kafka.Consumer /// public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) where TValue : class { - var consumerKey = typeof(KafkaConsumer); + var consumerKey = typeof(KafkaConsumer); var cts = new CancellationTokenSource(); - + if (topics.Contains(ProtocolConst.SubscriberLoginReceivedEventName)) + { + string ssss = ""; + } var consumer = _consumerStore.GetOrAdd(consumerKey, _=> ( - CreateConsumer(groupId), + CreateConsumer(groupId), cts - )).Consumer as IConsumer; + )).Consumer as IConsumer; consumer!.Subscribe(topics); @@ -190,8 +196,11 @@ namespace JiShe.CollectBus.Kafka.Consumer var result = consumer.Consume(cts.Token); if (result == null || result.Message==null || result.Message.Value == null) { +#if DEBUG _logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL"); +#endif //consumer.Commit(result); // 手动提交 + consumer.StoreOffset(result); continue; } if (result.IsPartitionEOF) @@ -214,10 +223,12 @@ namespace JiShe.CollectBus.Kafka.Consumer bool sucess = await messageHandler(result.Message.Value); if (sucess) consumer.Commit(result); // 手动提交 + else + consumer.StoreOffset(result); } catch (ConsumeException ex) { - _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); + _logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}"); } } }); @@ -339,7 +350,7 @@ namespace JiShe.CollectBus.Kafka.Consumer } catch (ConsumeException ex) { - _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); + _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}"); } catch (OperationCanceledException) { @@ -385,14 +396,14 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 消费等待时间 public async Task SubscribeBatchAsync(string[] topics,Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class { - var consumerKey = typeof(KafkaConsumer); + var consumerKey = typeof(KafkaConsumer); var cts = new CancellationTokenSource(); var consumer = _consumerStore.GetOrAdd(consumerKey, _ => ( - CreateConsumer(groupId), + CreateConsumer(groupId), cts - )).Consumer as IConsumer; + )).Consumer as IConsumer; consumer!.Subscribe(topics); diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs index c28c82c..0ea4d55 100644 --- a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs @@ -10,6 +10,7 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using Newtonsoft.Json; using System.Reflection; namespace JiShe.CollectBus.Kafka @@ -21,7 +22,7 @@ namespace JiShe.CollectBus.Kafka /// /// /// - public static void UseKafkaSubscribers(this IServiceProvider provider) + public static void UseKafkaSubscribe(this IServiceProvider provider) { var lifetime = provider.GetRequiredService(); @@ -69,7 +70,7 @@ namespace JiShe.CollectBus.Kafka { if (subscribe!=null) { - Tuple tuple = BuildKafkaSubscriber(subscribe, provider, logger, kafkaOptions.Value); + Tuple tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value); threadCount += tuple.Item1; topicCount += tuple.Item2; } @@ -77,6 +78,7 @@ namespace JiShe.CollectBus.Kafka } } logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程"); + }); } @@ -113,7 +115,7 @@ namespace JiShe.CollectBus.Kafka if (subscribe != null) { - Tuple tuple = BuildKafkaSubscriber(subscribe, provider, logger, kafkaOptions.Value); + Tuple tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value); threadCount += tuple.Item1; topicCount += tuple.Item2; } @@ -128,7 +130,7 @@ namespace JiShe.CollectBus.Kafka /// /// /// - private static Tuple BuildKafkaSubscriber(object subscribe, IServiceProvider provider,ILogger logger, KafkaOptionConfig kafkaOptionConfig) + private static Tuple BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger logger, KafkaOptionConfig kafkaOptionConfig) { var subscribedMethods = subscribe.GetType().GetMethods() .Select(m => new { Method = m, Attribute = m.GetCustomAttribute() }) @@ -166,7 +168,7 @@ namespace JiShe.CollectBus.Kafka if (attr.EnableBatch) { - await consumerService.SubscribeBatchAsync(attr.Topic, async (message) => + await consumerService.SubscribeBatchAsync(attr.Topic, async (message) => { try { @@ -183,7 +185,7 @@ namespace JiShe.CollectBus.Kafka } else { - await consumerService.SubscribeAsync(attr.Topic, async (message) => + await consumerService.SubscribeAsync(attr.Topic, async (message) => { try { @@ -209,36 +211,76 @@ namespace JiShe.CollectBus.Kafka /// /// /// - private static async Task ProcessMessageAsync(dynamic message, MethodInfo method, object subscribe) + private static async Task ProcessMessageAsync(object message, MethodInfo method, object subscribe) { var parameters = method.GetParameters(); bool isGenericTask = method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>); bool existParameters = parameters.Length > 0; - //dynamic? messageObj= null; + //dynamic? messageObj = null; //if (existParameters) //{ - // var paramType = parameters[0].ParameterType; - // messageObj = paramType == typeof(string) ? message : message.Deserialize(paramType); + // var paramType = parameters[0].ParameterType; + // if (paramType.IsInstanceOfType(message)) + // return message; // 类型兼容则直接使用 + + // var json = message.ToString(); + // messageObj= message.de JsonConvert.DeserializeObject(json, targetType); //} - if (isGenericTask) + + object[] args = null; + if (existParameters) { - object? result = await (Task)method.Invoke(subscribe, existParameters? new[] { message } :null)!; - if (result is ISubscribeAck ackResult) - { - return ackResult.Ack; - } + var paramType = parameters[0].ParameterType; + // 类型转换逻辑 + object convertedMessage = paramType.IsInstanceOfType(message) + ? message + : ConvertMessage(message, paramType); + + args = new object[] { convertedMessage }; } - else + var result = method.Invoke(subscribe, args); + if (result is Task genericTask) { - object? result = method.Invoke(subscribe, existParameters ? new[] { message } : null); - if (result is ISubscribeAck ackResult) - { - return ackResult.Ack; - } + await genericTask.ConfigureAwait(false); + return genericTask.Result.Ack; } + else if (result is Task nonGenericTask) + { + await nonGenericTask.ConfigureAwait(false); + return true; + } + else if (result is ISubscribeAck ackResult) + { + return ackResult.Ack; + } + + //if (isGenericTask) + //{ + // object? result = await (Task)method.Invoke(subscribe, existParameters? new[] { message } :null)!; + // if (result is ISubscribeAck ackResult) + // { + // return ackResult.Ack; + // } + //} + //else + //{ + // object? result = method.Invoke(subscribe, existParameters ? new[] { message } : null); + // if (result is ISubscribeAck ackResult) + // { + // return ackResult.Ack; + // } + //} return false; } + // 可扩展的类型转换器 + private static object ConvertMessage(object rawMessage, Type targetType) + { + if (targetType.IsInstanceOfType(rawMessage)) + return rawMessage; // 类型兼容则直接使用 + var json = rawMessage.ToString(); + return JsonConvert.DeserializeObject(json, targetType); + } } } diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs index 42fc9cf..8231760 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs @@ -129,6 +129,7 @@ namespace JiShe.CollectBus.Kafka.Producer var producer = GetProducer(typeKey); var message = new Message { + Key= _kafkaOptionConfig.ServerTagName, Value = value, Headers = new Headers{ { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } @@ -187,6 +188,7 @@ namespace JiShe.CollectBus.Kafka.Producer { var message = new Message { + Key = _kafkaOptionConfig.ServerTagName, Value = value, Headers = new Headers{ { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }