diff --git a/JiShe.CollectBus.sln b/JiShe.CollectBus.sln index d4e6591..fa3fd6c 100644 --- a/JiShe.CollectBus.sln +++ b/JiShe.CollectBus.sln @@ -47,6 +47,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "2.Services", "2.Services", EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "5.Shared", "5.Shared", "{EBF7C01F-9B4F-48E6-8418-2CBFDA51EB0B}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "modules\JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -121,6 +123,10 @@ Global {443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU {443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU {443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU + {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -143,6 +149,7 @@ Global {A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59} {A377955E-7EA1-6F29-8CF7-774569E93925} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC} {443B4549-0AC0-4493-8F3E-49C83225DD76} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59} + {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD} diff --git a/modules/JiShe.CollectBus.Kafka.Test/KafkaSubscribeTest.cs b/modules/JiShe.CollectBus.Kafka.Test/KafkaSubscribeTest.cs index 4c06e22..c7ad309 100644 --- a/modules/JiShe.CollectBus.Kafka.Test/KafkaSubscribeTest.cs +++ b/modules/JiShe.CollectBus.Kafka.Test/KafkaSubscribeTest.cs @@ -3,6 +3,7 @@ using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.Kafka.Attributes; +using JiShe.CollectBus.Kafka.Internal; using Microsoft.Extensions.DependencyInjection; using System; using System.Collections.Generic; @@ -16,53 +17,54 @@ namespace JiShe.CollectBus.Kafka.Test { public class KafkaSubscribeTest: IKafkaSubscribe { - [KafkaSubscribe(ProtocolConst.TESTTOPIC, EnableBatch=false,BatchSize=1000)] + [KafkaSubscribe(ProtocolConst.TESTTOPIC, EnableBatch = false, BatchSize = 10)] - public async Task KafkaSubscribeAsync(object obj) + public async Task KafkaSubscribeAsync(TestTopic obj) + //public async Task KafkaSubscribeAsync(IEnumerable obj) { Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(obj)}"); return SubscribeAck.Success(); } - [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] - //[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] - public async Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage) - { - Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(issuedEventMessage)}"); - return SubscribeAck.Success(); - } + //[KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] + ////[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] + //public async Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage) + //{ + // Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(issuedEventMessage)}"); + // return SubscribeAck.Success(); + //} - [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] - //[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] - public async Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage) - { - Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(issuedEventMessage)}"); - return SubscribeAck.Success(); - } + //[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] + ////[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] + //public async Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage) + //{ + // Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(issuedEventMessage)}"); + // return SubscribeAck.Success(); + //} - [KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)] - //[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)] - public async Task ReceivedEvent(MessageReceived receivedMessage) - { - Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(receivedMessage)}"); - return SubscribeAck.Success(); - } + //[KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)] + ////[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)] + //public async Task ReceivedEvent(MessageReceived receivedMessage) + //{ + // Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(receivedMessage)}"); + // return SubscribeAck.Success(); + //} - [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] - //[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] - public async Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage) - { - Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(receivedHeartbeatMessage)}"); - return SubscribeAck.Success(); - } + //[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] + ////[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] + //public async Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage) + //{ + // Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(receivedHeartbeatMessage)}"); + // return SubscribeAck.Success(); + //} - [KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] - //[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] - public async Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage) - { - Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(receivedLoginMessage)}"); - return SubscribeAck.Success(); - } + //[KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] + ////[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] + //public async Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage) + //{ + // Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(receivedLoginMessage)}"); + // return SubscribeAck.Success(); + //} } } diff --git a/modules/JiShe.CollectBus.Kafka.Test/Program.cs b/modules/JiShe.CollectBus.Kafka.Test/Program.cs index a359e14..016d0c4 100644 --- a/modules/JiShe.CollectBus.Kafka.Test/Program.cs +++ b/modules/JiShe.CollectBus.Kafka.Test/Program.cs @@ -7,6 +7,7 @@ using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Kafka.AdminClient; using JiShe.CollectBus.Kafka.Consumer; +using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Kafka.Test; using Microsoft.AspNetCore.Builder; @@ -86,12 +87,13 @@ var logger = loggerFactory.CreateLogger(); logger.LogInformation("程序启动"); var adminClientService = host.Services.GetRequiredService(); var configuration = host.Services.GetRequiredService(); -string topic = "test-topic"; +string topic = ProtocolConst.TESTTOPIC; //await adminClientService.DeleteTopicAsync(topic); // 创建 topic //await adminClientService.CreateTopicAsync(topic, configuration.GetValue(CommonConst.NumPartitions), 3); var consumerService = host.Services.GetRequiredService(); +var producerService = host.Services.GetRequiredService(); //var kafkaOptions = host.Services.GetRequiredService>(); //await consumerService.SubscribeAsync(topic, (message) => //{ @@ -113,43 +115,49 @@ var consumerService = host.Services.GetRequiredService(); //for (int i = 0; i < 3; i++) //{ -// await consumerService.SubscribeBatchAsync(topic, (message) => +//await consumerService.SubscribeBatchAsync(topic, (message) => +//{ +// try // { -// try -// { -// int index = 0; -// logger.LogInformation($"消费消息_{index}消费总数:{message.Count()}:{JsonSerializer.Serialize(message)}"); -// return Task.FromResult(true); +// int index = 0; +// logger.LogInformation($"消费消息_{index}消费总数:{message.Count()}:{JsonSerializer.Serialize(message)}"); +// return Task.FromResult(true); -// } -// catch (ConsumeException ex) -// { -// // 处理消费错误 -// logger.LogError($"kafka消费异常:{ex.Message}"); -// } -// return Task.FromResult(false); -// }); +// } +// catch (ConsumeException ex) +// { +// // 处理消费错误 +// logger.LogError($"kafka消费异常:{ex.Message}"); +// } +// return Task.FromResult(false); +//}); //} //stopwatch.Stop(); //Console.WriteLine($"耗时: {stopwatch.ElapsedMilliseconds} 毫秒,{stopwatch.ElapsedMilliseconds/1000} 秒"); -var producerService = host.Services.GetRequiredService(); -//int num = 840; -//while (num <= 900) + +int num = 1; +while (num <= 6) +{ + await producerService.ProduceAsync(topic, new TestTopic { Topic = topic, Val = num }); + num++; +} + +//int num = 2; +//while (num <= 4) //{ -// //await producerService.ProduceAsync(topic, new TestTopic { Topic = topic, Val = i }); // await producerService.ProduceAsync(topic, num.ToString()); // num++; //} -await Task.Factory.StartNew(async() => { - int num = 0; - while (true) - { - //await producerService.ProduceAsync(topic, new TestTopic { Topic = topic, Val = i }); - await producerService.ProduceAsync(topic, num.ToString()); - num++; - } -}); +//await Task.Factory.StartNew(async() => { +// int num = 0; +// while (true) +// { +// //await producerService.ProduceAsync(topic, new TestTopic { Topic = topic, Val = i }); +// await producerService.ProduceAsync(topic, num.ToString()); +// num++; +// } +//}); Console.WriteLine("\n按Esc键退出"); while (true) { diff --git a/modules/JiShe.CollectBus.Kafka.Test/appsettings.json b/modules/JiShe.CollectBus.Kafka.Test/appsettings.json index b2579c6..9767dee 100644 --- a/modules/JiShe.CollectBus.Kafka.Test/appsettings.json +++ b/modules/JiShe.CollectBus.Kafka.Test/appsettings.json @@ -91,8 +91,8 @@ "SaslUserName": "lixiao", "SaslPassword": "lixiao1980", "KafkaReplicationFactor": 3, - "NumPartitions": 1, - "ServerTagName": "JiSheCollectBus2" + "NumPartitions": 30, + "ServerTagName": "JiSheCollectBus99" //"Topic": { // "ReplicationFactor": 3, // "NumPartitions": 1000 diff --git a/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs b/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs index c74aa2e..598caf0 100644 --- a/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs +++ b/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs @@ -22,7 +22,7 @@ namespace JiShe.CollectBus.Kafka.Attributes /// /// 消费者组 /// - public string GroupId { get; set; } = "default"; + public string? GroupId { get; set; } = null;//"default" /// /// 任务数(默认是多少个分区多少个任务) diff --git a/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs b/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs index b467162..5621b3d 100644 --- a/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs +++ b/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs @@ -1,6 +1,7 @@ using Confluent.Kafka; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Kafka.Consumer; +using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Producer; using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.Configuration; @@ -43,10 +44,19 @@ namespace JiShe.CollectBus.Kafka //context.Services.AddHostedService(); } + /// + /// 在初始化之前,初始化Kafka Topic + /// + /// + public override void OnPreApplicationInitialization(ApplicationInitializationContext context) + { + var app = context.GetApplicationBuilder(); + app.ApplicationServices.UseInitKafkaTopic(); + } + public override void OnApplicationInitialization(ApplicationInitializationContext context) { var app = context.GetApplicationBuilder(); - // 注册Subscriber app.ApplicationServices.UseKafkaSubscribe(); diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs index 0ec5bd0..5093561 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs @@ -1,5 +1,7 @@ using Confluent.Kafka; using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Kafka.Internal; +using JiShe.CollectBus.Kafka.Serialization; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -47,11 +49,11 @@ namespace JiShe.CollectBus.Kafka.Consumer var config = new ConsumerConfig { BootstrapServers = _kafkaOptionConfig.BootstrapServers, - GroupId = groupId ?? "default", + GroupId = groupId ?? _kafkaOptionConfig.ServerTagName, AutoOffsetReset = AutoOffsetReset.Earliest, EnableAutoCommit = false, // 禁止AutoCommit EnablePartitionEof = true, // 启用分区末尾标记 - AllowAutoCreateTopics = true, // 启用自动创建 + //AllowAutoCreateTopics = true, // 启用自动创建 FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小(50MB) }; @@ -252,7 +254,7 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 消费组ID /// 批次大小 /// 批次超时时间 - public async Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class + public async Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class { await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout); } @@ -267,17 +269,17 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 消费组ID /// 批次大小 /// 批次超时时间 - public async Task SubscribeBatchAsync(string[] topics,Func, Task> messageBatchHandler, string? groupId = null,int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class + public async Task SubscribeBatchAsync(string[] topics,Func, Task> messageBatchHandler, string? groupId = null,int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class { var consumerKey = typeof(KafkaConsumer); var cts = new CancellationTokenSource(); - var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - ( - CreateConsumer(groupId), - cts - )).Consumer as IConsumer; - + //var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + // ( + // CreateConsumer(groupId), + // cts + // )).Consumer as IConsumer; + var consumer = CreateConsumer(groupId); consumer!.Subscribe(topics); var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 @@ -300,8 +302,8 @@ namespace JiShe.CollectBus.Kafka.Consumer { if (result.IsPartitionEOF) { - _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); - await Task.Delay(TimeSpan.FromSeconds(1), cts.Token); + //_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); + await Task.Delay(10, cts.Token); } else if (result.Message.Value != null) { @@ -330,7 +332,7 @@ namespace JiShe.CollectBus.Kafka.Consumer // 处理批次 if (messages.Count > 0) { - bool success = await messageBatchHandler(messages.Select(m => m.Value)); + bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList()); if (success) { var offsetsByPartition = new Dictionary(); @@ -383,7 +385,7 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 批次大小 /// 批次超时时间 /// 消费等待时间 - public async Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class + public async Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class { await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout); @@ -400,17 +402,18 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 批次大小 /// 批次超时时间 /// 消费等待时间 - public async Task SubscribeBatchAsync(string[] topics,Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class + public async Task SubscribeBatchAsync(string[] topics,Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class { var consumerKey = typeof(KafkaConsumer); var cts = new CancellationTokenSource(); - var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - ( - CreateConsumer(groupId), - cts - )).Consumer as IConsumer; + //var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + // ( + // CreateConsumer(groupId), + // cts + // )).Consumer as IConsumer; + var consumer= CreateConsumer (groupId); consumer!.Subscribe(topics); var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 @@ -434,8 +437,8 @@ namespace JiShe.CollectBus.Kafka.Consumer { if (result.IsPartitionEOF) { - _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); - await Task.Delay(TimeSpan.FromSeconds(1), cts.Token); + //_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); + await Task.Delay(10, cts.Token); } else if (result.Message.Value != null) { @@ -464,7 +467,7 @@ namespace JiShe.CollectBus.Kafka.Consumer // 处理批次 if (messages.Count > 0) { - bool success = await messageBatchHandler(messages.Select(m => m.Value)); + bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList()); if (success) { var offsetsByPartition = new Dictionary(); diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs index d86dba8..77901ef 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs @@ -33,13 +33,13 @@ namespace JiShe.CollectBus.Kafka.Consumer /// Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null) where TValue : class; - Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class; + Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class; - Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class; + Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class; - Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class; + Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class; - Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class; + Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class; void Unsubscribe() where TKey : notnull where TValue : class; } diff --git a/modules/JiShe.CollectBus.Kafka/HeadersFilter.cs b/modules/JiShe.CollectBus.Kafka/Internal/HeadersFilter.cs similarity index 94% rename from modules/JiShe.CollectBus.Kafka/HeadersFilter.cs rename to modules/JiShe.CollectBus.Kafka/Internal/HeadersFilter.cs index 0790f9f..7b479fa 100644 --- a/modules/JiShe.CollectBus.Kafka/HeadersFilter.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/HeadersFilter.cs @@ -5,7 +5,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; -namespace JiShe.CollectBus.Kafka +namespace JiShe.CollectBus.Kafka.Internal { /// /// 消息头过滤器 diff --git a/modules/JiShe.CollectBus.Kafka/IKafkaSubscribe.cs b/modules/JiShe.CollectBus.Kafka/Internal/IKafkaSubscribe.cs similarity index 90% rename from modules/JiShe.CollectBus.Kafka/IKafkaSubscribe.cs rename to modules/JiShe.CollectBus.Kafka/Internal/IKafkaSubscribe.cs index 39e5789..0ad450f 100644 --- a/modules/JiShe.CollectBus.Kafka/IKafkaSubscribe.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/IKafkaSubscribe.cs @@ -4,7 +4,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; -namespace JiShe.CollectBus.Kafka +namespace JiShe.CollectBus.Kafka.Internal { /// /// Kafka订阅者 diff --git a/modules/JiShe.CollectBus.Kafka/ISubscribeAck.cs b/modules/JiShe.CollectBus.Kafka/Internal/ISubscribeAck.cs similarity index 89% rename from modules/JiShe.CollectBus.Kafka/ISubscribeAck.cs rename to modules/JiShe.CollectBus.Kafka/Internal/ISubscribeAck.cs index ffb30ef..ce24d69 100644 --- a/modules/JiShe.CollectBus.Kafka/ISubscribeAck.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/ISubscribeAck.cs @@ -4,7 +4,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; -namespace JiShe.CollectBus.Kafka +namespace JiShe.CollectBus.Kafka.Internal { public interface ISubscribeAck { diff --git a/modules/JiShe.CollectBus.Kafka/KafkaOptionConfig.cs b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs similarity index 89% rename from modules/JiShe.CollectBus.Kafka/KafkaOptionConfig.cs rename to modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs index e592ea2..a3fb126 100644 --- a/modules/JiShe.CollectBus.Kafka/KafkaOptionConfig.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs @@ -5,7 +5,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; -namespace JiShe.CollectBus.Kafka +namespace JiShe.CollectBus.Kafka.Internal { public class KafkaOptionConfig { @@ -59,5 +59,10 @@ namespace JiShe.CollectBus.Kafka /// public string? SaslPassword { get; set; } + /// + /// 首次采集时间 + /// + public DateTime FirstCollectionTime { get; set; } + } } diff --git a/modules/JiShe.CollectBus.Kafka/Internal/ReflectionHelper.cs b/modules/JiShe.CollectBus.Kafka/Internal/ReflectionHelper.cs new file mode 100644 index 0000000..ded11d7 --- /dev/null +++ b/modules/JiShe.CollectBus.Kafka/Internal/ReflectionHelper.cs @@ -0,0 +1,113 @@ +using Newtonsoft.Json; +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Kafka.Internal +{ + /// + /// 反射辅助类 + /// + public static class ReflectionHelper + { + /// + ///集合类型 + ///Item1:参数类型 + ///Item2:集合元素类型 + /// + public static Tuple GetParameterTypeInfo(this MethodInfo method, int parameterIndex=0) + { + // 参数校验 + if (method == null) throw new ArgumentNullException(nameof(method)); + var parameters = method.GetParameters(); + if (parameterIndex < 0 || parameterIndex >= parameters.Length) + throw new ArgumentOutOfRangeException(nameof(parameterIndex)); + + ParameterInfo param = parameters[parameterIndex]; + Type paramType = param.ParameterType; + Type? elementType = null; + + // 判断是否是集合类型(排除字符串) + if (paramType != typeof(string) && IsEnumerableType(paramType)) + { + elementType = GetEnumerableElementType(paramType); + } + + return Tuple.Create(paramType, elementType); + + } + + /// + /// 判断是否是集合类型(排除字符串) + /// + public static bool IsEnumerableType(this Type type) + { + return type.IsArray + || (type.IsGenericType && type.GetInterfaces() + .Any(t => t.IsGenericType + && t.GetGenericTypeDefinition() == typeof(IEnumerable<>))) + || type.GetInterfaces().Any(t => t == typeof(System.Collections.IEnumerable)); + } + + /// + /// 获取集合元素的类型 + /// + public static Type? GetEnumerableElementType(this Type type) + { + // 处理数组类型 + if (type.IsArray) + return type.GetElementType(); + + // 处理直接实现IEnumerable的类型(如IEnumerable本身) + if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(IEnumerable<>)) + return type.GetGenericArguments()[0]; + + // 处理通过接口实现IEnumerable的泛型集合(如List) + var genericEnumerable = type.GetInterfaces() + .FirstOrDefault(t => t.IsGenericType + && t.GetGenericTypeDefinition() == typeof(IEnumerable<>)); + if (genericEnumerable != null) + return genericEnumerable.GetGenericArguments()[0]; + + // 处理非泛型集合类型(如 ArrayList) + if (typeof(IEnumerable).IsAssignableFrom(type) && type == typeof(ArrayList)) + return typeof(ArrayList); + // 返回null表示无法确定元素类型 + return null; + } + + + // + /// 判断是否使用强转换 + /// + /// 目标类型 + /// + public static bool IsConvertType(this Type targetType) + { + // 处理可空类型 + Type underlyingType = Nullable.GetUnderlyingType(targetType) ?? targetType; + // 情况1:值类型或基元类型(如 int、DateTime) + if (underlyingType.IsValueType || underlyingType.IsPrimitive) + return true; + // 情况2:字符串类型直接赋值 + else if (underlyingType == typeof(string)) + return true; + + // 情况3:枚举类型处理 + //else if (underlyingType.IsEnum) + //{ + // if (Enum.IsDefined(underlyingType, msg)) + // { + // convertedValue = Enum.Parse(underlyingType, msg.ToString()); + // return true; + // } + // return false; + //} + return false; + } + } +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/SubscribeResult.cs b/modules/JiShe.CollectBus.Kafka/Internal/SubscribeResult.cs similarity index 97% rename from modules/JiShe.CollectBus.Kafka/SubscribeResult.cs rename to modules/JiShe.CollectBus.Kafka/Internal/SubscribeResult.cs index 83eaa49..0c016ff 100644 --- a/modules/JiShe.CollectBus.Kafka/SubscribeResult.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/SubscribeResult.cs @@ -6,7 +6,7 @@ using System.Text; using System.Threading.Tasks; using static System.Runtime.InteropServices.JavaScript.JSType; -namespace JiShe.CollectBus.Kafka +namespace JiShe.CollectBus.Kafka.Internal { public class SubscribeResult: ISubscribeAck { diff --git a/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs b/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs index ff60130..a5bf39c 100644 --- a/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs +++ b/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs @@ -5,30 +5,33 @@ using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Kafka.AdminClient; using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Consumer; +using JiShe.CollectBus.Kafka.Internal; +using JiShe.CollectBus.Kafka.Serialization; using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Mvc.Abstractions; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using Newtonsoft.Json; +using System; +using System.Collections; using System.Collections.Generic; +using System.ComponentModel; +using System.Linq.Expressions; using System.Reflection; +using System.Text.Json; using System.Threading.Tasks; +using YamlDotNet.Core.Tokens; +using static System.Runtime.InteropServices.JavaScript.JSType; namespace JiShe.CollectBus.Kafka { public static class KafkaSubcribesExtensions { - /// - /// 添加Kafka订阅 - /// - /// - /// - public static void UseKafkaSubscribe(this IServiceProvider provider) - { - var lifetime = provider.GetRequiredService(); + public static void UseInitKafkaTopic(this IServiceProvider provider) + { //初始化主题信息 var kafkaAdminClient = provider.GetRequiredService(); var kafkaOptions = provider.GetRequiredService>(); @@ -40,6 +43,17 @@ namespace JiShe.CollectBus.Kafka { kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult(); } + } + + /// + /// 添加Kafka订阅 + /// + /// + /// + public static void UseKafkaSubscribe(this IServiceProvider provider) + { + var lifetime = provider.GetRequiredService(); + var kafkaOptions = provider.GetRequiredService>(); lifetime.ApplicationStarted.Register(() => { var logger = provider.GetRequiredService>(); @@ -88,17 +102,7 @@ namespace JiShe.CollectBus.Kafka { var provider = app.ApplicationServices; var lifetime = provider.GetRequiredService(); - //初始化主题信息 - var kafkaAdminClient = provider.GetRequiredService(); var kafkaOptions = provider.GetRequiredService>(); - - List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); - topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); - - foreach (var item in topics) - { - kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult(); - } lifetime.ApplicationStarted.Register(() => { var logger = provider.GetRequiredService>(); @@ -132,7 +136,7 @@ namespace JiShe.CollectBus.Kafka /// /// /// - private static Tuple BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger logger, KafkaOptionConfig kafkaOptionConfig) + private static Tuple BuildKafkaSubscribe(object subscribe, IServiceProvider provider, ILogger logger, KafkaOptionConfig kafkaOptionConfig) { var subscribedMethods = subscribe.GetType().GetMethods() .Select(m => new { Method = m, Attribute = m.GetCustomAttribute() }) @@ -140,14 +144,14 @@ namespace JiShe.CollectBus.Kafka .ToArray(); //var configuration = provider.GetRequiredService(); int threadCount = 0; - + foreach (var sub in subscribedMethods) { int partitionCount = 3;// kafkaOptionConfig.NumPartitions; #if DEBUG var adminClientService = provider.GetRequiredService(); int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic); - partitionCount= partitionCount> topicCount ? topicCount: partitionCount; + partitionCount = partitionCount > topicCount ? topicCount : partitionCount; #endif //int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount; if (partitionCount <= 0) @@ -170,18 +174,18 @@ namespace JiShe.CollectBus.Kafka /// /// /// - private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe, ILogger logger) + private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr, MethodInfo method, object subscribe, ILogger logger) { var consumerService = provider.GetRequiredService(); if (attr.EnableBatch) { - await consumerService.SubscribeBatchAsync(attr.Topic, async (message) => + await consumerService.SubscribeBatchAsync(attr.Topic, async (message) => { try { #if DEBUG - logger.LogInformation($"kafka批量消费消息:{message}"); + logger.LogInformation($"kafka批量消费消息:{message.Serialize()}"); #endif // 处理消息 return await ProcessMessageAsync(message.ToList(), method, subscribe); @@ -196,7 +200,7 @@ namespace JiShe.CollectBus.Kafka } else { - await consumerService.SubscribeAsync(attr.Topic, async (message) => + await consumerService.SubscribeAsync(attr.Topic, async (message) => { try { @@ -225,26 +229,112 @@ namespace JiShe.CollectBus.Kafka /// /// /// - private static async Task ProcessMessageAsync(List messages, MethodInfo method, object subscribe) + private static async Task ProcessMessageAsync(List messages, MethodInfo method, object subscribe) { var parameters = method.GetParameters(); bool isGenericTask = method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>); bool existParameters = parameters.Length > 0; - List? messageObj = null; + object[]? executeParameters = null; + if (existParameters) { - messageObj = new List(); - var paramType = parameters[0].ParameterType; - foreach (var msg in messages) + IList? list = null; + Tuple tuple = method.GetParameterTypeInfo(); + bool isEnumerable = false; + if (tuple.Item2 != null) { - var data = paramType != typeof(string) ? msg?.ToString()?.Deserialize(paramType) : msg; - if (data != null) - messageObj.Add(data); + Type listType = typeof(List<>).MakeGenericType(tuple.Item2); + list = (IList)Activator.CreateInstance(listType)!; + isEnumerable = tuple.Item2.IsConvertType(); + } + else + { + isEnumerable = tuple.Item1.IsConvertType(); + } + #region 暂时 + //foreach (var msg in messages) + //{ + // if (tuple.Item2 != null) + // { + // if (isEnumerable) + // { + // var parameterType = parameters[0].ParameterType; + // var data=messages?.Serialize().Deserialize(parameterType); + // messageObj = data!=null? new[] { data }:null; + // break; + // } + // else + // { + // // 集合类型 + // var data = msg?.Serialize().Deserialize(tuple.Item2) /*isEnumerable ? Convert.ChangeType(msg, tuple.Item2) : msg?.Serialize().Deserialize(tuple.Item2)*/; + // if (data != null) + // list?.Add(data); + // } + + // } + // else + // { + // // (dynamic)Convert.ChangeType(msg, tuple.Item1) + // using (var stream = new MemoryStream(msg)) + // { + // var data1= System.Text.Json.JsonSerializer.Deserialize(stream, tuple.Item1); + // } + // var data = isEnumerable ? System.Text.Json.JsonSerializer.Deserialize(msg, tuple.Item1): msg?.ToString()?.Deserialize(tuple.Item1); + // if (data != null) + // messageObj = new[] { data }; + // } + //} + //if (tuple.Item2 != null && list != null && list.Count > 0) + //{ + // messageObj = new[] { list }; + //} + #endregion + var parameterDescriptors = method.GetParameters(); + executeParameters = new object?[parameterDescriptors.Length]; + for (var i = 0; i < parameterDescriptors.Length; i++) + { + foreach (var item in messages) + { + + object? tempParameter=null; + var parameterDescriptor = parameterDescriptors[i]; + if (KafkaSerialization.IsJsonType(item)) + { + tempParameter = KafkaSerialization.Deserialize(item, tuple.Item2 != null? tuple.Item2: parameterDescriptor.ParameterType); + } + else + { + + var converter = TypeDescriptor.GetConverter(parameterDescriptor.ParameterType); + if (converter.CanConvertFrom(item.GetType())) + { + tempParameter = converter.ConvertFrom(item); + } + else + { + if (parameterDescriptor.ParameterType.IsInstanceOfType(item)) + tempParameter = item; + else + tempParameter =Convert.ChangeType(item, parameterDescriptor.ParameterType); + } + } + if (tuple.Item2 == null) + { + executeParameters[i] = tempParameter; + } + else + { + list.Add(tempParameter); + } + + } + if(list!=null && list.Count>0) + executeParameters[i] = list; } } - var result = method.Invoke(subscribe, messageObj?.ToArray()); + var result = method.Invoke(subscribe, executeParameters); if (result is Task genericTask) { await genericTask.ConfigureAwait(false); @@ -261,6 +351,10 @@ namespace JiShe.CollectBus.Kafka } return false; } - + + + } + + } diff --git a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs index 529d293..db0efd8 100644 --- a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs @@ -6,6 +6,8 @@ using System.Text; using System.Threading.Tasks; using Confluent.Kafka; using JiShe.CollectBus.Kafka.Consumer; +using JiShe.CollectBus.Kafka.Internal; +using JiShe.CollectBus.Kafka.Serialization; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -56,7 +58,7 @@ namespace JiShe.CollectBus.Kafka.Producer var config = new ProducerConfig { BootstrapServers = _kafkaOptionConfig.BootstrapServers, - AllowAutoCreateTopics = true, + //AllowAutoCreateTopics = true, QueueBufferingMaxKbytes = 2_097_151, // 修改缓冲区最大为2GB,默认为1GB CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4,其他:gzip/snappy/zstd BatchSize = 32_768, // 修改批次大小为32K @@ -108,7 +110,7 @@ namespace JiShe.CollectBus.Kafka.Producer var message = new Message { Key = key, - Value = value, + Value = value, Headers = new Headers{ { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } } diff --git a/modules/JiShe.CollectBus.Kafka/JsonSerializer.cs b/modules/JiShe.CollectBus.Kafka/Serialization/JsonSerializer.cs similarity index 61% rename from modules/JiShe.CollectBus.Kafka/JsonSerializer.cs rename to modules/JiShe.CollectBus.Kafka/Serialization/JsonSerializer.cs index 83f58a3..8034954 100644 --- a/modules/JiShe.CollectBus.Kafka/JsonSerializer.cs +++ b/modules/JiShe.CollectBus.Kafka/Serialization/JsonSerializer.cs @@ -8,7 +8,7 @@ using Confluent.Kafka; using System.Text.Json.Serialization; using System.Text.Encodings.Web; -namespace JiShe.CollectBus.Kafka +namespace JiShe.CollectBus.Kafka.Serialization { /// /// JSON 序列化器(支持泛型) @@ -49,10 +49,11 @@ namespace JiShe.CollectBus.Kafka { if (isNull) return default; - try { - return JsonSerializer.Deserialize(data, _options); + if (data.IsEmpty) + return default; + return JsonSerializer.Deserialize(data, _options)!; } catch (Exception ex) { @@ -85,4 +86,40 @@ namespace JiShe.CollectBus.Kafka writer.WriteStringValue(value.ToString(_dateFormatString)); } } + + + public static class KafkaSerialization + { + + /// + /// 判断是否是json类型 + /// + /// + /// + public static bool IsJsonType(this object jsonObject) + { + return jsonObject is JsonElement; + } + public static object? Deserialize(object value, Type valueType) + { + var _jsonSerializerOptions = new JsonSerializerOptions + { + DefaultIgnoreCondition = JsonIgnoreCondition.Never, + WriteIndented = false,// 设置格式化输出 + Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,// 允许特殊字符 + IgnoreReadOnlyFields = true, + IgnoreReadOnlyProperties = true, + NumberHandling = JsonNumberHandling.AllowReadingFromString, // 允许数字字符串 + AllowTrailingCommas = true, // 忽略尾随逗号 + ReadCommentHandling = JsonCommentHandling.Skip, // 忽略注释 + PropertyNameCaseInsensitive = true, // 属性名称大小写不敏感 + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // 属性名称使用驼峰命名规则 + Converters = { new DateTimeJsonConverter() } // 注册你的自定义转换器, + }; + + if (value is JsonElement jsonElement) return jsonElement.Deserialize(valueType, _jsonSerializerOptions); + + throw new NotSupportedException("Type is not of type JsonElement"); + } + } } diff --git a/services/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs b/services/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs index 658ff29..bdc6b78 100644 --- a/services/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs @@ -1,17 +1,18 @@ -using System.Threading.Tasks; +using System.Collections.Generic; +using System.Threading.Tasks; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IotSystems.MessageReceiveds; -using JiShe.CollectBus.Kafka; +using JiShe.CollectBus.Kafka.Internal; using Volo.Abp.Application.Services; namespace JiShe.CollectBus.Subscribers { public interface ISubscriberAppService : IApplicationService { - Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage); - Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage); + Task LoginIssuedEvent(List issuedEventMessage); + Task HeartbeatIssuedEvent(List issuedEventMessage); Task ReceivedEvent(MessageReceived receivedMessage); - Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage); - Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage); + Task ReceivedHeartbeatEvent(List receivedHeartbeatMessage); + Task ReceivedLoginEvent(List receivedLoginMessage); } } diff --git a/services/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs b/services/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs index 9a37167..f0a79cf 100644 --- a/services/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs @@ -1,7 +1,7 @@ using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; -using JiShe.CollectBus.Kafka; +using JiShe.CollectBus.Kafka.Internal; using System.Collections.Generic; using System.Threading.Tasks; using Volo.Abp.Application.Services; diff --git a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index c7d7ebf..1874b25 100644 --- a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -25,6 +25,7 @@ using Volo.Abp.BackgroundWorkers.Hangfire; using Volo.Abp.EventBus; using Volo.Abp.Modularity; using Microsoft.Extensions.Options; +using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Interceptors; using JiShe.CollectBus.Common.Attributes; @@ -85,7 +86,7 @@ public class CollectBusApplicationModule : AbpModule //默认初始化表计信息 var dbContext = context.ServiceProvider.GetRequiredService(); - //await dbContext.InitAmmeterCacheData(); + await dbContext.InitAmmeterCacheData(); //await dbContext.InitWatermeterCacheData(); //初始化主题信息 diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index f0aa541..f915fd6 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -20,13 +20,14 @@ using System.Diagnostics.Metrics; using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Kafka.Attributes; using System.Text.Json; -using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Application.Contracts; using JiShe.CollectBus.Common.Models; using System.Diagnostics; using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Options; +using JiShe.CollectBus.Kafka.Internal; +using JiShe.CollectBus.Common.Extensions; namespace JiShe.CollectBus.Samples; @@ -243,6 +244,19 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS } + /// + /// 下一个采集时间点验证 + /// + /// + [HttpGet] + public async Task TestCalculateNextCollectionTime(string time, int timeDensity) + { + DateTime nextTaskTime = Convert.ToDateTime(time); + + return await Task.FromResult(nextTaskTime.CalculateNextCollectionTime(timeDensity)); + } + + public Task GetAsync() { return Task.FromResult( diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 18f6780..a7f9f3c 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -1,4 +1,5 @@ -using JiShe.CollectBus.Ammeters; +using DnsClient.Protocol; +using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Application.Contracts; using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Consts; @@ -8,10 +9,11 @@ using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.GatherItem; +using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; -using JiShe.CollectBus.Kafka; +using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.RedisDataCache; @@ -25,7 +27,6 @@ using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; -using JiShe.CollectBus.IoTDB.Interface; using static FreeSql.Internal.GlobalFilter; namespace JiShe.CollectBus.ScheduledMeterReading @@ -101,6 +102,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading return; } + var currentTime = DateTime.Now; + foreach (var item in taskInfos) { var tasksToBeIssueModel = await FreeRedisProvider.Instance.GetAsync(item); @@ -130,70 +133,31 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (meteryType == MeterTypeEnum.Ammeter.ToString()) { - var timer = Stopwatch.StartNew(); + //_ = AmmerterCreatePublishTask(timeDensity, $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}"); - //获取对应频率中的所有电表信息 - var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - - List meterInfos = new List(); - decimal? cursor = null; - string member = null; - bool hasNext; - do - { - var page = await _redisDataCacheService.GetAllPagedData( - redisCacheMeterInfoHashKeyTemp, - redisCacheMeterInfoZSetScoresIndexKeyTemp, - pageSize: 1000, - lastScore: cursor, - lastMember: member); - - meterInfos.AddRange(page.Items); - cursor = page.HasNext ? page.NextScore : null; - member = page.HasNext ? page.NextMember : null; - hasNext = page.HasNext; - } while (hasNext); - - if (meterInfos == null || meterInfos.Count <= 0) - { - timer.Stop(); - _logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); - return; - } - //await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos); - - - //处理数据 - //await DeviceGroupBalanceControl.ProcessGenericListAsync( - // items: meterInfos, - // deviceIdSelector: data => data.FocusAddress, - // processor: (data, groupIndex) => - // { - // _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss")); - // } - //); - - - - await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - items: meterInfos, - deviceIdSelector: data => data.FocusAddress, - processor: (data, groupIndex) => + _ = CreateMeterPublishTask( + timeDensity: timeDensity, + taskBatch: $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}", + meterType: MeterTypeEnum.Ammeter, + taskCreateAction: (timeDensity, data, groupIndex, taskBatch) => { - AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss")); - } - ); - - timer.Stop(); - _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息"); + AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch); + }); } else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) { //todo 水表任务创建待处理 //await WatermeterScheduledMeterReadingIssued(timeDensity, meterInfos); + + _ = CreateMeterPublishTask( + timeDensity: timeDensity, + taskBatch: $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}", + meterType: MeterTypeEnum.Ammeter, + taskCreateAction: (timeDensity, data, groupIndex, taskBatch) => + { + //AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch); + }); } else { @@ -205,7 +169,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading //根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。 - tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.AddMinutes(timeDensity); + tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity); await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel); } } @@ -229,24 +193,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task InitAmmeterCacheData(string gatherCode = "") { -#if DEBUG - //var timeDensity = "15"; - //string tempCacheMeterInfoKey = $"CollectBus:{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}"; - ////获取缓存中的电表信息 - //var redisKeyList = $"{string.Format(tempCacheMeterInfoKey, SystemType, "JiSheCollectBus", MeterTypeEnum.Ammeter, timeDensity)}*"; - - //var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - //var tempMeterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter); - ////List focusAddressDataLista = new List(); - //List meterInfos = new List(); - //foreach (var item in tempMeterInfos) - //{ - // var tempData = item.Adapt(); - // tempData.FocusId = item.FocusID; - // tempData.MeterId = item.Id; - // meterInfos.Add(tempData); - // //focusAddressDataLista.Add(item.FocusAddress); - //} +#if DEBUG + return; @@ -258,23 +206,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading List meterInfos = new List(); List focusAddressDataLista = new List(); var timer1 = Stopwatch.StartNew(); - //decimal? cursor = null; - //string member = null; - //bool hasNext; - //do - //{ - // var page = await _redisDataCacheService.GetAllPagedDataOptimized( - // redisCacheMeterInfoHashKeyTemp, - // redisCacheMeterInfoZSetScoresIndexKeyTemp, - // pageSize: 1000, - // lastScore: cursor, - // lastMember: member); - - // meterInfos.AddRange(page.Items); - // cursor = page.HasNext ? page.NextScore : null; - // member = page.HasNext ? page.NextMember : null; - // hasNext = page.HasNext; - //} while (hasNext); var allIds = new HashSet(); decimal? score = null; @@ -306,8 +237,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading timer1.Stop(); _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); - DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); - return; + //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); + //return; #else var meterInfos = await GetAmmeterInfoList(gatherCode); #endif @@ -329,6 +260,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading //根据采集频率分组,获得采集频率分组 var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); + + //先处理采集频率任务缓存 + foreach (var item in meterInfoGroupByTimeDensity) + { + TasksToBeIssueModel nextTask = new TasksToBeIssueModel() + { + TimeDensity = item.Key, + NextTaskTime = _kafkaOptions.FirstCollectionTime.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间 + }; + + //todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。 + + var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key); + await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask); + } + foreach (var itemTimeDensity in meterInfoGroupByTimeDensity) { var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; @@ -403,25 +350,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading } ammeterInfos.Add(ammeter); - //keyValuePairs.TryAdd($"{ammeter.MeterId}", ammeter); } - //await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); } await _redisDataCacheService.BatchInsertDataAsync( redisCacheMeterInfoHashKey, redisCacheMeterInfoSetIndexKey, redisCacheMeterInfoZSetScoresIndexKey, ammeterInfos); - - //在缓存表信息数据的时候,新增下一个时间的自动处理任务,1分钟后执行所有的采集频率任务 - TasksToBeIssueModel nextTask = new TasksToBeIssueModel() - { - TimeDensity = itemTimeDensity.Key, - NextTaskTime = DateTime.Now.AddMinutes(1) - }; - - var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key); - await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask); } //初始化设备组负载控制 @@ -446,63 +381,29 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task AmmeterScheduledMeterOneMinuteReading() { + //获取缓存中的电表信息 - int timeDensity = 1; + int timeDensity = 5; var currentTime = DateTime.Now; - var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter); - var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) + // 自动计算最佳并发度 + int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount(); + + var options = new ParallelOptions { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101"); - return; - } + MaxDegreeOfParallelism = recommendedThreads, + }; + var taskBatch = $"{currentTime:yyyyMMddHHmm00}"; - //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(oneMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter); - if (meterTaskInfos == null || meterTaskInfos.Count <= 0) + Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); - return; - } + var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - List meterTaskInfosList = new List(); + _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); + }); - //将取出的缓存任务数据发送到Kafka消息队列中 - foreach (var focusItem in meterTaskInfos) - { - foreach (var ammerterItem in focusItem.Value) - { - var tempMsg = new ScheduledMeterReadingIssuedEventMessage() - { - MessageHexString = ammerterItem.Value.IssuedMessageHexString, - MessageId = ammerterItem.Value.IssuedMessageId, - FocusAddress = ammerterItem.Value.FocusAddress, - TimeDensity = timeDensity.ToString(), - }; - //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); - - _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); - //_= _producerBus.Publish(tempMsg); - - - meterTaskInfosList.Add(ammerterItem.Value); - } - } - if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) - { - //_dbProvider.SwitchSessionPool(true); - //await _dbProvider.InsertAsync(meterTaskInfosList); - - await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentTime); - } - - ////删除任务数据 - //await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList); - //await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter); - - - _logger.LogInformation($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理完成"); + await Task.CompletedTask; } @@ -516,57 +417,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading int timeDensity = 5; var currentTime = DateTime.Now; - var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter); - var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0) + // 自动计算最佳并发度 + int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount(); + + var options = new ParallelOptions { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101"); - return; - } + MaxDegreeOfParallelism = recommendedThreads, + }; + var taskBatch = $"{currentTime:yyyyMMddHHmm00}"; - //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(fiveMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter); - if (meterTaskInfos == null || meterTaskInfos.Count <= 0) + Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); - return; - } + var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - List meterTaskInfosList = new List(); - - //将取出的缓存任务数据发送到Kafka消息队列中 - foreach (var focusItem in meterTaskInfos) - { - foreach (var ammerterItem in focusItem.Value) - { - var tempMsg = new ScheduledMeterReadingIssuedEventMessage() - { - MessageHexString = ammerterItem.Value.IssuedMessageHexString, - MessageId = ammerterItem.Value.IssuedMessageId, - FocusAddress = ammerterItem.Value.FocusAddress, - TimeDensity = timeDensity.ToString(), - }; - //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg); - - _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg); - - //_ = _producerBus.Publish(tempMsg); - - meterTaskInfosList.Add(ammerterItem.Value); - } - } - if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) - { - await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentTime); - } - - ////删除任务数据 - //await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList); - - ////缓存下一个时间的任务 - //await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter); - - _logger.LogInformation($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集电表数据处理完成"); + _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); + }); } /// @@ -575,12 +441,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task AmmeterScheduledMeterFifteenMinuteReading() { - Stopwatch stopwatch = new Stopwatch(); - stopwatch.Start(); - //获取缓存中的电表信息 int timeDensity = 15; - var currentDateTime = DateTime.Now; + var currentTime = DateTime.Now; // 自动计算最佳并发度 int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount(); @@ -589,107 +452,84 @@ namespace JiShe.CollectBus.ScheduledMeterReading { MaxDegreeOfParallelism = recommendedThreads, }; - string taskBatch = "20250417155016"; + var taskBatch = $"{currentTime:yyyyMMddHHmm00}"; + Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => { - Console.WriteLine($"15分钟采集电表数据:{groupIndex}"); var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - List meterInfos = new List(); - decimal? cursor = null; - string member = null; - bool hasNext; - do - { - var page = await _redisDataCacheService.GetAllPagedData( - redisCacheTelemetryPacketInfoHashKey, - redisCacheTelemetryPacketInfoZSetScoresIndexKey, - pageSize: 1000, - lastScore: cursor, - lastMember: member); - - meterInfos.AddRange(page.Items); - cursor = page.HasNext ? page.NextScore : null; - member = page.HasNext ? page.NextMember : null; - hasNext = page.HasNext; - - await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - items: meterInfos, - deviceIdSelector: data => data.FocusAddress, - processor: (data, groupIndex) => - { - _= KafkaProducerIssuedMessage(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName,data, groupIndex); - } - ); - - } while (hasNext); + _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); }); - - - //var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter); - //var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - //if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0) - //{ - // _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101"); - // return; - //} - - ////获取下发任务缓存数据 - //Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter); - //if (meterTaskInfos == null || meterTaskInfos.Count <= 0) - //{ - // _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); - // return; - //} - - //List meterTaskInfosList = new List(); - - ////将取出的缓存任务数据发送到Kafka消息队列中 - //foreach (var focusItem in meterTaskInfos) - //{ - // foreach (var ammerterItem in focusItem.Value) - // { - // var tempMsg = new ScheduledMeterReadingIssuedEventMessage() - // { - // MessageHexString = ammerterItem.Value.IssuedMessageHexString, - // MessageId = ammerterItem.Value.IssuedMessageId, - // FocusAddress = ammerterItem.Value.FocusAddress, - // TimeDensity = timeDensity.ToString(), - // }; - // //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); - - // _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); - - // //_ = _producerBus.Publish(tempMsg); - - // meterTaskInfosList.Add(ammerterItem.Value); - // } - //} - //if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) - //{ - // await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime); - //} - - - //stopwatch.Stop(); - - //_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); } - /// - /// 电表创建发布任务 + /// 创建电表待发送的任务数据 /// /// 采集频率 - /// 集中器号hash分组的集中器集合数据 + /// 时间格式的任务批次名称 + /// + private async Task AmmerterCreatePublishTask(int timeDensity, string taskBatch) + { + var timer = Stopwatch.StartNew(); + + //获取对应频率中的所有电表信息 + var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + + List meterInfos = new List(); + decimal? cursor = null; + string member = null; + bool hasNext; + do + { + var page = await _redisDataCacheService.GetAllPagedData( + redisCacheMeterInfoHashKeyTemp, + redisCacheMeterInfoZSetScoresIndexKeyTemp, + pageSize: 1000, + lastScore: cursor, + lastMember: member); + + meterInfos.AddRange(page.Items); + cursor = page.HasNext ? page.NextScore : null; + member = page.HasNext ? page.NextMember : null; + hasNext = page.HasNext; + } while (hasNext); + + if (meterInfos == null || meterInfos.Count <= 0) + { + timer.Stop(); + _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); + return; + } + + + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + items: meterInfos, + deviceIdSelector: data => data.FocusAddress, + processor: (data, groupIndex) => + { + AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch); + } + ); + + timer.Stop(); + _logger.LogInformation($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息"); + } + + + /// + /// 创建电表待发送的任务数据 + /// + /// 采集频率 + /// 电表信息 /// 集中器所在分组 /// 时间格式的任务批次名称 /// - private void AmmerterCreatePublishTask(int timeDensity + private void AmmerterCreatePublishTaskAction(int timeDensity , AmmeterInfo ammeterInfo, int groupIndex, string taskBatch) { var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; @@ -697,7 +537,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading var currentTime = DateTime.Now; var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); - + var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; @@ -781,7 +621,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading } } - //Dictionary keyValuePairs = new Dictionary(); List taskList = new List(); foreach (var tempItem in tempCodes) @@ -801,11 +640,16 @@ namespace JiShe.CollectBus.ScheduledMeterReading var aFNStr = itemCodeArr[0]; var aFN = (AFN)aFNStr.HexToDec(); var fn = int.Parse(itemCodeArr[1]); - byte[] dataInfos = null; + TelemetryPacketResponse builderResponse = null; if (ammeterInfo.AutomaticReport.Equals(1) && aFN == AFN.请求实时数据) { //实时数据 - dataInfos = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(ammeterInfo.FocusAddress, ammeterInfo.MeteringCode, (ATypeOfDataItems)fn); + builderResponse = TelemetryPacketBuilder.AFN0C_Fn_Send(new TelemetryPacketRequest() + { + FocusAddress = ammeterInfo.FocusAddress, + Fn = fn, + Pn = ammeterInfo.MeteringCode + }); } else { @@ -814,7 +658,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode , out var handler)) { - dataInfos = handler(new TelemetryPacketRequest() + builderResponse = handler(new TelemetryPacketRequest() { FocusAddress = ammeterInfo.FocusAddress, Fn = fn, @@ -829,7 +673,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //TODO:特殊表 - if (dataInfos == null || dataInfos.Length <= 0) + if (builderResponse == null || builderResponse.Data.Length <= 0) { //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。"); continue; @@ -850,36 +694,28 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusId = ammeterInfo.FocusId, AFN = aFN, Fn = fn, + Seq = builderResponse.Seq, + MSA = builderResponse.MSA, ItemCode = tempItem, - TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode), + TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA), + IsSend = false, ManualOrNot = false, Pn = ammeterInfo.MeteringCode, IssuedMessageId = GuidGenerator.Create().ToString(), - IssuedMessageHexString = Convert.ToHexString(dataInfos), + IssuedMessageHexString = Convert.ToHexString(builderResponse.Data), + IsReceived = false, }; - //meterReadingRecords.CreateDataId(GuidGenerator.Create()); - taskList.Add(meterReadingRecords); } - //TimeSpan timeSpan = TimeSpan.FromMicroseconds(5); - //await Task.Delay(timeSpan); - //return keyValuePairs; - // await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); - - //using (var pipe = FreeRedisProvider.Instance.StartPipe()) - //{ - // pipe.HSet(redisCacheKey, keyValuePairs); - // object[] ret = pipe.EndPipe(); - //} if (taskList == null || taskList.Count() <= 0 || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey)) { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name}的写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101"); + _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {ammeterInfo.Name}的写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101"); return; } @@ -906,76 +742,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading // taskList); } - /// - /// Kafka 推送消息 - /// - /// 主题名称 - /// 任务记录 - /// 对应分区,也就是集中器号所在的分组序号 - /// - private async Task KafkaProducerIssuedMessage(string topicName, - MeterReadingTelemetryPacketInfo taskRecord,int partition) - { - if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null) - { - throw new Exception($"{nameof(KafkaProducerIssuedMessage)} 推送消息失败,参数异常,-101"); - } - - await _producerService.ProduceAsync(topicName, partition, taskRecord); - } - - private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType) - { - var currentDateTime = DateTime.Now; - - var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, meterType); - - //FreeRedisProvider.Instance.key() - - var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0) - { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101"); - return; - } - - //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), meterType); - if (meterTaskInfos == null || meterTaskInfos.Count <= 0) - { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); - return; - } - - List meterTaskInfosList = new List(); - - //将取出的缓存任务数据发送到Kafka消息队列中 - foreach (var focusItem in meterTaskInfos) - { - foreach (var ammerterItem in focusItem.Value) - { - var tempMsg = new ScheduledMeterReadingIssuedEventMessage() - { - MessageHexString = ammerterItem.Value.IssuedMessageHexString, - MessageId = ammerterItem.Value.IssuedMessageId, - FocusAddress = ammerterItem.Value.FocusAddress, - TimeDensity = timeDensity.ToString(), - }; - //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); - - _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); - - //_ = _producerBus.Publish(tempMsg); - - meterTaskInfosList.Add(ammerterItem.Value); - } - } - if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) - { - await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime); - } - } - #endregion @@ -1044,6 +810,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key); await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask); } + + _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据完成"); } @@ -1109,12 +877,58 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成"); } + /// + /// 创建水表待发送的任务数据 + /// + /// 采集频率 + /// 水表信息 + /// 集中器所在分组 + /// 时间格式的任务批次名称 + /// + private void WatermeterCreatePublishTaskAction(int timeDensity + , WatermeterInfo meterInfo, int groupIndex, string taskBatch) + { + var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; + + + var currentTime = DateTime.Now; + var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); + + var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + + + var taskInfo = new MeterReadingTelemetryPacketInfo() + { + Seq= null, + + }; + // + + Build188SendData.Build188WaterMeterReadingSendDataUnit(meterInfo.Address); + + using (var pipe = FreeRedisProvider.Instance.StartPipe()) + { + // 主数据存储Hash + pipe.HSet(redisCacheTelemetryPacketInfoHashKey, taskInfo.MemberId, taskInfo.Serialize()); + + // Set索引缓存 + pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, taskInfo.MemberId); + + // ZSET索引缓存Key + pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, taskInfo.ScoreValue, taskInfo.MemberId); + + pipe.EndPipe(); + } + + } + #endregion #region 公共处理方法 - /// /// 判断是否需要生成采集指令 /// @@ -1131,39 +945,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading return false; } - ///// - ///// 指定时间对比当前时间 - ///// - ///// - ///// - ///// - //private bool IsGennerateCmd(DateTime lastTime, int subtrahend = 0) - //{ - // if (DateTime.Now.AddDays(subtrahend) >= lastTime)//当前时间减去一天,大于等于最后在线时间,不再生成该集中器下表生成采集指令 - // return false; - // return true; - //} - - ///// - ///// 缓存下一个时间的任务 - ///// - ///// 采集频率 - ///// 表类型 - ///// - //private async Task CacheNextTaskData(int timeDensity, MeterTypeEnum meterType) - //{ - // //缓存下一个时间的任务 - // TasksToBeIssueModel nextTask = new TasksToBeIssueModel() - // { - // TimeDensity = timeDensity, - // NextTask = DateTime.Now.AddMinutes(timeDensity) - // }; - - // var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, meterType, timeDensity); - // await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask); - //} - - /// /// 获取缓存表计下发指令缓存key前缀 /// @@ -1175,6 +956,130 @@ namespace JiShe.CollectBus.ScheduledMeterReading return $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}*"; } + + /// + /// 创建表的待发送的任务数据 + /// + /// 采集频率 + /// 时间格式的任务批次名称 + /// 表类型 + /// 具体的创建任务的委托 + /// + private async Task CreateMeterPublishTask(int timeDensity, string taskBatch, MeterTypeEnum meterType, Action taskCreateAction) where T : DeviceCacheBasicModel + { + var timer = Stopwatch.StartNew(); + + //获取对应频率中的所有电表信息 + var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}"; + var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, meterType, timeDensity)}"; + var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, meterType, timeDensity)}"; + + List meterInfos = new List(); + decimal? cursor = null; + string member = null; + bool hasNext; + do + { + var page = await _redisDataCacheService.GetAllPagedData( + redisCacheMeterInfoHashKeyTemp, + redisCacheMeterInfoZSetScoresIndexKeyTemp, + pageSize: 1000, + lastScore: cursor, + lastMember: member); + + meterInfos.AddRange(page.Items); + cursor = page.HasNext ? page.NextScore : null; + member = page.HasNext ? page.NextMember : null; + hasNext = page.HasNext; + } while (hasNext); + + if (meterInfos == null || meterInfos.Count <= 0) + { + timer.Stop(); + _logger.LogError($"{nameof(CreateMeterPublishTask)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); + return; + } + + + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + items: meterInfos, + deviceIdSelector: data => data.FocusAddress, + processor: (data, groupIndex) => + { + taskCreateAction(timeDensity, data, groupIndex, taskBatch); + } + ); + + timer.Stop(); + _logger.LogInformation($"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息"); + } + + + /// + /// 创建Kafka消息 + /// + /// + /// + /// + private async Task CreateMeterKafkaTaskMessage( + string redisCacheTelemetryPacketInfoHashKey, + string redisCacheTelemetryPacketInfoZSetScoresIndexKey) + { + if (string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey)) + { + throw new Exception($"{nameof(CreateMeterKafkaTaskMessage)} 创建Kafka消息失败,参数异常,-101"); + } + + decimal? cursor = null; + string member = null; + bool hasNext; + var stopwatch = Stopwatch.StartNew(); + do + { + var page = await _redisDataCacheService.GetAllPagedData( + redisCacheTelemetryPacketInfoHashKey, + redisCacheTelemetryPacketInfoZSetScoresIndexKey, + pageSize: 1000, + lastScore: cursor, + lastMember: member); + + cursor = page.HasNext ? page.NextScore : null; + member = page.HasNext ? page.NextMember : null; + hasNext = page.HasNext; + + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + items: page.Items, + deviceIdSelector: data => data.FocusAddress, + processor: (data, groupIndex) => + { + _ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex); + } + ); + + } while (hasNext); + + stopwatch.Stop(); + _logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {redisCacheTelemetryPacketInfoHashKey}采集推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); + } + + /// + /// Kafka 推送消息 + /// + /// 主题名称 + /// 任务记录 + /// 对应分区,也就是集中器号所在的分组序号 + /// + private async Task KafkaProducerIssuedMessageAction(string topicName, + MeterReadingTelemetryPacketInfo taskRecord, int partition) + { + if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null) + { + throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101"); + } + + await _producerService.ProduceAsync(topicName, partition, taskRecord); + } + #endregion } diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index dfbcd26..fe398a5 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -13,7 +13,7 @@ using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; -using JiShe.CollectBus.Kafka; +using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Repository; using JiShe.CollectBus.Repository.MeterReadingRecord; diff --git a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index e10d99b..2ac8801 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -5,7 +5,6 @@ using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; -using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts.Interfaces; @@ -19,6 +18,8 @@ using System.Threading.Tasks; using JiShe.CollectBus.IoTDB.Interface; using TouchSocket.Sockets; using Volo.Abp.Domain.Repositories; +using System.Collections.Generic; +using JiShe.CollectBus.Kafka.Internal; namespace JiShe.CollectBus.Subscribers { @@ -58,67 +59,75 @@ namespace JiShe.CollectBus.Subscribers _dbProvider = dbProvider; } - [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] + [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName, EnableBatch = true)] //[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] - public async Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage) + public async Task LoginIssuedEvent(List issuedEventMessages) { bool isAck = false; - switch (issuedEventMessage.Type) + foreach (var issuedEventMessage in issuedEventMessages) { - case IssuedEventType.Heartbeat: - break; - case IssuedEventType.Login: - _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}"); - var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId); - loginEntity.AckTime = Clock.Now; - loginEntity.IsAck = true; - await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity); - isAck = true; - break; - case IssuedEventType.Data: - break; - default: - throw new ArgumentOutOfRangeException(); + switch (issuedEventMessage.Type) + { + case IssuedEventType.Heartbeat: + break; + case IssuedEventType.Login: + _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}"); + var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId); + loginEntity.AckTime = Clock.Now; + loginEntity.IsAck = true; + await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity); + isAck = true; + break; + case IssuedEventType.Data: + break; + default: + throw new ArgumentOutOfRangeException(); + } + + //var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo); + //if (device != null) + //{ + // await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message); + //} + + await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); } - - //var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo); - //if (device != null) - //{ - // await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message); - //} - - await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); + return isAck? SubscribeAck.Success(): SubscribeAck.Fail(); } - [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] + [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)] //[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] - public async Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage) + public async Task HeartbeatIssuedEvent(List issuedEventMessages) { bool isAck = false; - switch (issuedEventMessage.Type) + foreach (var issuedEventMessage in issuedEventMessages) { - case IssuedEventType.Heartbeat: - _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}"); - var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId); - heartbeatEntity.AckTime = Clock.Now; - heartbeatEntity.IsAck = true; - await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity); - isAck = true; - break; - case IssuedEventType.Data: - break; - default: - throw new ArgumentOutOfRangeException(); + switch (issuedEventMessage.Type) + { + case IssuedEventType.Heartbeat: + _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}"); + var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId); + heartbeatEntity.AckTime = Clock.Now; + heartbeatEntity.IsAck = true; + await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity); + isAck = true; + break; + case IssuedEventType.Data: + break; + default: + throw new ArgumentOutOfRangeException(); + } + + //var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo); + //if (device != null) + //{ + // await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message); + //} + + await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); } - - //var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo); - //if (device != null) - //{ - // await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message); - //} - - await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); + return isAck ? SubscribeAck.Success() : SubscribeAck.Fail(); } @@ -179,37 +188,44 @@ namespace JiShe.CollectBus.Subscribers return SubscribeAck.Success(); } - [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] + [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName, EnableBatch = true)] //[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] - public async Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage) + public async Task ReceivedHeartbeatEvent(List receivedHeartbeatMessages) { - var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - if (protocolPlugin == null) + foreach (var receivedHeartbeatMessage in receivedHeartbeatMessages) { - _logger.LogError("协议不存在!"); - } - else - { - await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage); - await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage); + var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); + if (protocolPlugin == null) + { + _logger.LogError("协议不存在!"); + } + else + { + await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage); + await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage); + } } return SubscribeAck.Success(); } - [KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] + [KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName,EnableBatch =true)] //[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] - public async Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage) + public async Task ReceivedLoginEvent(List receivedLoginMessages) { - var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - if (protocolPlugin == null) + foreach (var receivedLoginMessage in receivedLoginMessages) { - _logger.LogError("协议不存在!"); - } - else - { - await protocolPlugin.LoginAsync(receivedLoginMessage); - await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage); + var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); + if (protocolPlugin == null) + { + _logger.LogError("协议不存在!"); + } + else + { + await protocolPlugin.LoginAsync(receivedLoginMessage); + await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage); + } } + return SubscribeAck.Success(); } } diff --git a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs index b3102d0..fa55fe7 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs @@ -8,8 +8,8 @@ using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; -using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Kafka.Attributes; +using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Repository.MeterReadingRecord; diff --git a/services/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs b/services/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs index b03193a..654abc4 100644 --- a/services/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs +++ b/services/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs @@ -1,4 +1,5 @@ -using System.Threading; +using System; +using System.Threading; using System.Threading.Tasks; using Hangfire; using JiShe.CollectBus.Common.Consts; @@ -27,13 +28,15 @@ namespace JiShe.CollectBus.Workers { _logger = logger; RecurringJobId = nameof(CreateToBeIssueTaskWorker); - CronExpression = "* 10 * * * *"; + CronExpression = "0 0/1 * * * *"; + TimeZone = TimeZoneInfo.Local; this._scheduledMeterReadingService = scheduledMeterReadingService; } public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken()) { + _logger.LogError($"{DateTime.Now}"); // await _scheduledMeterReadingService.CreateToBeIssueTasks(); } } diff --git a/services/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs b/services/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs index 441b22a..005d46b 100644 --- a/services/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs +++ b/services/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs @@ -1,4 +1,5 @@ -using System.Threading; +using System; +using System.Threading; using System.Threading.Tasks; using Hangfire; using JiShe.CollectBus.ScheduledMeterReading; @@ -26,7 +27,8 @@ namespace JiShe.CollectBus.Workers { _logger = logger; RecurringJobId = nameof(SubscriberFifteenMinuteWorker); - CronExpression = "* 15 * * * *"; + CronExpression = "0 0/15 * * * *"; + TimeZone = TimeZoneInfo.Local; this._scheduledMeterReadingService = scheduledMeterReadingService; } diff --git a/services/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs b/services/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs index 0a61c63..fbd3668 100644 --- a/services/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs +++ b/services/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs @@ -1,4 +1,5 @@ -using System.Threading; +using System; +using System.Threading; using System.Threading.Tasks; using Hangfire; using JiShe.CollectBus.ScheduledMeterReading; @@ -26,7 +27,8 @@ namespace JiShe.CollectBus.Workers { _logger = logger; RecurringJobId = nameof(SubscriberFiveMinuteWorker); - CronExpression = "* 5 * * * *"; + CronExpression = "0 0/5 * * * *"; + TimeZone = TimeZoneInfo.Local; this._scheduledMeterReadingService = scheduledMeterReadingService; } diff --git a/services/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs b/services/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs index 8b7cbfd..e9e0240 100644 --- a/services/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs +++ b/services/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs @@ -1,4 +1,5 @@ -using System.Threading; +using System; +using System.Threading; using System.Threading.Tasks; using Hangfire; using JiShe.CollectBus.ScheduledMeterReading; @@ -26,7 +27,8 @@ namespace JiShe.CollectBus.Workers { _logger = logger; RecurringJobId = nameof(SubscriberOneMinuteWorker); - CronExpression = "* 1 * * * *"; + CronExpression = "0 0/1 * * * *"; + TimeZone = TimeZoneInfo.Local; this._scheduledMeterReadingService = scheduledMeterReadingService; } diff --git a/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs b/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs index 8b082bb..c07950f 100644 --- a/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs +++ b/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs @@ -26,12 +26,7 @@ namespace JiShe.CollectBus.Ammeters /// 电表名称 /// public string Name { get; set; } - - /// - /// 集中器地址 - /// - public string FocusAddress { get; set; } - + /// /// 集中器地址 /// diff --git a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs index c3f75d3..3aafa41 100644 --- a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs +++ b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs @@ -34,7 +34,7 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords /// /// 任务数据唯一标记 /// - public string TaskMark { get; set; } + public decimal TaskMark { get; set; } /// /// 时间戳标记,IoTDB时间列处理,上报通过构建标记获取唯一标记匹配时间戳。 @@ -96,7 +96,21 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords /// 采集项编码 /// public string ItemCode { get; set;} - + + /// + /// 帧序列域SEQ + /// + public required Seq Seq { get; set; } + + /// + /// 地址域A3的主站地址MSA + /// + public int MSA { get; set; } + + /// + /// 是否发送 + /// + public bool IsSend { get; set; } /// /// 创建时间 @@ -132,6 +146,11 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords /// 上报报文解析备注,异常情况下才有 /// public string ReceivedRemark { get; set; } + + /// + /// 是否已上报 + /// + public bool IsReceived { get; set; } //public void CreateDataId(Guid Id) //{ diff --git a/services/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs index 966192b..be97769 100644 --- a/services/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs +++ b/services/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs @@ -1,4 +1,5 @@ -using JiShe.CollectBus.Common.Enums; +using FreeSql.DataAnnotations; +using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Models; using System; using System.Collections.Generic; @@ -12,7 +13,19 @@ namespace JiShe.CollectBus.IotSystems.Watermeter /// 水表信息 /// public class WatermeterInfo: DeviceCacheBasicModel - { + { + /// + /// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义 + /// + [Column(IsIgnore = true)] + public override string MemberId => $"{FocusId}:{MeterId}"; + + /// + /// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳 + /// + [Column(IsIgnore = true)] + public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks; + /// /// 水表名称 /// @@ -21,11 +34,6 @@ namespace JiShe.CollectBus.IotSystems.Watermeter /// 表密码 /// public string Password { get; set; } - - /// - /// 集中器地址 - /// - public string FocusAddress { get; set; } /// /// 一个集中器下的[MeteringCode]必须唯一。 PN diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketBuilder.cs b/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketBuilder.cs index 8ad2a39..5db0dd7 100644 --- a/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketBuilder.cs +++ b/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketBuilder.cs @@ -22,7 +22,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas /// /// /// - public delegate byte[] AFNDelegate(TelemetryPacketRequest request); + public delegate TelemetryPacketResponse AFNDelegate(TelemetryPacketRequest request); /// /// 编码与方法的映射表 @@ -45,7 +45,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas } #region AFN_00H 确认∕否认 - public static byte[] AFN00_Fn_Send(TelemetryPacketRequest request) + public static TelemetryPacketResponse AFN00_Fn_Send(TelemetryPacketRequest request) { var reqParameter = new ReqParameter2() { @@ -64,13 +64,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return bytes; + return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_01H 复位命令 - public static byte[] AFN01_Fn_Send(TelemetryPacketRequest request) + public static TelemetryPacketResponse AFN01_Fn_Send(TelemetryPacketRequest request) { var reqParameter = new ReqParameter2() { @@ -89,13 +89,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return bytes; + return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_02H 链路接口检测 - public static byte[] AFN02_Fn_Send(TelemetryPacketRequest request) + public static TelemetryPacketResponse AFN02_Fn_Send(TelemetryPacketRequest request) { var reqParameter = new ReqParameter2() { @@ -114,12 +114,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return bytes; + return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_04H 设置参数 - public static byte[] AFN04_Fn_Send(TelemetryPacketRequest request) + public static TelemetryPacketResponse AFN04_Fn_Send(TelemetryPacketRequest request) { var reqParameter = new ReqParameter2() { @@ -138,13 +138,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return bytes; + return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_05H 控制命令 - public static byte[] AFN05_Fn_Send(TelemetryPacketRequest request) + public static TelemetryPacketResponse AFN05_Fn_Send(TelemetryPacketRequest request) { var reqParameter = new ReqParameter2() { @@ -163,12 +163,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return bytes; + return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_09H 请求终端配置及信息 - public static byte[] AFN09_Fn_Send(TelemetryPacketRequest request) + public static TelemetryPacketResponse AFN09_Fn_Send(TelemetryPacketRequest request) { var reqParameter = new ReqParameter2() { @@ -187,13 +187,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return bytes; + return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_0AH 查询参数 - public static byte[] AFN0A_Fn_Send(TelemetryPacketRequest request) + public static TelemetryPacketResponse AFN0A_Fn_Send(TelemetryPacketRequest request) { var reqParameter = new ReqParameter2() { @@ -212,12 +212,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return bytes; + return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_0CH 请求一类数据 - public static byte[] AFN0C_Fn_Send(TelemetryPacketRequest request) + public static TelemetryPacketResponse AFN0C_Fn_Send(TelemetryPacketRequest request) { var reqParameter = new ReqParameter2() { @@ -236,12 +236,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return bytes; + return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_0DH 请求二类数据 - public static byte[] AFN0D_Fn_Send(TelemetryPacketRequest request) + public static TelemetryPacketResponse AFN0D_Fn_Send(TelemetryPacketRequest request) { var reqParameter = new ReqParameter2() { @@ -260,12 +260,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return bytes; + return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN10H 数据转发 - public static byte[] AFN10_Fn_Send(TelemetryPacketRequest request) + public static TelemetryPacketResponse AFN10_Fn_Send(TelemetryPacketRequest request) { var reqParameter = new ReqParameter2() { @@ -283,8 +283,8 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Pn = request.Pn, Fn = request.Fn }; - var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter,request.DataUnit); - return bytes; + var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter, request.DataUnit); + return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; } #region SpecialAmmeter 特殊电表转发 diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketResponse.cs b/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketResponse.cs new file mode 100644 index 0000000..8cd964a --- /dev/null +++ b/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketResponse.cs @@ -0,0 +1,30 @@ +using JiShe.CollectBus.Common.Models; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Common.BuildSendDatas +{ + /// + /// 报文构建返回结果 + /// + public class TelemetryPacketResponse + { + /// + /// 帧序列域SEQ + /// + public required Seq Seq { get; set; } + + /// + /// 地址域A3的主站地址MSA + /// + public int MSA { get; set; } + + /// + /// 报文体 + /// + public required byte[] Data { get; set; } + } +} diff --git a/shared/JiShe.CollectBus.Common/Consts/CommonConst.cs b/shared/JiShe.CollectBus.Common/Consts/CommonConst.cs index 471c897..c3f1e9a 100644 --- a/shared/JiShe.CollectBus.Common/Consts/CommonConst.cs +++ b/shared/JiShe.CollectBus.Common/Consts/CommonConst.cs @@ -11,25 +11,31 @@ namespace JiShe.CollectBus.Common.Consts /// public class CommonConst { - /// - /// 服务器标识 - /// - public const string ServerTagName = "ServerTagName"; /// /// Kafka /// public const string Kafka = "Kafka"; + /// + /// 服务器标识 + /// + public const string ServerTagName = $"{Kafka}:ServerTagName"; + /// /// Kafka副本数量 /// - public const string KafkaReplicationFactor = "KafkaReplicationFactor"; + public const string KafkaReplicationFactor = $"{Kafka}:KafkaReplicationFactor"; /// /// Kafka主题分区数量 /// - public const string NumPartitions = "NumPartitions"; + public const string NumPartitions = $"{Kafka}:NumPartitions"; + + /// + /// 首次采集时间 + /// + public const string FirstCollectionTime = "FirstCollectionTime"; } } diff --git a/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs b/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs index 4e3fef9..e6136df 100644 --- a/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs +++ b/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs @@ -200,5 +200,38 @@ namespace JiShe.CollectBus.Common.Extensions { return DateTimeOffset.FromUnixTimeMilliseconds(millis).DateTime; } + + /// + /// 采集时间节点计算 + /// + /// 待采集时间 + /// + /// + public static DateTime CalculateNextCollectionTime(this DateTime referenceTime, int interval) + { + // 计算精确到分钟的基准时间 + var baseTime = new DateTime( + referenceTime.Year, + referenceTime.Month, + referenceTime.Day, + referenceTime.Hour, + referenceTime.Minute, + 0); + + // 计算总分钟数和下一个间隔点 + int totalMinutes = baseTime.Hour * 60 + baseTime.Minute; + int nextTotalMinutes = ((totalMinutes / interval) + 1) * interval; + + // 处理跨天情况 + int daysToAdd = nextTotalMinutes / (24 * 60); + int remainingMinutes = nextTotalMinutes % (24 * 60); + int hours = remainingMinutes / 60; + int minutes = remainingMinutes % 60; + + return baseTime.Date + .AddDays(daysToAdd) + .AddHours(hours) + .AddMinutes(minutes); + } } } diff --git a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs index 2474b88..3c36d23 100644 --- a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs +++ b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs @@ -669,7 +669,7 @@ namespace JiShe.CollectBus.Common.Helpers return att == null ? field.Name : ((DescriptionAttribute)att).Description; } - + /// /// 将传入的字符串中间部分字符替换成特殊字符 @@ -759,7 +759,7 @@ namespace JiShe.CollectBus.Common.Helpers } return fontValue; - } + } /// /// 获取任务标识 @@ -767,10 +767,13 @@ namespace JiShe.CollectBus.Common.Helpers /// /// /// + /// /// - public static string GetTaskMark(int afn,int fn,int pn) + public static decimal GetTaskMark(int afn, int fn, int pn, int msa) { - return $"{afn.ToString().PadLeft(2,'0')}{fn}{pn}"; + var makstr = $"{afn.ToString().PadLeft(2, '0')}{fn.ToString().PadLeft(2, '0')}{pn.ToString().PadLeft(2, '0')}"; + + return Convert.ToInt32(makstr) << 32 | msa; } } } diff --git a/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs b/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs index d397151..1edc46a 100644 --- a/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs +++ b/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs @@ -30,5 +30,15 @@ namespace JiShe.CollectBus.Common.Models /// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳 /// public virtual long ScoreValue=> ((long)FocusId << 32) | (uint)MeterId; + + /// + /// 是否已处理 + /// + public virtual bool IsHandle { get; set; } = false; + + /// + /// 集中器地址 + /// + public string FocusAddress { get; set;} } } diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml index aaadf3f..afe25da 100644 --- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -17,7 +17,7 @@ 后端服务 - +