From ee0b48afbdd6851d65a58628b662aca4bf7abafd Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Wed, 14 May 2025 15:21:33 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...nergySystemScheduledMeterReadingService.cs | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 0521dca..05c3007 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -125,6 +125,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading BrandType = "DDS1980", MeterType = MeterTypeEnum.Ammeter, ProjectID = 1, + MeteringPort = 2, }); ammeterInfos.Add(new DeviceInfo() @@ -143,6 +144,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading BrandType = "DDS1980", MeterType = MeterTypeEnum.Ammeter, ProjectID = 1, + MeteringPort = 2, }); //ammeterInfos.Add(new DeviceInfo() @@ -286,12 +288,26 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusId = 95780, ProjectID = 1, TripType = "off", - TripTime = "14:02", + TripTime = $"{DateTime.Now:HH:mm}", MeterId = 127035, LoopType = "EachDay", EachDayWithout = "周六,周日", TimeDensity = 15, }); + settingInfos.Add(new AmmeterAutoValveControlSetting() + { + MeterType = MeterTypeEnum.Ammeter, + AmmerterAddress = "442405000039", + FocusAddress = "442400039", + FocusId = 69280, + ProjectID = 1, + TripType = "off", + TripTime = $"{DateTime.Now:HH:mm}", + MeterId = 95594, + LoopType = "EachDay", + EachDayWithout = "周六,周日", + TimeDensity = 15, + }); #else //获取电表阀控配置 var settingInfos = await GetAmmeterAutoValveControlSetting(currentTimeStr); @@ -302,7 +318,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } #endif - + //设备hash缓存key string redisCacheDeviceInfoHashKey = $"{string.Format(RedisConst.CacheDeviceInfoHashKey, SystemType, ServerTagName)}"; From be3cd5d3e7cbdb3fc9e0ceb85066e21b5c2f3b1f Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Wed, 14 May 2025 17:41:39 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=BC=82=E5=B8=B8?= =?UTF-8?q?=E6=8D=95=E6=8D=89=EF=BC=8C=E4=BF=AE=E5=A4=8DIoTDBEntity?= =?UTF-8?q?=E5=9F=BA=E7=B1=BB=E5=B1=9E=E6=80=A7=E5=BC=82=E5=B8=B8=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../JiShe.CollectBus.IoTDB/Model/IoTEntity.cs | 24 +- .../Options/QueryCondition.cs | 1 - .../Consumer/ConsumerService.cs | 683 ++++++++++-------- .../KafkaSubscribeExtensions.cs | 232 +++--- .../Producer/ProducerService.cs | 118 +-- .../Serialization/JsonSerializer.cs | 46 +- .../SendData/Telemetry1882018PacketBuilder.cs | 4 +- .../T1882018ProtocolPlugin.cs | 4 +- .../SendData/Telemetry6452007PacketBuilder.cs | 4 +- .../T6452007ProtocolPlugin.cs | 4 +- ...nergySystemScheduledMeterReadingService.cs | 18 +- .../Consts/T6452007PacketItemCodeConst.cs | 14 +- 12 files changed, 640 insertions(+), 512 deletions(-) diff --git a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs index 9df2488..da14d20 100644 --- a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs +++ b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs @@ -44,7 +44,11 @@ namespace JiShe.CollectBus.IoTDB.Model /// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取 /// public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); - + + /// + /// 设备路径 + /// + private string _devicePath; /// /// 设备路径 /// @@ -52,18 +56,16 @@ namespace JiShe.CollectBus.IoTDB.Model { get { - return $"root.{SystemName.ToLower()}.`{ProjectId}`.`{DeviceType}`.{DataType}.`{DeviceId}`"; + // 如果未手动设置路径,则自动生成 + if (string.IsNullOrWhiteSpace(_devicePath)) + { + return $"root.{SystemName.ToLower()}.`{ProjectId}`.`{DeviceType}`.{DataType}.`{DeviceId}`"; + } + return _devicePath; } set - { - if (string.IsNullOrWhiteSpace(value)) - { - DevicePath = $"root.{SystemName.ToLower()}.`{ProjectId}`.`{DeviceType}`.{DataType}.`{DeviceId}`"; - } - else - { - DevicePath = value; - } + { + _devicePath = value; // 直接赋值给支持字段,避免递归 } } } diff --git a/modules/JiShe.CollectBus.IoTDB/Options/QueryCondition.cs b/modules/JiShe.CollectBus.IoTDB/Options/QueryCondition.cs index f5249b0..bccf017 100644 --- a/modules/JiShe.CollectBus.IoTDB/Options/QueryCondition.cs +++ b/modules/JiShe.CollectBus.IoTDB/Options/QueryCondition.cs @@ -56,7 +56,6 @@ namespace JiShe.CollectBus.IoTDB.Options return declaredTypeName?.ToUpper() switch { "DATETIME" => v => v != null ? ((DateTime)v).ToUniversalTime().Ticks : null, - "BOOLEAN" => v => v != null && (bool)v ? 1 : 0, "STRING" => v => v != null ? $"'{v}'" : "''", _ => v => v }; diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs index bf752eb..043af46 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs @@ -1,6 +1,7 @@ using Confluent.Kafka; using JiShe.CollectBus.Common; using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Serialization; using Microsoft.AspNetCore.DataProtection.KeyManagement; @@ -127,80 +128,88 @@ namespace JiShe.CollectBus.Kafka.Consumer /// public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null) where TKey : notnull where TValue : class { - await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => + try { - - var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; - var cts = new CancellationTokenSource(); - - var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - ( - CreateConsumer(groupId), - cts - )).Consumer as IConsumer; - - consumer!.Subscribe(topics); - - _ = Task.Run(async () => + await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => { - while (!cts.IsCancellationRequested) - { - try - { - //_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息...."); - var result = consumer.Consume(cts.Token); - if (result == null || result.Message == null || result.Message.Value == null) + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; + var cts = new CancellationTokenSource(); + + var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; + + consumer!.Subscribe(topics); + + _ = Task.Run(async () => + { + while (!cts.IsCancellationRequested) + { + try { - await Task.Delay(DelayTime, cts.Token); - continue; - } - if (result.IsPartitionEOF) - { -#if DEBUG - _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); -#endif - await Task.Delay(DelayTime, cts.Token); - continue; - } - if (_kafkaOptionConfig.EnableFilter) - { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } }; - // 检查 Header 是否符合条件 - if (!headersFilter.Match(result.Message.Headers)) + //_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息...."); + + var result = consumer.Consume(cts.Token); + if (result == null || result.Message == null || result.Message.Value == null) { - consumer.Commit(result); // 提交偏移量 - // 跳过消息 + await Task.Delay(DelayTime, cts.Token); continue; } + if (result.IsPartitionEOF) + { +#if DEBUG + _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); +#endif + await Task.Delay(DelayTime, cts.Token); + continue; + } + if (_kafkaOptionConfig.EnableFilter) + { + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + consumer.Commit(result); // 提交偏移量 + // 跳过消息 + continue; + } + } + bool sucess = await messageHandler(result.Message.Key, result.Message.Value); + if (sucess) + consumer.Commit(result); // 手动提交 + } + catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) + { + _logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}"); + throw; // 抛出异常,以便重试 + } + catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) + { + _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}"); + throw; // 抛出异常,以便重试 + } + catch (OperationCanceledException) + { + //ignore + } + catch (Exception ex) + { + _logger.LogError(ex, "处理消息时发生未知错误"); } - bool sucess = await messageHandler(result.Message.Key, result.Message.Value); - if (sucess) - consumer.Commit(result); // 手动提交 } - catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) - { - _logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}"); - throw; // 抛出异常,以便重试 - } - catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) - { - _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}"); - throw; // 抛出异常,以便重试 - } - catch (OperationCanceledException) - { - //ignore - } - catch (Exception ex) - { - _logger.LogError(ex, "处理消息时发生未知错误"); - } - } - }, cts.Token); - await Task.CompletedTask; - }); + }, cts.Token); + await Task.CompletedTask; + }); + } + catch (Exception ex) + { + + throw; + } } @@ -215,76 +224,84 @@ namespace JiShe.CollectBus.Kafka.Consumer /// public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) where TValue : class { - await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => + try { - var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; - var cts = new CancellationTokenSource(); - var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - ( - CreateConsumer(groupId), - cts - )).Consumer as IConsumer; - - consumer!.Subscribe(topics); - - _ = Task.Run(async () => + await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => { - int count = 0; - while (!cts.IsCancellationRequested) - { - try - { - //_logger.LogInformation($"Kafka消费: {string.Join("", topics)}_{count} 开始拉取消息...."); - count++; - var result = consumer.Consume(cts.Token); - if (result == null || result.Message == null || result.Message.Value == null) - { - await Task.Delay(DelayTime, cts.Token); - continue; - } + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; + var cts = new CancellationTokenSource(); + var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; - if (result.IsPartitionEOF) + consumer!.Subscribe(topics); + + _ = Task.Run(async () => + { + int count = 0; + while (!cts.IsCancellationRequested) + { + try { -#if DEBUG - _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); -#endif - await Task.Delay(DelayTime, cts.Token); - continue; - } - if (_kafkaOptionConfig.EnableFilter) - { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } }; - // 检查 Header 是否符合条件 - if (!headersFilter.Match(result.Message.Headers)) + //_logger.LogInformation($"Kafka消费: {string.Join("", topics)}_{count} 开始拉取消息...."); + count++; + var result = consumer.Consume(cts.Token); + if (result == null || result.Message == null || result.Message.Value == null) { - consumer.Commit(result); // 提交偏移量 - // 跳过消息 + await Task.Delay(DelayTime, cts.Token); continue; } + + if (result.IsPartitionEOF) + { +#if DEBUG + _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); +#endif + await Task.Delay(DelayTime, cts.Token); + continue; + } + if (_kafkaOptionConfig.EnableFilter) + { + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + consumer.Commit(result); // 提交偏移量 + // 跳过消息 + continue; + } + } + bool sucess = await messageHandler(result.Message.Value); + if (sucess) + consumer.Commit(result); // 手动提交 + //else + // consumer.StoreOffset(result); + } + catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) + { + _logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}"); + throw; // 抛出异常,以便重试 + } + catch (OperationCanceledException) + { + //ignore + } + catch (Exception ex) + { + _logger.LogError(ex, "处理消息时发生未知错误"); } - bool sucess = await messageHandler(result.Message.Value); - if (sucess) - consumer.Commit(result); // 手动提交 - //else - // consumer.StoreOffset(result); } - catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) - { - _logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}"); - throw; // 抛出异常,以便重试 - } - catch (OperationCanceledException) - { - //ignore - } - catch (Exception ex) - { - _logger.LogError(ex, "处理消息时发生未知错误"); - } - } - }, cts.Token); - await Task.CompletedTask; - }); + }, cts.Token); + await Task.CompletedTask; + }); + } + catch (Exception ex) + { + + throw; + } } @@ -300,7 +317,15 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 批次超时时间 public async Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class { - await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout); + try + { + await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout); + } + catch (Exception ex) + { + + throw; + } } /// @@ -315,117 +340,125 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 批次超时时间 public async Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class { - await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => + try { - - var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; - var cts = new CancellationTokenSource(); - - var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - ( - CreateConsumer(groupId), - cts - )).Consumer as IConsumer; - consumer!.Subscribe(topics); - - var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 - - _ = Task.Run(async () => + await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => { - var messages = new List<(TValue Value, TopicPartitionOffset Offset)>(); - var startTime = DateTime.UtcNow; - while (!cts.IsCancellationRequested) + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; + var cts = new CancellationTokenSource(); + + var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; + consumer!.Subscribe(topics); + + var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 + + _ = Task.Run(async () => { - try - { - // 非阻塞快速累积消息 - while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout) - { - var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用 + var messages = new List<(TValue Value, TopicPartitionOffset Offset)>(); + var startTime = DateTime.UtcNow; - if (result != null) + while (!cts.IsCancellationRequested) + { + try + { + // 非阻塞快速累积消息 + while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout) { - if (result.IsPartitionEOF) + var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用 + + if (result != null) { + if (result.IsPartitionEOF) + { #if DEBUG - _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); + _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); #endif + await Task.Delay(DelayTime, cts.Token); + } + else if (result.Message.Value != null) + { + if (_kafkaOptionConfig.EnableFilter) + { + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + consumer.Commit(result); // 提交偏移量 + // 跳过消息 + continue; + } + } + messages.Add((result.Message.Value, result.TopicPartitionOffset)); + } + } + else + { + // 无消息时短暂等待 await Task.Delay(DelayTime, cts.Token); } - else if (result.Message.Value != null) + } + + // 处理批次 + if (messages.Count > 0) + { + bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList()); + if (success) { - if (_kafkaOptionConfig.EnableFilter) + var offsetsByPartition = new Dictionary(); + foreach (var msg in messages) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } }; - // 检查 Header 是否符合条件 - if (!headersFilter.Match(result.Message.Headers)) + var tp = msg.Offset.TopicPartition; + var offset = msg.Offset.Offset; + if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax) { - consumer.Commit(result); // 提交偏移量 - // 跳过消息 - continue; + offsetsByPartition[tp] = offset; } } - messages.Add((result.Message.Value, result.TopicPartitionOffset)); - } - } - else - { - // 无消息时短暂等待 - await Task.Delay(DelayTime, cts.Token); - } - } - // 处理批次 - if (messages.Count > 0) + var offsetsToCommit = offsetsByPartition + .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1))) + .ToList(); + consumer.Commit(offsetsToCommit); + } + messages.Clear(); + } + + startTime = DateTime.UtcNow; + } + catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) { - bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList()); - if (success) - { - var offsetsByPartition = new Dictionary(); - foreach (var msg in messages) - { - var tp = msg.Offset.TopicPartition; - var offset = msg.Offset.Offset; - if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax) - { - offsetsByPartition[tp] = offset; - } - } - - var offsetsToCommit = offsetsByPartition - .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1))) - .ToList(); - consumer.Commit(offsetsToCommit); - } - messages.Clear(); + _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}"); + throw; // 抛出异常,以便重试 } + catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) + { + _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}"); + throw; // 抛出异常,以便重试 + } + catch (OperationCanceledException) + { + //ignore + } + catch (Exception ex) + { + _logger.LogError(ex, "处理批量消息时发生未知错误"); + } + } + }, cts.Token); - startTime = DateTime.UtcNow; - } - catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) - { - _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}"); - throw; // 抛出异常,以便重试 - } - catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) - { - _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}"); - throw; // 抛出异常,以便重试 - } - catch (OperationCanceledException) - { - //ignore - } - catch (Exception ex) - { - _logger.LogError(ex, "处理批量消息时发生未知错误"); - } - } - }, cts.Token); + await Task.CompletedTask; + }); + } + catch (Exception ex) + { - await Task.CompletedTask; - }); + throw; + } } @@ -441,7 +474,15 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 消费等待时间 public async Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class { - await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout); + try + { + await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout); + } + catch (Exception ex) + { + + throw; + } } @@ -458,116 +499,124 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 消费等待时间 public async Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class { - await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => + try { - - var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; - var cts = new CancellationTokenSource(); - - var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - ( - CreateConsumer(groupId), - cts - )).Consumer as IConsumer; - - consumer!.Subscribe(topics); - - var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 - - _ = Task.Run(async () => + await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => { - var messages = new List<(TValue Value, TopicPartitionOffset Offset)>(); - var startTime = DateTime.UtcNow; - while (!cts.IsCancellationRequested) + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; + var cts = new CancellationTokenSource(); + + var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; + + consumer!.Subscribe(topics); + + var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 + + _ = Task.Run(async () => { - try - { - // 非阻塞快速累积消息 - while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout) - { - var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用 + var messages = new List<(TValue Value, TopicPartitionOffset Offset)>(); + var startTime = DateTime.UtcNow; - if (result != null) + while (!cts.IsCancellationRequested) + { + try + { + // 非阻塞快速累积消息 + while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout) { - if (result.IsPartitionEOF) + var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用 + + if (result != null) { - //_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); + if (result.IsPartitionEOF) + { + //_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); + await Task.Delay(DelayTime, cts.Token); + } + else if (result.Message.Value != null) + { + if (_kafkaOptionConfig.EnableFilter) + { + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + consumer.Commit(result); // 提交偏移量 + // 跳过消息 + continue; + } + } + messages.Add((result.Message.Value, result.TopicPartitionOffset)); + } + } + else + { + // 无消息时短暂等待 await Task.Delay(DelayTime, cts.Token); } - else if (result.Message.Value != null) + } + + // 处理批次 + if (messages.Count > 0) + { + bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList()); + if (success) { - if (_kafkaOptionConfig.EnableFilter) + var offsetsByPartition = new Dictionary(); + foreach (var msg in messages) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } }; - // 检查 Header 是否符合条件 - if (!headersFilter.Match(result.Message.Headers)) + var tp = msg.Offset.TopicPartition; + var offset = msg.Offset.Offset; + if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax) { - consumer.Commit(result); // 提交偏移量 - // 跳过消息 - continue; + offsetsByPartition[tp] = offset; } } - messages.Add((result.Message.Value, result.TopicPartitionOffset)); - } - } - else - { - // 无消息时短暂等待 - await Task.Delay(DelayTime, cts.Token); - } - } - // 处理批次 - if (messages.Count > 0) + var offsetsToCommit = offsetsByPartition + .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1))) + .ToList(); + consumer.Commit(offsetsToCommit); + } + messages.Clear(); + } + + startTime = DateTime.UtcNow; + } + catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) { - bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList()); - if (success) - { - var offsetsByPartition = new Dictionary(); - foreach (var msg in messages) - { - var tp = msg.Offset.TopicPartition; - var offset = msg.Offset.Offset; - if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax) - { - offsetsByPartition[tp] = offset; - } - } - - var offsetsToCommit = offsetsByPartition - .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1))) - .ToList(); - consumer.Commit(offsetsToCommit); - } - messages.Clear(); + _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); + throw; // 抛出异常,以便重试 } + catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) + { + _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}"); + throw; // 抛出异常,以便重试 + } + catch (OperationCanceledException) + { + //ignore + } + catch (Exception ex) + { + _logger.LogError(ex, "处理批量消息时发生未知错误"); + } + } + }, cts.Token); - startTime = DateTime.UtcNow; - } - catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) - { - _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); - throw; // 抛出异常,以便重试 - } - catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) - { - _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}"); - throw; // 抛出异常,以便重试 - } - catch (OperationCanceledException) - { - //ignore - } - catch (Exception ex) - { - _logger.LogError(ex, "处理批量消息时发生未知错误"); - } - } - }, cts.Token); + await Task.CompletedTask; + }); + } + catch (Exception ex) + { - await Task.CompletedTask; - }); + throw; + } } @@ -578,12 +627,20 @@ namespace JiShe.CollectBus.Kafka.Consumer /// public void Unsubscribe(string[] topics, string? groupId) where TKey : notnull where TValue : class { - var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; - if (_consumerStore.TryRemove(consumerKey, out var entry)) + try { - entry.CTS.Cancel(); - (entry.Consumer as IDisposable)?.Dispose(); - entry.CTS.Dispose(); + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; + if (_consumerStore.TryRemove(consumerKey, out var entry)) + { + entry.CTS.Cancel(); + (entry.Consumer as IDisposable)?.Dispose(); + entry.CTS.Dispose(); + } + } + catch (Exception ex) + { + + throw; } } diff --git a/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs index 028762f..ccbe540 100644 --- a/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs +++ b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs @@ -226,6 +226,11 @@ namespace JiShe.CollectBus.Kafka // 处理消费错误 logger.LogError($"kafka批量消费异常:{ex.Message}"); } + catch (Exception ex) + { + // 处理消费错误 + logger.LogError($"kafka批量消费异常:{ex.Message}"); + } return await Task.FromResult(false); }, attr.GroupId, attr.BatchSize, attr.BatchTimeout); } @@ -248,6 +253,11 @@ namespace JiShe.CollectBus.Kafka // 处理消费错误 logger.LogError($"kafka消费异常:{ex.Message}"); } + catch (Exception ex) + { + // 处理消费错误 + logger.LogError($"kafka批量消费异常:{ex.Message}"); + } return await Task.FromResult(false); }, attr.GroupId); } @@ -260,125 +270,133 @@ namespace JiShe.CollectBus.Kafka /// private static async Task ProcessMessageAsync(List messages, MethodInfo method, object subscribe) { - var parameters = method.GetParameters(); - bool isGenericTask = method.ReturnType.IsGenericType - && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>); - bool existParameters = parameters.Length > 0; - object[]? executeParameters = null; - - if (existParameters) + try { - IList? list = null; - Tuple tuple = method.GetParameterTypeInfo(); - bool isEnumerable = false; - if (tuple.Item2 != null) - { - Type listType = typeof(List<>).MakeGenericType(tuple.Item2); - list = (IList)Activator.CreateInstance(listType)!; - isEnumerable = tuple.Item2.IsConvertType(); - } - else - { - isEnumerable = tuple.Item1.IsConvertType(); - } - #region 暂时 - //foreach (var msg in messages) - //{ - // if (tuple.Item2 != null) - // { - // if (isEnumerable) - // { - // var parameterType = parameters[0].ParameterType; - // var data=messages?.Serialize().Deserialize(parameterType); - // messageObj = data!=null? new[] { data }:null; - // break; - // } - // else - // { - // // 集合类型 - // var data = msg?.Serialize().Deserialize(tuple.Item2) /*isEnumerable ? Convert.ChangeType(msg, tuple.Item2) : msg?.Serialize().Deserialize(tuple.Item2)*/; - // if (data != null) - // list?.Add(data); - // } + var parameters = method.GetParameters(); + bool isGenericTask = method.ReturnType.IsGenericType + && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>); + bool existParameters = parameters.Length > 0; + object[]? executeParameters = null; - // } - // else - // { - // // (dynamic)Convert.ChangeType(msg, tuple.Item1) - // using (var stream = new MemoryStream(msg)) - // { - // var data1= System.Text.Json.JsonSerializer.Deserialize(stream, tuple.Item1); - // } - // var data = isEnumerable ? System.Text.Json.JsonSerializer.Deserialize(msg, tuple.Item1): msg?.ToString()?.Deserialize(tuple.Item1); - // if (data != null) - // messageObj = new[] { data }; - // } - //} - //if (tuple.Item2 != null && list != null && list.Count > 0) - //{ - // messageObj = new[] { list }; - //} - #endregion - var parameterDescriptors = method.GetParameters(); - executeParameters = new object?[parameterDescriptors.Length]; - for (var i = 0; i < parameterDescriptors.Length; i++) + if (existParameters) { - foreach (var item in messages) - { - - object? tempParameter=null; - var parameterDescriptor = parameterDescriptors[i]; - if (KafkaSerialization.IsJsonType(item)) + IList? list = null; + Tuple tuple = method.GetParameterTypeInfo(); + bool isEnumerable = false; + if (tuple.Item2 != null) + { + Type listType = typeof(List<>).MakeGenericType(tuple.Item2); + list = (IList)Activator.CreateInstance(listType)!; + isEnumerable = tuple.Item2.IsConvertType(); + } + else + { + isEnumerable = tuple.Item1.IsConvertType(); + } + #region 暂时 + //foreach (var msg in messages) + //{ + // if (tuple.Item2 != null) + // { + // if (isEnumerable) + // { + // var parameterType = parameters[0].ParameterType; + // var data=messages?.Serialize().Deserialize(parameterType); + // messageObj = data!=null? new[] { data }:null; + // break; + // } + // else + // { + // // 集合类型 + // var data = msg?.Serialize().Deserialize(tuple.Item2) /*isEnumerable ? Convert.ChangeType(msg, tuple.Item2) : msg?.Serialize().Deserialize(tuple.Item2)*/; + // if (data != null) + // list?.Add(data); + // } + + // } + // else + // { + // // (dynamic)Convert.ChangeType(msg, tuple.Item1) + // using (var stream = new MemoryStream(msg)) + // { + // var data1= System.Text.Json.JsonSerializer.Deserialize(stream, tuple.Item1); + // } + // var data = isEnumerable ? System.Text.Json.JsonSerializer.Deserialize(msg, tuple.Item1): msg?.ToString()?.Deserialize(tuple.Item1); + // if (data != null) + // messageObj = new[] { data }; + // } + //} + //if (tuple.Item2 != null && list != null && list.Count > 0) + //{ + // messageObj = new[] { list }; + //} + #endregion + var parameterDescriptors = method.GetParameters(); + executeParameters = new object?[parameterDescriptors.Length]; + for (var i = 0; i < parameterDescriptors.Length; i++) + { + foreach (var item in messages) { - tempParameter = KafkaSerialization.Deserialize(item, tuple.Item2 != null? tuple.Item2: parameterDescriptor.ParameterType); - } - else - { - - var converter = TypeDescriptor.GetConverter(parameterDescriptor.ParameterType); - if (converter.CanConvertFrom(item.GetType())) + + object? tempParameter = null; + var parameterDescriptor = parameterDescriptors[i]; + if (KafkaSerialization.IsJsonType(item)) { - tempParameter = converter.ConvertFrom(item); + tempParameter = KafkaSerialization.Deserialize(item, tuple.Item2 != null ? tuple.Item2 : parameterDescriptor.ParameterType); } else { - if (parameterDescriptor.ParameterType.IsInstanceOfType(item)) - tempParameter = item; - else - tempParameter =Convert.ChangeType(item, parameterDescriptor.ParameterType); - } - } - if (tuple.Item2 == null) - { - executeParameters[i] = tempParameter; - } - else - { - list.Add(tempParameter); - } - - } - if(list!=null && list.Count>0) - executeParameters[i] = list; - } - } - var result = method.Invoke(subscribe, executeParameters); - if (result is Task genericTask) - { - await genericTask.ConfigureAwait(false); - return genericTask.Result.Ack; + var converter = TypeDescriptor.GetConverter(parameterDescriptor.ParameterType); + if (converter.CanConvertFrom(item.GetType())) + { + tempParameter = converter.ConvertFrom(item); + } + else + { + if (parameterDescriptor.ParameterType.IsInstanceOfType(item)) + tempParameter = item; + else + tempParameter = Convert.ChangeType(item, parameterDescriptor.ParameterType); + } + } + if (tuple.Item2 == null) + { + executeParameters[i] = tempParameter; + } + else + { + list.Add(tempParameter); + } + + } + if (list != null && list.Count > 0) + executeParameters[i] = list; + } + } + + var result = method.Invoke(subscribe, executeParameters); + if (result is Task genericTask) + { + await genericTask.ConfigureAwait(false); + return genericTask.Result.Ack; + } + else if (result is Task nonGenericTask) + { + await nonGenericTask.ConfigureAwait(false); + return true; + } + else if (result is ISubscribeAck ackResult) + { + return ackResult.Ack; + } + return false; } - else if (result is Task nonGenericTask) + catch (Exception ex) { - await nonGenericTask.ConfigureAwait(false); - return true; + + throw; } - else if (result is ISubscribeAck ackResult) - { - return ackResult.Ack; - } - return false; } } diff --git a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs index 7f9a85d..72e9096 100644 --- a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs @@ -117,17 +117,25 @@ namespace JiShe.CollectBus.Kafka.Producer /// public async Task ProduceAsync(string topic, TKey key, TValue value)where TKey : notnull where TValue : class { - var typeKey = typeof(KafkaProducer); - var producer = GetProducer(typeKey); - var message = new Message + try { - Key = key, - Value = value, - Headers = new Headers{ + var typeKey = typeof(KafkaProducer); + var producer = GetProducer(typeKey); + var message = new Message + { + Key = key, + Value = value, + Headers = new Headers{ { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } } - }; - await producer.ProduceAsync(topic, message); + }; + await producer.ProduceAsync(topic, message); + } + catch (Exception ex) + { + + throw; + } } /// @@ -139,16 +147,24 @@ namespace JiShe.CollectBus.Kafka.Producer /// public async Task ProduceAsync(string topic, TValue value) where TValue : class { - var typeKey = typeof(KafkaProducer); - var producer = GetProducer(typeKey); - var message = new Message + try { - Value = value, - Headers = new Headers{ + var typeKey = typeof(KafkaProducer); + var producer = GetProducer(typeKey); + var message = new Message + { + Value = value, + Headers = new Headers{ { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } } - }; - await producer.ProduceAsync(topic, message); + }; + await producer.ProduceAsync(topic, message); + } + catch (Exception ex) + { + + throw; + } } /// @@ -164,26 +180,34 @@ namespace JiShe.CollectBus.Kafka.Producer /// public async Task ProduceAsync(string topic,TKey key,TValue value,int? partition=null, Action>? deliveryHandler = null)where TKey : notnull where TValue : class { - var message = new Message + try { - Key = key, - Value = value, - Headers = new Headers{ + var message = new Message + { + Key = key, + Value = value, + Headers = new Headers{ { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } } - }; - var typeKey = typeof(KafkaProducer); - var producer = GetProducer(typeKey); - if (partition.HasValue) - { - var topicPartition = new TopicPartition(topic, new Partition(partition.Value)); - producer.Produce(topicPartition, message, deliveryHandler); + }; + var typeKey = typeof(KafkaProducer); + var producer = GetProducer(typeKey); + if (partition.HasValue) + { + var topicPartition = new TopicPartition(topic, new Partition(partition.Value)); + producer.Produce(topicPartition, message, deliveryHandler); + } + else + { + producer.Produce(topic, message, deliveryHandler); + } + await Task.CompletedTask; } - else + catch (Exception ex) { - producer.Produce(topic, message, deliveryHandler); + + throw; } - await Task.CompletedTask; } @@ -199,26 +223,34 @@ namespace JiShe.CollectBus.Kafka.Producer /// public async Task ProduceAsync(string topic, TValue value, int? partition=null, Action>? deliveryHandler = null) where TValue : class { - var message = new Message + try { - Value = value, - Headers = new Headers{ + var message = new Message + { + Value = value, + Headers = new Headers{ { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } } - }; - var typeKey = typeof(KafkaProducer); - var producer = GetProducer(typeKey); - if (partition.HasValue) - { - var topicPartition = new TopicPartition(topic,new Partition(partition.Value)); - //_logger.LogError($"push消息:{topic}-{partition.Value}"); - producer.Produce(topicPartition, message, deliveryHandler); + }; + var typeKey = typeof(KafkaProducer); + var producer = GetProducer(typeKey); + if (partition.HasValue) + { + var topicPartition = new TopicPartition(topic, new Partition(partition.Value)); + //_logger.LogError($"push消息:{topic}-{partition.Value}"); + producer.Produce(topicPartition, message, deliveryHandler); + } + else + { + producer.Produce(topic, message, deliveryHandler); + } + await Task.CompletedTask; } - else + catch (Exception ex) { - producer.Produce(topic, message, deliveryHandler); + + throw; } - await Task.CompletedTask; } public void Dispose() diff --git a/modules/JiShe.CollectBus.Kafka/Serialization/JsonSerializer.cs b/modules/JiShe.CollectBus.Kafka/Serialization/JsonSerializer.cs index 8034954..98fda49 100644 --- a/modules/JiShe.CollectBus.Kafka/Serialization/JsonSerializer.cs +++ b/modules/JiShe.CollectBus.Kafka/Serialization/JsonSerializer.cs @@ -19,6 +19,7 @@ namespace JiShe.CollectBus.Kafka.Serialization { DefaultIgnoreCondition = JsonIgnoreCondition.Never, WriteIndented = false,// 设置格式化输出 + IncludeFields = true,// 允许反序列化到非公共 setter 和字段 Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,// 允许特殊字符 IgnoreReadOnlyFields = true, IgnoreReadOnlyProperties = true, @@ -53,7 +54,7 @@ namespace JiShe.CollectBus.Kafka.Serialization { if (data.IsEmpty) return default; - return JsonSerializer.Deserialize(data, _options)!; + return JsonSerializer.Deserialize(data, _options)!; } catch (Exception ex) { @@ -102,24 +103,37 @@ namespace JiShe.CollectBus.Kafka.Serialization } public static object? Deserialize(object value, Type valueType) { - var _jsonSerializerOptions = new JsonSerializerOptions + try { - DefaultIgnoreCondition = JsonIgnoreCondition.Never, - WriteIndented = false,// 设置格式化输出 - Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,// 允许特殊字符 - IgnoreReadOnlyFields = true, - IgnoreReadOnlyProperties = true, - NumberHandling = JsonNumberHandling.AllowReadingFromString, // 允许数字字符串 - AllowTrailingCommas = true, // 忽略尾随逗号 - ReadCommentHandling = JsonCommentHandling.Skip, // 忽略注释 - PropertyNameCaseInsensitive = true, // 属性名称大小写不敏感 - PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // 属性名称使用驼峰命名规则 - Converters = { new DateTimeJsonConverter() } // 注册你的自定义转换器, - }; + var _jsonSerializerOptions = new JsonSerializerOptions + { + DefaultIgnoreCondition = JsonIgnoreCondition.Never, + WriteIndented = false,// 设置格式化输出 + IncludeFields = true,// 允许反序列化到非公共 setter 和字段 + Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,// 允许特殊字符 + IgnoreReadOnlyFields = true, + IgnoreReadOnlyProperties = true, + NumberHandling = JsonNumberHandling.AllowReadingFromString, // 允许数字字符串 + AllowTrailingCommas = true, // 忽略尾随逗号 + ReadCommentHandling = JsonCommentHandling.Skip, // 忽略注释 + PropertyNameCaseInsensitive = true, // 属性名称大小写不敏感 + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // 属性名称使用驼峰命名规则 + Converters = { new DateTimeJsonConverter() } // 注册你的自定义转换器, + }; - if (value is JsonElement jsonElement) return jsonElement.Deserialize(valueType, _jsonSerializerOptions); + if (value is JsonElement jsonElement) + { + //return jsonElement.Deserialize(valueType, _jsonSerializerOptions); + return JsonSerializer.Deserialize(jsonElement, valueType, _jsonSerializerOptions); + } - throw new NotSupportedException("Type is not of type JsonElement"); + return null; + } + catch (Exception ex) + { + + throw; + } } } } diff --git a/protocols/JiShe.CollectBus.Protocol.T1882018/SendData/Telemetry1882018PacketBuilder.cs b/protocols/JiShe.CollectBus.Protocol.T1882018/SendData/Telemetry1882018PacketBuilder.cs index e5511f2..7dc8e43 100644 --- a/protocols/JiShe.CollectBus.Protocol.T1882018/SendData/Telemetry1882018PacketBuilder.cs +++ b/protocols/JiShe.CollectBus.Protocol.T1882018/SendData/Telemetry1882018PacketBuilder.cs @@ -46,7 +46,7 @@ namespace JiShe.CollectBus.Protocol.T1882018.SendData { var itemCodeArr = request.ItemCode.Split('_'); var c_data = itemCodeArr[0];//01 - var d_data = itemCodeArr[1];//91 或者 90 + var d_data = itemCodeArr[2];//91 或者 90 var dataUnit = new List() { "1F", d_data, "00" }; var dataList = Build188SendData.Build188SendCommand(request.MeterAddress, c_data, dataUnit); @@ -64,7 +64,7 @@ namespace JiShe.CollectBus.Protocol.T1882018.SendData { var itemCodeArr = request.ItemCode.Split('_'); var c_data = itemCodeArr[0];//01 - var d_data = itemCodeArr[1];//55 或者 99 + var d_data = itemCodeArr[2];//55 或者 99 var dataUnit = new List() { "A0", "17", "00", d_data }; var dataList = Build188SendData.Build188SendCommand(request.MeterAddress, c_data, dataUnit); diff --git a/protocols/JiShe.CollectBus.Protocol.T1882018/T1882018ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.T1882018/T1882018ProtocolPlugin.cs index 7e9f3a3..5b3044a 100644 --- a/protocols/JiShe.CollectBus.Protocol.T1882018/T1882018ProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol.T1882018/T1882018ProtocolPlugin.cs @@ -92,7 +92,9 @@ namespace JiShe.CollectBus.Protocol.T1882018 List dataUnit = new List(); //数据转发场景 10H_F1 if (request.ItemCode == T37612012PacketItemCodeConst.AFN10HFN01H && request.SubProtocolRequest != null && string.IsNullOrWhiteSpace(request.SubProtocolRequest.ItemCode) == false) - { + { + //var subItemCodeArr = request.SubProtocolRequest.ItemCode.Split("_"); + var t188PacketHandlerName = $"{T1882018PacketItemCodeConst.BasicT1882018}_{request.SubProtocolRequest.ItemCode}_Send"; Telemetry1882018PacketResponse t645PacketResponse = null; diff --git a/protocols/JiShe.CollectBus.Protocol.T6452007/SendData/Telemetry6452007PacketBuilder.cs b/protocols/JiShe.CollectBus.Protocol.T6452007/SendData/Telemetry6452007PacketBuilder.cs index 2d20272..05def67 100644 --- a/protocols/JiShe.CollectBus.Protocol.T6452007/SendData/Telemetry6452007PacketBuilder.cs +++ b/protocols/JiShe.CollectBus.Protocol.T6452007/SendData/Telemetry6452007PacketBuilder.cs @@ -47,7 +47,7 @@ namespace JiShe.CollectBus.Protocol.T6452007.SendData { var itemCodeArr = request.ItemCode.Split('_'); var c_data = itemCodeArr[0]; - var n_data = itemCodeArr[1]; + var n_data = itemCodeArr[2]; string password = request.Password; string pwdLevel = "02"; @@ -78,7 +78,7 @@ namespace JiShe.CollectBus.Protocol.T6452007.SendData { var itemCodeArr = request.ItemCode.Split('_'); var c_data = itemCodeArr[0]; - var n_data = itemCodeArr[1]; + var n_data = itemCodeArr[2]; string password = request.Password; if (!string.IsNullOrWhiteSpace(password) && password.Contains("|")) diff --git a/protocols/JiShe.CollectBus.Protocol.T6452007/T6452007ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.T6452007/T6452007ProtocolPlugin.cs index a4f820b..4f06f5a 100644 --- a/protocols/JiShe.CollectBus.Protocol.T6452007/T6452007ProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol.T6452007/T6452007ProtocolPlugin.cs @@ -93,7 +93,9 @@ namespace JiShe.CollectBus.Protocol.T6452007 //数据转发场景 10H_F1 if (request.ItemCode == T37612012PacketItemCodeConst.AFN10HFN01H && request.SubProtocolRequest != null && string.IsNullOrWhiteSpace(request.SubProtocolRequest.ItemCode) == false) { - var t645PacketHandlerName = $"C{request.SubProtocolRequest.ItemCode}_Send"; + var subItemCodeArr = request.SubProtocolRequest.ItemCode.Split("_"); + var t645PacketHandlerName = $"C{subItemCodeArr[0]}_{subItemCodeArr[1]}_Send";//C1C_01_Send + Telemetry6452007PacketResponse t645PacketResponse = null; if (T645ControlHandlers != null && T645ControlHandlers.TryGetValue(t645PacketHandlerName diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 05c3007..025c250 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -122,10 +122,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading TypeName = 1, DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679", TimeDensity = 15, - BrandType = "DDS1980", + BrandType = "DTS1980", MeterType = MeterTypeEnum.Ammeter, ProjectID = 1, MeteringPort = 2, + Password = "000000", }); ammeterInfos.Add(new DeviceInfo() @@ -141,10 +142,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading TypeName = 1, DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679", TimeDensity = 15, - BrandType = "DDS1980", + BrandType = "DTS1980", MeterType = MeterTypeEnum.Ammeter, ProjectID = 1, MeteringPort = 2, + Password = "000000", }); //ammeterInfos.Add(new DeviceInfo() @@ -287,12 +289,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = "442400040", FocusId = 95780, ProjectID = 1, - TripType = "off", + TripType = "on", TripTime = $"{DateTime.Now:HH:mm}", MeterId = 127035, LoopType = "EachDay", EachDayWithout = "周六,周日", - TimeDensity = 15, + TimeDensity = 15, }); settingInfos.Add(new AmmeterAutoValveControlSetting() { @@ -301,7 +303,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = "442400039", FocusId = 69280, ProjectID = 1, - TripType = "off", + TripType = "on", TripTime = $"{DateTime.Now:HH:mm}", MeterId = 95594, LoopType = "EachDay", @@ -377,17 +379,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading { ammeterInfo.TripState = 0; tripStateResult = true; - subItemCode = T6452007PacketItemCodeConst.C1C01C; + subItemCode = T6452007PacketItemCodeConst.C1C011C; if (ammeterInfo.TypeName != 1) { - subItemCode = T6452007PacketItemCodeConst.C1C01B; + subItemCode = T6452007PacketItemCodeConst.C1C011B; } } else if (settingInfo.TripType.Equals("off")) { ammeterInfo.TripState = 1; tripStateResult = false; - subItemCode = T6452007PacketItemCodeConst.C1C01A; + subItemCode = T6452007PacketItemCodeConst.C1C011A; } else { diff --git a/shared/JiShe.CollectBus.Common/Consts/T6452007PacketItemCodeConst.cs b/shared/JiShe.CollectBus.Common/Consts/T6452007PacketItemCodeConst.cs index 92da5ba..6bdb750 100644 --- a/shared/JiShe.CollectBus.Common/Consts/T6452007PacketItemCodeConst.cs +++ b/shared/JiShe.CollectBus.Common/Consts/T6452007PacketItemCodeConst.cs @@ -16,37 +16,37 @@ namespace JiShe.CollectBus.Common.Consts /// /// 跳闸 /// - public const string C1C01A = "1C_1A"; + public const string C1C011A = "1C_01_1A"; /// /// 单相合闸 /// - public const string C1C01B = "1C_1B"; + public const string C1C011B = "1C_01_1B"; /// /// 三相合闸 /// - public const string C1C01C = "1C_1C"; + public const string C1C011C = "1C_01_1C"; /// /// 触发报警 /// - public const string C1C02A = "1C_2A"; + public const string C1C021A = "1C_01_2A"; /// /// 报警解除 /// - public const string C1C02B = "1C_2B"; + public const string C1C012B = "1C_01_2B"; /// /// 保电开始 /// - public const string C1C03A = "1C_3A"; + public const string C1C033A = "1C_03_3A"; /// /// 保电结束 /// - public const string C1C03B = "1C_3B"; + public const string C1C033B = "1C_03_3B"; #endregion #region 广播校时 From e02a0953e9b4e2646857eb0e0f61416768a2d21f Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Wed, 14 May 2025 23:28:44 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E5=AE=8C=E5=96=843761=E4=B8=8B=E8=A1=8C?= =?UTF-8?q?=E9=80=8F=E6=98=8E=E8=BD=AC=E5=8F=91=E5=A4=84=E7=90=86=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../SendData/Telemetry3761PacketBuilder.cs | 11 ++- .../SendData/Telemetry3761PacketRequest.cs | 11 ++- .../SendData/Telemetry6452007PacketRequest.cs | 21 ++--- .../T6452007ProtocolPlugin.cs | 1 + .../Models/SubProtocolBuildRequest.cs | 10 +++ ...nergySystemScheduledMeterReadingService.cs | 80 +++++++++++-------- .../BuildSendDatas/Build3761SendData.cs | 4 +- .../Consts/MeteringPortConst.cs | 24 ++++++ 8 files changed, 107 insertions(+), 55 deletions(-) create mode 100644 shared/JiShe.CollectBus.Common/Consts/MeteringPortConst.cs diff --git a/protocols/JiShe.CollectBus.Protocol.T37612012/SendData/Telemetry3761PacketBuilder.cs b/protocols/JiShe.CollectBus.Protocol.T37612012/SendData/Telemetry3761PacketBuilder.cs index f8bfa78..18dcbb3 100644 --- a/protocols/JiShe.CollectBus.Protocol.T37612012/SendData/Telemetry3761PacketBuilder.cs +++ b/protocols/JiShe.CollectBus.Protocol.T37612012/SendData/Telemetry3761PacketBuilder.cs @@ -1,4 +1,5 @@ -using System.Reflection; +using System.Data; +using System.Reflection; using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Models; @@ -259,6 +260,12 @@ namespace JiShe.CollectBus.Protocol.T37612012.SendData #region AFN10H 数据转发 public static Telemetry3761PacketResponse AFN10_Fn_Send(Telemetry3761PacketRequest request) { + + var baudRateValue = Build3761SendData.GetBaudreate($"{request.SubRequest.Baudrate}"); + + var dataUnit = Build3761SendData.BuildTransparentForwardingSendDataUnit(request.SubRequest.MeteringPort, baudRateValue, request.DataUnit); + dataUnit.AddRange(Build3761SendData.GetPW()); + var reqParameter = new ReqParameter2() { AFN = AFN.数据转发, @@ -275,7 +282,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.SendData Pn = request.Pn, Fn = request.Fn }; - var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter, request.DataUnit); + var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter, dataUnit); return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, }; } diff --git a/protocols/JiShe.CollectBus.Protocol.T37612012/SendData/Telemetry3761PacketRequest.cs b/protocols/JiShe.CollectBus.Protocol.T37612012/SendData/Telemetry3761PacketRequest.cs index f02efc9..a5d8593 100644 --- a/protocols/JiShe.CollectBus.Protocol.T37612012/SendData/Telemetry3761PacketRequest.cs +++ b/protocols/JiShe.CollectBus.Protocol.T37612012/SendData/Telemetry3761PacketRequest.cs @@ -1,4 +1,6 @@ -namespace JiShe.CollectBus.Protocol.T37612012.SendData +using JiShe.CollectBus.Protocol.Models; + +namespace JiShe.CollectBus.Protocol.T37612012.SendData { /// /// 构建3761报文参数 @@ -20,8 +22,13 @@ /// public int Pn { get; set; } + /// + /// 子协议请求 + /// + public SubProtocolBuildRequest SubRequest { get; set; } + /// - /// 透明转发单元 + /// 透明转发数据单元 /// public List DataUnit { get; set; } } diff --git a/protocols/JiShe.CollectBus.Protocol.T6452007/SendData/Telemetry6452007PacketRequest.cs b/protocols/JiShe.CollectBus.Protocol.T6452007/SendData/Telemetry6452007PacketRequest.cs index c367206..b819b91 100644 --- a/protocols/JiShe.CollectBus.Protocol.T6452007/SendData/Telemetry6452007PacketRequest.cs +++ b/protocols/JiShe.CollectBus.Protocol.T6452007/SendData/Telemetry6452007PacketRequest.cs @@ -1,23 +1,12 @@ -namespace JiShe.CollectBus.Protocol.T6452007.SendData +using JiShe.CollectBus.Protocol.Models; + +namespace JiShe.CollectBus.Protocol.T6452007.SendData { /// /// 构建645报文参数 /// - public class Telemetry6452007PacketRequest + public class Telemetry6452007PacketRequest: SubProtocolBuildRequest { - /// - /// 表地址 - /// - public required string MeterAddress { get; set; } - - /// - /// 密码 - /// - public required string Password { get; set; } - - /// - /// 操作码 - /// - public required string ItemCode { get; set; } + } } diff --git a/protocols/JiShe.CollectBus.Protocol.T6452007/T6452007ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.T6452007/T6452007ProtocolPlugin.cs index 4f06f5a..f145638 100644 --- a/protocols/JiShe.CollectBus.Protocol.T6452007/T6452007ProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol.T6452007/T6452007ProtocolPlugin.cs @@ -129,6 +129,7 @@ namespace JiShe.CollectBus.Protocol.T6452007 FocusAddress = request.FocusAddress, Fn = fn, Pn = request.Pn, + SubRequest = request.SubProtocolRequest, DataUnit = dataUnit, }); } diff --git a/protocols/JiShe.CollectBus.Protocol/Models/SubProtocolBuildRequest.cs b/protocols/JiShe.CollectBus.Protocol/Models/SubProtocolBuildRequest.cs index 09a5c55..15526e2 100644 --- a/protocols/JiShe.CollectBus.Protocol/Models/SubProtocolBuildRequest.cs +++ b/protocols/JiShe.CollectBus.Protocol/Models/SubProtocolBuildRequest.cs @@ -10,6 +10,16 @@ /// public required string MeterAddress { get; set; } + /// + /// 波特率 default(2400) + /// + public int Baudrate { get; set; } + + /// + /// 计量端口 + /// + public int MeteringPort { get; set; } + /// /// 密码 /// diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 025c250..c51a527 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -125,7 +125,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading BrandType = "DTS1980", MeterType = MeterTypeEnum.Ammeter, ProjectID = 1, - MeteringPort = 2, + MeteringPort = MeteringPortConst.MeteringPortTwo, Password = "000000", }); @@ -145,7 +145,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading BrandType = "DTS1980", MeterType = MeterTypeEnum.Ammeter, ProjectID = 1, - MeteringPort = 2, + MeteringPort = MeteringPortConst.MeteringPortTwo, Password = "000000", }); @@ -279,37 +279,49 @@ namespace JiShe.CollectBus.ScheduledMeterReading string currentTimeStr = $"{currentTime:HH:mm}"; try - { + { #if DEBUG - var settingInfos = new List(); - settingInfos.Add(new AmmeterAutoValveControlSetting() + //电表自动阀控缓存 + string redisCacheDeviceSettingInfoHashKey = $"redisCacheDeviceSettingInfoHashKey_{SystemType}_{ServerTagName}"; + + var settingInfos = FreeRedisProvider.Instance.Get< List> (redisCacheDeviceSettingInfoHashKey); + if (settingInfos == null || settingInfos.Count <= 0) { - MeterType = MeterTypeEnum.Ammeter, - AmmerterAddress = "442405000040", - FocusAddress = "442400040", - FocusId = 95780, - ProjectID = 1, - TripType = "on", - TripTime = $"{DateTime.Now:HH:mm}", - MeterId = 127035, - LoopType = "EachDay", - EachDayWithout = "周六,周日", - TimeDensity = 15, - }); - settingInfos.Add(new AmmeterAutoValveControlSetting() - { - MeterType = MeterTypeEnum.Ammeter, - AmmerterAddress = "442405000039", - FocusAddress = "442400039", - FocusId = 69280, - ProjectID = 1, - TripType = "on", - TripTime = $"{DateTime.Now:HH:mm}", - MeterId = 95594, - LoopType = "EachDay", - EachDayWithout = "周六,周日", - TimeDensity = 15, - }); + settingInfos = new List(); + settingInfos.Add(new AmmeterAutoValveControlSetting() + { + MeterType = MeterTypeEnum.Ammeter, + AmmerterAddress = "442405000040", + FocusAddress = "442400040", + FocusId = 95780, + ProjectID = 1, + TripType = "on", + TripTime = $"{DateTime.Now:HH:mm}", + MeterId = 127035, + LoopType = "EachDay", + EachDayWithout = "周六,周日", + TimeDensity = 15, + }); + settingInfos.Add(new AmmeterAutoValveControlSetting() + { + MeterType = MeterTypeEnum.Ammeter, + AmmerterAddress = "442405000039", + FocusAddress = "442400039", + FocusId = 69280, + ProjectID = 1, + TripType = "on", + TripTime = $"{DateTime.Now:HH:mm}", + MeterId = 95594, + LoopType = "EachDay", + EachDayWithout = "周六,周日", + TimeDensity = 15, + }); + + + FreeRedisProvider.Instance.Set(redisCacheDeviceSettingInfoHashKey, settingInfos); + } + + #else //获取电表阀控配置 var settingInfos = await GetAmmeterAutoValveControlSetting(currentTimeStr); @@ -375,7 +387,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading bool tripStateResult = false; string itemCode = T37612012PacketItemCodeConst.AFN10HFN01H; string subItemCode = string.Empty; - if (settingInfo.TripType.Equals("on")) + if (settingInfo.TripType.Equals("on"))//当前电表断闸,需要合闸 { ammeterInfo.TripState = 0; tripStateResult = true; @@ -385,7 +397,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading subItemCode = T6452007PacketItemCodeConst.C1C011B; } } - else if (settingInfo.TripType.Equals("off")) + else if (settingInfo.TripType.Equals("off"))//当前电表合闸,需要断闸 { ammeterInfo.TripState = 1; tripStateResult = false; @@ -416,6 +428,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading MeterAddress = ammeterInfo.MeterAddress, Password = ammeterInfo.Password, ItemCode = subItemCode, + Baudrate = ammeterInfo.Baudrate, + MeteringPort = ammeterInfo.MeteringPort } }); diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/Build3761SendData.cs b/shared/JiShe.CollectBus.Common/BuildSendDatas/Build3761SendData.cs index 76f6b88..2d815da 100644 --- a/shared/JiShe.CollectBus.Common/BuildSendDatas/Build3761SendData.cs +++ b/shared/JiShe.CollectBus.Common/BuildSendDatas/Build3761SendData.cs @@ -1362,7 +1362,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas /// 等待报文超时时间/s /// 等待字节超时时间/ms /// - private static List BuildTransparentForwardingSendDataUnit(int port, int baudRate, List datas, StopBit stopBit = StopBit.Stop1, Parity parity = Parity.Even, DataBit dataBit = DataBit.D8, + public static List BuildTransparentForwardingSendDataUnit(int port, int baudRate, List datas, StopBit stopBit = StopBit.Stop1, Parity parity = Parity.Even, DataBit dataBit = DataBit.D8, int waitContentTimeout = 100, int waitByteTimeout = 100) { var dataUnit = new List(); @@ -1591,7 +1591,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas //AUX=消息认证码字段(PW,16个字节) - private static List GetPW() + public static List GetPW() { var str = "00"; var pWList = Enumerable.Repeat(str, pWLen).ToList(); diff --git a/shared/JiShe.CollectBus.Common/Consts/MeteringPortConst.cs b/shared/JiShe.CollectBus.Common/Consts/MeteringPortConst.cs new file mode 100644 index 0000000..c65b027 --- /dev/null +++ b/shared/JiShe.CollectBus.Common/Consts/MeteringPortConst.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Common.Consts +{ + /// + /// 计量端口常量 + /// + public class MeteringPortConst + { + /// + /// 计量端口1 + /// + public const int MeteringPortOne = 1; + + /// + /// 计量端口2 + /// + public const int MeteringPortTwo = 2; + } +}