From 270abd656f800dde75459f57a221e913323da598 Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Sat, 19 Apr 2025 00:30:58 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E4=BC=98=E5=8C=96kafka=E8=A7=A3=E5=86=B3?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E5=9B=9E=E8=B0=83=E5=8F=82=E6=95=B0=E9=97=AE?= =?UTF-8?q?=E9=A2=98=20=E8=AE=A2=E9=98=85=E8=B0=83=E6=95=B4=E4=B8=BA?= =?UTF-8?q?=E6=89=B9=E9=87=8F=E8=AE=A2=E9=98=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- JiShe.CollectBus.sln | 7 + .../KafkaSubscribeTest.cs | 76 ++++---- .../JiShe.CollectBus.Kafka.Test/Program.cs | 64 ++++--- .../appsettings.json | 4 +- .../Attributes/KafkaSubscribeAttribute.cs | 2 +- .../CollectBusKafkaModule.cs | 12 +- .../Consumer/ConsumerService.cs | 49 +++--- .../Consumer/IConsumerService.cs | 8 +- .../{ => Internal}/HeadersFilter.cs | 2 +- .../{ => Internal}/IKafkaSubscribe.cs | 2 +- .../{ => Internal}/ISubscribeAck.cs | 2 +- .../{ => Internal}/KafkaOptionConfig.cs | 2 +- .../Internal/ReflectionHelper.cs | 113 ++++++++++++ .../{ => Internal}/SubscribeResult.cs | 2 +- .../KafkaSubcribesExtensions.cs | 166 ++++++++++++++---- .../Producer/ProducerService.cs | 6 +- .../{ => Serialization}/JsonSerializer.cs | 43 ++++- .../Subscribers/ISubscriberAppService.cs | 13 +- .../IWorkerSubscriberAppService.cs | 2 +- .../CollectBusApplicationModule.cs | 1 + .../Samples/SampleAppService.cs | 2 +- .../BasicScheduledMeterReadingService.cs | 2 +- ...nergySystemScheduledMeterReadingService.cs | 2 +- .../Subscribers/SubscriberAppService.cs | 154 ++++++++-------- .../Subscribers/WorkerSubscriberAppService.cs | 2 +- web/JiShe.CollectBus.Host/appsettings.json | 6 +- 26 files changed, 519 insertions(+), 225 deletions(-) rename modules/JiShe.CollectBus.Kafka/{ => Internal}/HeadersFilter.cs (94%) rename modules/JiShe.CollectBus.Kafka/{ => Internal}/IKafkaSubscribe.cs (90%) rename modules/JiShe.CollectBus.Kafka/{ => Internal}/ISubscribeAck.cs (89%) rename modules/JiShe.CollectBus.Kafka/{ => Internal}/KafkaOptionConfig.cs (97%) create mode 100644 modules/JiShe.CollectBus.Kafka/Internal/ReflectionHelper.cs rename modules/JiShe.CollectBus.Kafka/{ => Internal}/SubscribeResult.cs (97%) rename modules/JiShe.CollectBus.Kafka/{ => Serialization}/JsonSerializer.cs (61%) diff --git a/JiShe.CollectBus.sln b/JiShe.CollectBus.sln index d4e6591..fa3fd6c 100644 --- a/JiShe.CollectBus.sln +++ b/JiShe.CollectBus.sln @@ -47,6 +47,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "2.Services", "2.Services", EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "5.Shared", "5.Shared", "{EBF7C01F-9B4F-48E6-8418-2CBFDA51EB0B}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "modules\JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -121,6 +123,10 @@ Global {443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU {443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU {443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU + {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -143,6 +149,7 @@ Global {A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59} {A377955E-7EA1-6F29-8CF7-774569E93925} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC} {443B4549-0AC0-4493-8F3E-49C83225DD76} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59} + {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD} diff --git a/modules/JiShe.CollectBus.Kafka.Test/KafkaSubscribeTest.cs b/modules/JiShe.CollectBus.Kafka.Test/KafkaSubscribeTest.cs index 4c06e22..c7ad309 100644 --- a/modules/JiShe.CollectBus.Kafka.Test/KafkaSubscribeTest.cs +++ b/modules/JiShe.CollectBus.Kafka.Test/KafkaSubscribeTest.cs @@ -3,6 +3,7 @@ using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.Kafka.Attributes; +using JiShe.CollectBus.Kafka.Internal; using Microsoft.Extensions.DependencyInjection; using System; using System.Collections.Generic; @@ -16,53 +17,54 @@ namespace JiShe.CollectBus.Kafka.Test { public class KafkaSubscribeTest: IKafkaSubscribe { - [KafkaSubscribe(ProtocolConst.TESTTOPIC, EnableBatch=false,BatchSize=1000)] + [KafkaSubscribe(ProtocolConst.TESTTOPIC, EnableBatch = false, BatchSize = 10)] - public async Task KafkaSubscribeAsync(object obj) + public async Task KafkaSubscribeAsync(TestTopic obj) + //public async Task KafkaSubscribeAsync(IEnumerable obj) { Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(obj)}"); return SubscribeAck.Success(); } - [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] - //[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] - public async Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage) - { - Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(issuedEventMessage)}"); - return SubscribeAck.Success(); - } + //[KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] + ////[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] + //public async Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage) + //{ + // Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(issuedEventMessage)}"); + // return SubscribeAck.Success(); + //} - [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] - //[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] - public async Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage) - { - Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(issuedEventMessage)}"); - return SubscribeAck.Success(); - } + //[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] + ////[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] + //public async Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage) + //{ + // Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(issuedEventMessage)}"); + // return SubscribeAck.Success(); + //} - [KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)] - //[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)] - public async Task ReceivedEvent(MessageReceived receivedMessage) - { - Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(receivedMessage)}"); - return SubscribeAck.Success(); - } + //[KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)] + ////[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)] + //public async Task ReceivedEvent(MessageReceived receivedMessage) + //{ + // Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(receivedMessage)}"); + // return SubscribeAck.Success(); + //} - [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] - //[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] - public async Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage) - { - Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(receivedHeartbeatMessage)}"); - return SubscribeAck.Success(); - } + //[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] + ////[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] + //public async Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage) + //{ + // Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(receivedHeartbeatMessage)}"); + // return SubscribeAck.Success(); + //} - [KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] - //[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] - public async Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage) - { - Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(receivedLoginMessage)}"); - return SubscribeAck.Success(); - } + //[KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] + ////[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] + //public async Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage) + //{ + // Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(receivedLoginMessage)}"); + // return SubscribeAck.Success(); + //} } } diff --git a/modules/JiShe.CollectBus.Kafka.Test/Program.cs b/modules/JiShe.CollectBus.Kafka.Test/Program.cs index a359e14..016d0c4 100644 --- a/modules/JiShe.CollectBus.Kafka.Test/Program.cs +++ b/modules/JiShe.CollectBus.Kafka.Test/Program.cs @@ -7,6 +7,7 @@ using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Kafka.AdminClient; using JiShe.CollectBus.Kafka.Consumer; +using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Kafka.Test; using Microsoft.AspNetCore.Builder; @@ -86,12 +87,13 @@ var logger = loggerFactory.CreateLogger(); logger.LogInformation("程序启动"); var adminClientService = host.Services.GetRequiredService(); var configuration = host.Services.GetRequiredService(); -string topic = "test-topic"; +string topic = ProtocolConst.TESTTOPIC; //await adminClientService.DeleteTopicAsync(topic); // 创建 topic //await adminClientService.CreateTopicAsync(topic, configuration.GetValue(CommonConst.NumPartitions), 3); var consumerService = host.Services.GetRequiredService(); +var producerService = host.Services.GetRequiredService(); //var kafkaOptions = host.Services.GetRequiredService>(); //await consumerService.SubscribeAsync(topic, (message) => //{ @@ -113,43 +115,49 @@ var consumerService = host.Services.GetRequiredService(); //for (int i = 0; i < 3; i++) //{ -// await consumerService.SubscribeBatchAsync(topic, (message) => +//await consumerService.SubscribeBatchAsync(topic, (message) => +//{ +// try // { -// try -// { -// int index = 0; -// logger.LogInformation($"消费消息_{index}消费总数:{message.Count()}:{JsonSerializer.Serialize(message)}"); -// return Task.FromResult(true); +// int index = 0; +// logger.LogInformation($"消费消息_{index}消费总数:{message.Count()}:{JsonSerializer.Serialize(message)}"); +// return Task.FromResult(true); -// } -// catch (ConsumeException ex) -// { -// // 处理消费错误 -// logger.LogError($"kafka消费异常:{ex.Message}"); -// } -// return Task.FromResult(false); -// }); +// } +// catch (ConsumeException ex) +// { +// // 处理消费错误 +// logger.LogError($"kafka消费异常:{ex.Message}"); +// } +// return Task.FromResult(false); +//}); //} //stopwatch.Stop(); //Console.WriteLine($"耗时: {stopwatch.ElapsedMilliseconds} 毫秒,{stopwatch.ElapsedMilliseconds/1000} 秒"); -var producerService = host.Services.GetRequiredService(); -//int num = 840; -//while (num <= 900) + +int num = 1; +while (num <= 6) +{ + await producerService.ProduceAsync(topic, new TestTopic { Topic = topic, Val = num }); + num++; +} + +//int num = 2; +//while (num <= 4) //{ -// //await producerService.ProduceAsync(topic, new TestTopic { Topic = topic, Val = i }); // await producerService.ProduceAsync(topic, num.ToString()); // num++; //} -await Task.Factory.StartNew(async() => { - int num = 0; - while (true) - { - //await producerService.ProduceAsync(topic, new TestTopic { Topic = topic, Val = i }); - await producerService.ProduceAsync(topic, num.ToString()); - num++; - } -}); +//await Task.Factory.StartNew(async() => { +// int num = 0; +// while (true) +// { +// //await producerService.ProduceAsync(topic, new TestTopic { Topic = topic, Val = i }); +// await producerService.ProduceAsync(topic, num.ToString()); +// num++; +// } +//}); Console.WriteLine("\n按Esc键退出"); while (true) { diff --git a/modules/JiShe.CollectBus.Kafka.Test/appsettings.json b/modules/JiShe.CollectBus.Kafka.Test/appsettings.json index b2579c6..9767dee 100644 --- a/modules/JiShe.CollectBus.Kafka.Test/appsettings.json +++ b/modules/JiShe.CollectBus.Kafka.Test/appsettings.json @@ -91,8 +91,8 @@ "SaslUserName": "lixiao", "SaslPassword": "lixiao1980", "KafkaReplicationFactor": 3, - "NumPartitions": 1, - "ServerTagName": "JiSheCollectBus2" + "NumPartitions": 30, + "ServerTagName": "JiSheCollectBus99" //"Topic": { // "ReplicationFactor": 3, // "NumPartitions": 1000 diff --git a/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs b/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs index c74aa2e..598caf0 100644 --- a/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs +++ b/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs @@ -22,7 +22,7 @@ namespace JiShe.CollectBus.Kafka.Attributes /// /// 消费者组 /// - public string GroupId { get; set; } = "default"; + public string? GroupId { get; set; } = null;//"default" /// /// 任务数(默认是多少个分区多少个任务) diff --git a/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs b/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs index b467162..5621b3d 100644 --- a/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs +++ b/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs @@ -1,6 +1,7 @@ using Confluent.Kafka; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Kafka.Consumer; +using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Producer; using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.Configuration; @@ -43,10 +44,19 @@ namespace JiShe.CollectBus.Kafka //context.Services.AddHostedService(); } + /// + /// 在初始化之前,初始化Kafka Topic + /// + /// + public override void OnPreApplicationInitialization(ApplicationInitializationContext context) + { + var app = context.GetApplicationBuilder(); + app.ApplicationServices.UseInitKafkaTopic(); + } + public override void OnApplicationInitialization(ApplicationInitializationContext context) { var app = context.GetApplicationBuilder(); - // 注册Subscriber app.ApplicationServices.UseKafkaSubscribe(); diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs index 0ec5bd0..5093561 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs @@ -1,5 +1,7 @@ using Confluent.Kafka; using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Kafka.Internal; +using JiShe.CollectBus.Kafka.Serialization; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -47,11 +49,11 @@ namespace JiShe.CollectBus.Kafka.Consumer var config = new ConsumerConfig { BootstrapServers = _kafkaOptionConfig.BootstrapServers, - GroupId = groupId ?? "default", + GroupId = groupId ?? _kafkaOptionConfig.ServerTagName, AutoOffsetReset = AutoOffsetReset.Earliest, EnableAutoCommit = false, // 禁止AutoCommit EnablePartitionEof = true, // 启用分区末尾标记 - AllowAutoCreateTopics = true, // 启用自动创建 + //AllowAutoCreateTopics = true, // 启用自动创建 FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小(50MB) }; @@ -252,7 +254,7 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 消费组ID /// 批次大小 /// 批次超时时间 - public async Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class + public async Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class { await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout); } @@ -267,17 +269,17 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 消费组ID /// 批次大小 /// 批次超时时间 - public async Task SubscribeBatchAsync(string[] topics,Func, Task> messageBatchHandler, string? groupId = null,int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class + public async Task SubscribeBatchAsync(string[] topics,Func, Task> messageBatchHandler, string? groupId = null,int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class { var consumerKey = typeof(KafkaConsumer); var cts = new CancellationTokenSource(); - var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - ( - CreateConsumer(groupId), - cts - )).Consumer as IConsumer; - + //var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + // ( + // CreateConsumer(groupId), + // cts + // )).Consumer as IConsumer; + var consumer = CreateConsumer(groupId); consumer!.Subscribe(topics); var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 @@ -300,8 +302,8 @@ namespace JiShe.CollectBus.Kafka.Consumer { if (result.IsPartitionEOF) { - _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); - await Task.Delay(TimeSpan.FromSeconds(1), cts.Token); + //_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); + await Task.Delay(10, cts.Token); } else if (result.Message.Value != null) { @@ -330,7 +332,7 @@ namespace JiShe.CollectBus.Kafka.Consumer // 处理批次 if (messages.Count > 0) { - bool success = await messageBatchHandler(messages.Select(m => m.Value)); + bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList()); if (success) { var offsetsByPartition = new Dictionary(); @@ -383,7 +385,7 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 批次大小 /// 批次超时时间 /// 消费等待时间 - public async Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class + public async Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class { await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout); @@ -400,17 +402,18 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 批次大小 /// 批次超时时间 /// 消费等待时间 - public async Task SubscribeBatchAsync(string[] topics,Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class + public async Task SubscribeBatchAsync(string[] topics,Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class { var consumerKey = typeof(KafkaConsumer); var cts = new CancellationTokenSource(); - var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - ( - CreateConsumer(groupId), - cts - )).Consumer as IConsumer; + //var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + // ( + // CreateConsumer(groupId), + // cts + // )).Consumer as IConsumer; + var consumer= CreateConsumer (groupId); consumer!.Subscribe(topics); var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 @@ -434,8 +437,8 @@ namespace JiShe.CollectBus.Kafka.Consumer { if (result.IsPartitionEOF) { - _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); - await Task.Delay(TimeSpan.FromSeconds(1), cts.Token); + //_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); + await Task.Delay(10, cts.Token); } else if (result.Message.Value != null) { @@ -464,7 +467,7 @@ namespace JiShe.CollectBus.Kafka.Consumer // 处理批次 if (messages.Count > 0) { - bool success = await messageBatchHandler(messages.Select(m => m.Value)); + bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList()); if (success) { var offsetsByPartition = new Dictionary(); diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs index d86dba8..77901ef 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs @@ -33,13 +33,13 @@ namespace JiShe.CollectBus.Kafka.Consumer /// Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null) where TValue : class; - Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class; + Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class; - Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class; + Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class; - Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class; + Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class; - Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class; + Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class; void Unsubscribe() where TKey : notnull where TValue : class; } diff --git a/modules/JiShe.CollectBus.Kafka/HeadersFilter.cs b/modules/JiShe.CollectBus.Kafka/Internal/HeadersFilter.cs similarity index 94% rename from modules/JiShe.CollectBus.Kafka/HeadersFilter.cs rename to modules/JiShe.CollectBus.Kafka/Internal/HeadersFilter.cs index 0790f9f..7b479fa 100644 --- a/modules/JiShe.CollectBus.Kafka/HeadersFilter.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/HeadersFilter.cs @@ -5,7 +5,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; -namespace JiShe.CollectBus.Kafka +namespace JiShe.CollectBus.Kafka.Internal { /// /// 消息头过滤器 diff --git a/modules/JiShe.CollectBus.Kafka/IKafkaSubscribe.cs b/modules/JiShe.CollectBus.Kafka/Internal/IKafkaSubscribe.cs similarity index 90% rename from modules/JiShe.CollectBus.Kafka/IKafkaSubscribe.cs rename to modules/JiShe.CollectBus.Kafka/Internal/IKafkaSubscribe.cs index 39e5789..0ad450f 100644 --- a/modules/JiShe.CollectBus.Kafka/IKafkaSubscribe.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/IKafkaSubscribe.cs @@ -4,7 +4,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; -namespace JiShe.CollectBus.Kafka +namespace JiShe.CollectBus.Kafka.Internal { /// /// Kafka订阅者 diff --git a/modules/JiShe.CollectBus.Kafka/ISubscribeAck.cs b/modules/JiShe.CollectBus.Kafka/Internal/ISubscribeAck.cs similarity index 89% rename from modules/JiShe.CollectBus.Kafka/ISubscribeAck.cs rename to modules/JiShe.CollectBus.Kafka/Internal/ISubscribeAck.cs index ffb30ef..ce24d69 100644 --- a/modules/JiShe.CollectBus.Kafka/ISubscribeAck.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/ISubscribeAck.cs @@ -4,7 +4,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; -namespace JiShe.CollectBus.Kafka +namespace JiShe.CollectBus.Kafka.Internal { public interface ISubscribeAck { diff --git a/modules/JiShe.CollectBus.Kafka/KafkaOptionConfig.cs b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs similarity index 97% rename from modules/JiShe.CollectBus.Kafka/KafkaOptionConfig.cs rename to modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs index e592ea2..4814c6a 100644 --- a/modules/JiShe.CollectBus.Kafka/KafkaOptionConfig.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs @@ -5,7 +5,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; -namespace JiShe.CollectBus.Kafka +namespace JiShe.CollectBus.Kafka.Internal { public class KafkaOptionConfig { diff --git a/modules/JiShe.CollectBus.Kafka/Internal/ReflectionHelper.cs b/modules/JiShe.CollectBus.Kafka/Internal/ReflectionHelper.cs new file mode 100644 index 0000000..ded11d7 --- /dev/null +++ b/modules/JiShe.CollectBus.Kafka/Internal/ReflectionHelper.cs @@ -0,0 +1,113 @@ +using Newtonsoft.Json; +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Kafka.Internal +{ + /// + /// 反射辅助类 + /// + public static class ReflectionHelper + { + /// + ///集合类型 + ///Item1:参数类型 + ///Item2:集合元素类型 + /// + public static Tuple GetParameterTypeInfo(this MethodInfo method, int parameterIndex=0) + { + // 参数校验 + if (method == null) throw new ArgumentNullException(nameof(method)); + var parameters = method.GetParameters(); + if (parameterIndex < 0 || parameterIndex >= parameters.Length) + throw new ArgumentOutOfRangeException(nameof(parameterIndex)); + + ParameterInfo param = parameters[parameterIndex]; + Type paramType = param.ParameterType; + Type? elementType = null; + + // 判断是否是集合类型(排除字符串) + if (paramType != typeof(string) && IsEnumerableType(paramType)) + { + elementType = GetEnumerableElementType(paramType); + } + + return Tuple.Create(paramType, elementType); + + } + + /// + /// 判断是否是集合类型(排除字符串) + /// + public static bool IsEnumerableType(this Type type) + { + return type.IsArray + || (type.IsGenericType && type.GetInterfaces() + .Any(t => t.IsGenericType + && t.GetGenericTypeDefinition() == typeof(IEnumerable<>))) + || type.GetInterfaces().Any(t => t == typeof(System.Collections.IEnumerable)); + } + + /// + /// 获取集合元素的类型 + /// + public static Type? GetEnumerableElementType(this Type type) + { + // 处理数组类型 + if (type.IsArray) + return type.GetElementType(); + + // 处理直接实现IEnumerable的类型(如IEnumerable本身) + if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(IEnumerable<>)) + return type.GetGenericArguments()[0]; + + // 处理通过接口实现IEnumerable的泛型集合(如List) + var genericEnumerable = type.GetInterfaces() + .FirstOrDefault(t => t.IsGenericType + && t.GetGenericTypeDefinition() == typeof(IEnumerable<>)); + if (genericEnumerable != null) + return genericEnumerable.GetGenericArguments()[0]; + + // 处理非泛型集合类型(如 ArrayList) + if (typeof(IEnumerable).IsAssignableFrom(type) && type == typeof(ArrayList)) + return typeof(ArrayList); + // 返回null表示无法确定元素类型 + return null; + } + + + // + /// 判断是否使用强转换 + /// + /// 目标类型 + /// + public static bool IsConvertType(this Type targetType) + { + // 处理可空类型 + Type underlyingType = Nullable.GetUnderlyingType(targetType) ?? targetType; + // 情况1:值类型或基元类型(如 int、DateTime) + if (underlyingType.IsValueType || underlyingType.IsPrimitive) + return true; + // 情况2:字符串类型直接赋值 + else if (underlyingType == typeof(string)) + return true; + + // 情况3:枚举类型处理 + //else if (underlyingType.IsEnum) + //{ + // if (Enum.IsDefined(underlyingType, msg)) + // { + // convertedValue = Enum.Parse(underlyingType, msg.ToString()); + // return true; + // } + // return false; + //} + return false; + } + } +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/SubscribeResult.cs b/modules/JiShe.CollectBus.Kafka/Internal/SubscribeResult.cs similarity index 97% rename from modules/JiShe.CollectBus.Kafka/SubscribeResult.cs rename to modules/JiShe.CollectBus.Kafka/Internal/SubscribeResult.cs index 83eaa49..0c016ff 100644 --- a/modules/JiShe.CollectBus.Kafka/SubscribeResult.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/SubscribeResult.cs @@ -6,7 +6,7 @@ using System.Text; using System.Threading.Tasks; using static System.Runtime.InteropServices.JavaScript.JSType; -namespace JiShe.CollectBus.Kafka +namespace JiShe.CollectBus.Kafka.Internal { public class SubscribeResult: ISubscribeAck { diff --git a/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs b/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs index ff60130..a5bf39c 100644 --- a/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs +++ b/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs @@ -5,30 +5,33 @@ using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Kafka.AdminClient; using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Consumer; +using JiShe.CollectBus.Kafka.Internal; +using JiShe.CollectBus.Kafka.Serialization; using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Mvc.Abstractions; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using Newtonsoft.Json; +using System; +using System.Collections; using System.Collections.Generic; +using System.ComponentModel; +using System.Linq.Expressions; using System.Reflection; +using System.Text.Json; using System.Threading.Tasks; +using YamlDotNet.Core.Tokens; +using static System.Runtime.InteropServices.JavaScript.JSType; namespace JiShe.CollectBus.Kafka { public static class KafkaSubcribesExtensions { - /// - /// 添加Kafka订阅 - /// - /// - /// - public static void UseKafkaSubscribe(this IServiceProvider provider) - { - var lifetime = provider.GetRequiredService(); + public static void UseInitKafkaTopic(this IServiceProvider provider) + { //初始化主题信息 var kafkaAdminClient = provider.GetRequiredService(); var kafkaOptions = provider.GetRequiredService>(); @@ -40,6 +43,17 @@ namespace JiShe.CollectBus.Kafka { kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult(); } + } + + /// + /// 添加Kafka订阅 + /// + /// + /// + public static void UseKafkaSubscribe(this IServiceProvider provider) + { + var lifetime = provider.GetRequiredService(); + var kafkaOptions = provider.GetRequiredService>(); lifetime.ApplicationStarted.Register(() => { var logger = provider.GetRequiredService>(); @@ -88,17 +102,7 @@ namespace JiShe.CollectBus.Kafka { var provider = app.ApplicationServices; var lifetime = provider.GetRequiredService(); - //初始化主题信息 - var kafkaAdminClient = provider.GetRequiredService(); var kafkaOptions = provider.GetRequiredService>(); - - List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); - topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); - - foreach (var item in topics) - { - kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult(); - } lifetime.ApplicationStarted.Register(() => { var logger = provider.GetRequiredService>(); @@ -132,7 +136,7 @@ namespace JiShe.CollectBus.Kafka /// /// /// - private static Tuple BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger logger, KafkaOptionConfig kafkaOptionConfig) + private static Tuple BuildKafkaSubscribe(object subscribe, IServiceProvider provider, ILogger logger, KafkaOptionConfig kafkaOptionConfig) { var subscribedMethods = subscribe.GetType().GetMethods() .Select(m => new { Method = m, Attribute = m.GetCustomAttribute() }) @@ -140,14 +144,14 @@ namespace JiShe.CollectBus.Kafka .ToArray(); //var configuration = provider.GetRequiredService(); int threadCount = 0; - + foreach (var sub in subscribedMethods) { int partitionCount = 3;// kafkaOptionConfig.NumPartitions; #if DEBUG var adminClientService = provider.GetRequiredService(); int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic); - partitionCount= partitionCount> topicCount ? topicCount: partitionCount; + partitionCount = partitionCount > topicCount ? topicCount : partitionCount; #endif //int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount; if (partitionCount <= 0) @@ -170,18 +174,18 @@ namespace JiShe.CollectBus.Kafka /// /// /// - private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe, ILogger logger) + private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr, MethodInfo method, object subscribe, ILogger logger) { var consumerService = provider.GetRequiredService(); if (attr.EnableBatch) { - await consumerService.SubscribeBatchAsync(attr.Topic, async (message) => + await consumerService.SubscribeBatchAsync(attr.Topic, async (message) => { try { #if DEBUG - logger.LogInformation($"kafka批量消费消息:{message}"); + logger.LogInformation($"kafka批量消费消息:{message.Serialize()}"); #endif // 处理消息 return await ProcessMessageAsync(message.ToList(), method, subscribe); @@ -196,7 +200,7 @@ namespace JiShe.CollectBus.Kafka } else { - await consumerService.SubscribeAsync(attr.Topic, async (message) => + await consumerService.SubscribeAsync(attr.Topic, async (message) => { try { @@ -225,26 +229,112 @@ namespace JiShe.CollectBus.Kafka /// /// /// - private static async Task ProcessMessageAsync(List messages, MethodInfo method, object subscribe) + private static async Task ProcessMessageAsync(List messages, MethodInfo method, object subscribe) { var parameters = method.GetParameters(); bool isGenericTask = method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>); bool existParameters = parameters.Length > 0; - List? messageObj = null; + object[]? executeParameters = null; + if (existParameters) { - messageObj = new List(); - var paramType = parameters[0].ParameterType; - foreach (var msg in messages) + IList? list = null; + Tuple tuple = method.GetParameterTypeInfo(); + bool isEnumerable = false; + if (tuple.Item2 != null) { - var data = paramType != typeof(string) ? msg?.ToString()?.Deserialize(paramType) : msg; - if (data != null) - messageObj.Add(data); + Type listType = typeof(List<>).MakeGenericType(tuple.Item2); + list = (IList)Activator.CreateInstance(listType)!; + isEnumerable = tuple.Item2.IsConvertType(); + } + else + { + isEnumerable = tuple.Item1.IsConvertType(); + } + #region 暂时 + //foreach (var msg in messages) + //{ + // if (tuple.Item2 != null) + // { + // if (isEnumerable) + // { + // var parameterType = parameters[0].ParameterType; + // var data=messages?.Serialize().Deserialize(parameterType); + // messageObj = data!=null? new[] { data }:null; + // break; + // } + // else + // { + // // 集合类型 + // var data = msg?.Serialize().Deserialize(tuple.Item2) /*isEnumerable ? Convert.ChangeType(msg, tuple.Item2) : msg?.Serialize().Deserialize(tuple.Item2)*/; + // if (data != null) + // list?.Add(data); + // } + + // } + // else + // { + // // (dynamic)Convert.ChangeType(msg, tuple.Item1) + // using (var stream = new MemoryStream(msg)) + // { + // var data1= System.Text.Json.JsonSerializer.Deserialize(stream, tuple.Item1); + // } + // var data = isEnumerable ? System.Text.Json.JsonSerializer.Deserialize(msg, tuple.Item1): msg?.ToString()?.Deserialize(tuple.Item1); + // if (data != null) + // messageObj = new[] { data }; + // } + //} + //if (tuple.Item2 != null && list != null && list.Count > 0) + //{ + // messageObj = new[] { list }; + //} + #endregion + var parameterDescriptors = method.GetParameters(); + executeParameters = new object?[parameterDescriptors.Length]; + for (var i = 0; i < parameterDescriptors.Length; i++) + { + foreach (var item in messages) + { + + object? tempParameter=null; + var parameterDescriptor = parameterDescriptors[i]; + if (KafkaSerialization.IsJsonType(item)) + { + tempParameter = KafkaSerialization.Deserialize(item, tuple.Item2 != null? tuple.Item2: parameterDescriptor.ParameterType); + } + else + { + + var converter = TypeDescriptor.GetConverter(parameterDescriptor.ParameterType); + if (converter.CanConvertFrom(item.GetType())) + { + tempParameter = converter.ConvertFrom(item); + } + else + { + if (parameterDescriptor.ParameterType.IsInstanceOfType(item)) + tempParameter = item; + else + tempParameter =Convert.ChangeType(item, parameterDescriptor.ParameterType); + } + } + if (tuple.Item2 == null) + { + executeParameters[i] = tempParameter; + } + else + { + list.Add(tempParameter); + } + + } + if(list!=null && list.Count>0) + executeParameters[i] = list; } } - var result = method.Invoke(subscribe, messageObj?.ToArray()); + var result = method.Invoke(subscribe, executeParameters); if (result is Task genericTask) { await genericTask.ConfigureAwait(false); @@ -261,6 +351,10 @@ namespace JiShe.CollectBus.Kafka } return false; } - + + + } + + } diff --git a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs index 529d293..db0efd8 100644 --- a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs @@ -6,6 +6,8 @@ using System.Text; using System.Threading.Tasks; using Confluent.Kafka; using JiShe.CollectBus.Kafka.Consumer; +using JiShe.CollectBus.Kafka.Internal; +using JiShe.CollectBus.Kafka.Serialization; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -56,7 +58,7 @@ namespace JiShe.CollectBus.Kafka.Producer var config = new ProducerConfig { BootstrapServers = _kafkaOptionConfig.BootstrapServers, - AllowAutoCreateTopics = true, + //AllowAutoCreateTopics = true, QueueBufferingMaxKbytes = 2_097_151, // 修改缓冲区最大为2GB,默认为1GB CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4,其他:gzip/snappy/zstd BatchSize = 32_768, // 修改批次大小为32K @@ -108,7 +110,7 @@ namespace JiShe.CollectBus.Kafka.Producer var message = new Message { Key = key, - Value = value, + Value = value, Headers = new Headers{ { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } } diff --git a/modules/JiShe.CollectBus.Kafka/JsonSerializer.cs b/modules/JiShe.CollectBus.Kafka/Serialization/JsonSerializer.cs similarity index 61% rename from modules/JiShe.CollectBus.Kafka/JsonSerializer.cs rename to modules/JiShe.CollectBus.Kafka/Serialization/JsonSerializer.cs index 83f58a3..8034954 100644 --- a/modules/JiShe.CollectBus.Kafka/JsonSerializer.cs +++ b/modules/JiShe.CollectBus.Kafka/Serialization/JsonSerializer.cs @@ -8,7 +8,7 @@ using Confluent.Kafka; using System.Text.Json.Serialization; using System.Text.Encodings.Web; -namespace JiShe.CollectBus.Kafka +namespace JiShe.CollectBus.Kafka.Serialization { /// /// JSON 序列化器(支持泛型) @@ -49,10 +49,11 @@ namespace JiShe.CollectBus.Kafka { if (isNull) return default; - try { - return JsonSerializer.Deserialize(data, _options); + if (data.IsEmpty) + return default; + return JsonSerializer.Deserialize(data, _options)!; } catch (Exception ex) { @@ -85,4 +86,40 @@ namespace JiShe.CollectBus.Kafka writer.WriteStringValue(value.ToString(_dateFormatString)); } } + + + public static class KafkaSerialization + { + + /// + /// 判断是否是json类型 + /// + /// + /// + public static bool IsJsonType(this object jsonObject) + { + return jsonObject is JsonElement; + } + public static object? Deserialize(object value, Type valueType) + { + var _jsonSerializerOptions = new JsonSerializerOptions + { + DefaultIgnoreCondition = JsonIgnoreCondition.Never, + WriteIndented = false,// 设置格式化输出 + Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,// 允许特殊字符 + IgnoreReadOnlyFields = true, + IgnoreReadOnlyProperties = true, + NumberHandling = JsonNumberHandling.AllowReadingFromString, // 允许数字字符串 + AllowTrailingCommas = true, // 忽略尾随逗号 + ReadCommentHandling = JsonCommentHandling.Skip, // 忽略注释 + PropertyNameCaseInsensitive = true, // 属性名称大小写不敏感 + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // 属性名称使用驼峰命名规则 + Converters = { new DateTimeJsonConverter() } // 注册你的自定义转换器, + }; + + if (value is JsonElement jsonElement) return jsonElement.Deserialize(valueType, _jsonSerializerOptions); + + throw new NotSupportedException("Type is not of type JsonElement"); + } + } } diff --git a/services/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs b/services/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs index 658ff29..bdc6b78 100644 --- a/services/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs @@ -1,17 +1,18 @@ -using System.Threading.Tasks; +using System.Collections.Generic; +using System.Threading.Tasks; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IotSystems.MessageReceiveds; -using JiShe.CollectBus.Kafka; +using JiShe.CollectBus.Kafka.Internal; using Volo.Abp.Application.Services; namespace JiShe.CollectBus.Subscribers { public interface ISubscriberAppService : IApplicationService { - Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage); - Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage); + Task LoginIssuedEvent(List issuedEventMessage); + Task HeartbeatIssuedEvent(List issuedEventMessage); Task ReceivedEvent(MessageReceived receivedMessage); - Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage); - Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage); + Task ReceivedHeartbeatEvent(List receivedHeartbeatMessage); + Task ReceivedLoginEvent(List receivedLoginMessage); } } diff --git a/services/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs b/services/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs index 9a37167..f0a79cf 100644 --- a/services/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs @@ -1,7 +1,7 @@ using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; -using JiShe.CollectBus.Kafka; +using JiShe.CollectBus.Kafka.Internal; using System.Collections.Generic; using System.Threading.Tasks; using Volo.Abp.Application.Services; diff --git a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index e5077cc..b61514e 100644 --- a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -23,6 +23,7 @@ using Volo.Abp.BackgroundWorkers.Hangfire; using Volo.Abp.EventBus; using Volo.Abp.Modularity; using Microsoft.Extensions.Options; +using JiShe.CollectBus.Kafka.Internal; namespace JiShe.CollectBus; diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index f0aa541..53db99a 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -20,13 +20,13 @@ using System.Diagnostics.Metrics; using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Kafka.Attributes; using System.Text.Json; -using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Application.Contracts; using JiShe.CollectBus.Common.Models; using System.Diagnostics; using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Options; +using JiShe.CollectBus.Kafka.Internal; namespace JiShe.CollectBus.Samples; diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 18f6780..4285bf5 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -11,7 +11,6 @@ using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; -using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.RedisDataCache; @@ -27,6 +26,7 @@ using System.Threading; using System.Threading.Tasks; using JiShe.CollectBus.IoTDB.Interface; using static FreeSql.Internal.GlobalFilter; +using JiShe.CollectBus.Kafka.Internal; namespace JiShe.CollectBus.ScheduledMeterReading { diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index dfbcd26..fe398a5 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -13,7 +13,7 @@ using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; -using JiShe.CollectBus.Kafka; +using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Repository; using JiShe.CollectBus.Repository.MeterReadingRecord; diff --git a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index 4236855..889cd91 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -5,7 +5,6 @@ using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; -using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts.Interfaces; @@ -19,6 +18,8 @@ using System.Threading.Tasks; using JiShe.CollectBus.IoTDB.Interface; using TouchSocket.Sockets; using Volo.Abp.Domain.Repositories; +using System.Collections.Generic; +using JiShe.CollectBus.Kafka.Internal; namespace JiShe.CollectBus.Subscribers { @@ -65,67 +66,75 @@ namespace JiShe.CollectBus.Subscribers _dbProvider = dbProvider; } - [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] + [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName, EnableBatch = true)] //[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] - public async Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage) + public async Task LoginIssuedEvent(List issuedEventMessages) { bool isAck = false; - switch (issuedEventMessage.Type) + foreach (var issuedEventMessage in issuedEventMessages) { - case IssuedEventType.Heartbeat: - break; - case IssuedEventType.Login: - _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}"); - var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId); - loginEntity.AckTime = Clock.Now; - loginEntity.IsAck = true; - await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity); - isAck = true; - break; - case IssuedEventType.Data: - break; - default: - throw new ArgumentOutOfRangeException(); + switch (issuedEventMessage.Type) + { + case IssuedEventType.Heartbeat: + break; + case IssuedEventType.Login: + _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}"); + var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId); + loginEntity.AckTime = Clock.Now; + loginEntity.IsAck = true; + await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity); + isAck = true; + break; + case IssuedEventType.Data: + break; + default: + throw new ArgumentOutOfRangeException(); + } + + //var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo); + //if (device != null) + //{ + // await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message); + //} + + await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); } - - //var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo); - //if (device != null) - //{ - // await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message); - //} - - await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); + return isAck? SubscribeAck.Success(): SubscribeAck.Fail(); } - [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] + [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)] //[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] - public async Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage) + public async Task HeartbeatIssuedEvent(List issuedEventMessages) { bool isAck = false; - switch (issuedEventMessage.Type) + foreach (var issuedEventMessage in issuedEventMessages) { - case IssuedEventType.Heartbeat: - _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}"); - var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId); - heartbeatEntity.AckTime = Clock.Now; - heartbeatEntity.IsAck = true; - await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity); - isAck = true; - break; - case IssuedEventType.Data: - break; - default: - throw new ArgumentOutOfRangeException(); + switch (issuedEventMessage.Type) + { + case IssuedEventType.Heartbeat: + _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}"); + var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId); + heartbeatEntity.AckTime = Clock.Now; + heartbeatEntity.IsAck = true; + await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity); + isAck = true; + break; + case IssuedEventType.Data: + break; + default: + throw new ArgumentOutOfRangeException(); + } + + //var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo); + //if (device != null) + //{ + // await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message); + //} + + await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); } - - //var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo); - //if (device != null) - //{ - // await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message); - //} - - await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); + return isAck ? SubscribeAck.Success() : SubscribeAck.Fail(); } @@ -186,37 +195,44 @@ namespace JiShe.CollectBus.Subscribers return SubscribeAck.Success(); } - [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] + [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName, EnableBatch = true)] //[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] - public async Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage) + public async Task ReceivedHeartbeatEvent(List receivedHeartbeatMessages) { - var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - if (protocolPlugin == null) + foreach (var receivedHeartbeatMessage in receivedHeartbeatMessages) { - _logger.LogError("协议不存在!"); - } - else - { - await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage); - await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage); + var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); + if (protocolPlugin == null) + { + _logger.LogError("协议不存在!"); + } + else + { + await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage); + await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage); + } } return SubscribeAck.Success(); } - [KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] + [KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName,EnableBatch =true)] //[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] - public async Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage) + public async Task ReceivedLoginEvent(List receivedLoginMessages) { - var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - if (protocolPlugin == null) + foreach (var receivedLoginMessage in receivedLoginMessages) { - _logger.LogError("协议不存在!"); - } - else - { - await protocolPlugin.LoginAsync(receivedLoginMessage); - await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage); + var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); + if (protocolPlugin == null) + { + _logger.LogError("协议不存在!"); + } + else + { + await protocolPlugin.LoginAsync(receivedLoginMessage); + await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage); + } } + return SubscribeAck.Success(); } } diff --git a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs index b3102d0..fa55fe7 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs @@ -8,8 +8,8 @@ using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; -using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Kafka.Attributes; +using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Repository.MeterReadingRecord; diff --git a/web/JiShe.CollectBus.Host/appsettings.json b/web/JiShe.CollectBus.Host/appsettings.json index f776202..8e31563 100644 --- a/web/JiShe.CollectBus.Host/appsettings.json +++ b/web/JiShe.CollectBus.Host/appsettings.json @@ -43,7 +43,7 @@ "Configuration": "192.168.1.9:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", "MaxPoolSize": "50", "DefaultDB": "14", - "HangfireDB": "15" + "HangfireDB": "13" }, "Jwt": { "Audience": "JiShe.CollectBus", @@ -84,7 +84,7 @@ "SaslPassword": "lixiao1980", "KafkaReplicationFactor": 3, "NumPartitions": 30, - "ServerTagName": "JiSheCollectBus2" + "ServerTagName": "JiSheCollectBus99" }, "IoTDBOptions": { "UserName": "root", @@ -95,7 +95,7 @@ "OpenDebugMode": true, "UseTableSessionPoolByDefault": false }, - "ServerTagName": "JiSheCollectBus3", + "ServerTagName": "JiSheCollectBus", "Cassandra": { "ReplicationStrategy": { "Class": "NetworkTopologyStrategy", //策略为NetworkTopologyStrategy时才会有多个数据中心,SimpleStrategy用在只有一个数据中心的情况下 From 40b62caa83837b0dedf1375f2552565c7b69d0ea Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Sat, 19 Apr 2025 00:37:36 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E5=90=88=E5=B9=B6?= =?UTF-8?q?=E5=86=B2=E7=AA=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- web/JiShe.CollectBus.Host/appsettings.json | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/web/JiShe.CollectBus.Host/appsettings.json b/web/JiShe.CollectBus.Host/appsettings.json index 585d7b7..9d1f7cb 100644 --- a/web/JiShe.CollectBus.Host/appsettings.json +++ b/web/JiShe.CollectBus.Host/appsettings.json @@ -84,12 +84,7 @@ "SaslPassword": "lixiao1980", "KafkaReplicationFactor": 3, "NumPartitions": 30, -<<<<<<< HEAD - "ServerTagName": "JiSheCollectBus99" -======= - "ServerTagName": "JiSheCollectBus3", - "FirstCollectionTime": "2025-04-18 00:00:00" ->>>>>>> fe51402b1f105dc536f08ec2249d4ab13fa8f172 + "ServerTagName": "JiSheCollectBus2" }, "IoTDBOptions": { "UserName": "root", @@ -100,10 +95,7 @@ "OpenDebugMode": true, "UseTableSessionPoolByDefault": false }, -<<<<<<< HEAD - "ServerTagName": "JiSheCollectBus", -======= ->>>>>>> fe51402b1f105dc536f08ec2249d4ab13fa8f172 + "ServerTagName": "JiSheCollectBus3", "Cassandra": { "ReplicationStrategy": { "Class": "NetworkTopologyStrategy", //策略为NetworkTopologyStrategy时才会有多个数据中心,SimpleStrategy用在只有一个数据中心的情况下 From 40924e921f1a67c09ca808994c03ca5165993450 Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Sat, 19 Apr 2025 13:57:08 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../BasicScheduledMeterReadingService.cs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 1ab07bc..1cc3c2b 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -27,11 +27,6 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using static FreeSql.Internal.GlobalFilter; -<<<<<<< HEAD -using JiShe.CollectBus.Kafka.Internal; -======= -using static System.Runtime.InteropServices.JavaScript.JSType; ->>>>>>> fe51402b1f105dc536f08ec2249d4ab13fa8f172 namespace JiShe.CollectBus.ScheduledMeterReading { From b3a2fd506ed904bc667d10f6e30fe63095067b40 Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Sat, 19 Apr 2025 13:57:19 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../JiShe.CollectBus.Application/Samples/SampleAppService.cs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index e5a7b81..61ba9ce 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -26,11 +26,6 @@ using System.Diagnostics; using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Options; -<<<<<<< HEAD -using JiShe.CollectBus.Kafka.Internal; -======= -using JiShe.CollectBus.Common.Extensions; ->>>>>>> fe51402b1f105dc536f08ec2249d4ab13fa8f172 namespace JiShe.CollectBus.Samples; From f5f9434a973ceaced8a39890a3063c94e14fdb05 Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Sat, 19 Apr 2025 14:00:13 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../JiShe.CollectBus.Application/Samples/SampleAppService.cs | 2 ++ .../ScheduledMeterReading/BasicScheduledMeterReadingService.cs | 1 + 2 files changed, 3 insertions(+) diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 61ba9ce..f915fd6 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -26,6 +26,8 @@ using System.Diagnostics; using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Options; +using JiShe.CollectBus.Kafka.Internal; +using JiShe.CollectBus.Common.Extensions; namespace JiShe.CollectBus.Samples; diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 1cc3c2b..a7f9f3c 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -13,6 +13,7 @@ using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; +using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.RedisDataCache;