diff --git a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs
index 9df2488..da14d20 100644
--- a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs
@@ -44,7 +44,11 @@ namespace JiShe.CollectBus.IoTDB.Model
/// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取
        /// </summary>
public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
-
+
+        /// <summary>
+        /// 设备路径(后备字段)
+        /// </summary>
+        private string? _devicePath;
        /// <summary>
        /// 设备路径
        /// </summary>
@@ -52,18 +56,16 @@ namespace JiShe.CollectBus.IoTDB.Model
{
get
{
- return $"root.{SystemName.ToLower()}.`{ProjectId}`.`{DeviceType}`.{DataType}.`{DeviceId}`";
+ // 如果未手动设置路径,则自动生成
+ if (string.IsNullOrWhiteSpace(_devicePath))
+ {
+ return $"root.{SystemName.ToLower()}.`{ProjectId}`.`{DeviceType}`.{DataType}.`{DeviceId}`";
+ }
+ return _devicePath;
}
set
- {
- if (string.IsNullOrWhiteSpace(value))
- {
- DevicePath = $"root.{SystemName.ToLower()}.`{ProjectId}`.`{DeviceType}`.{DataType}.`{DeviceId}`";
- }
- else
- {
- DevicePath = value;
- }
+ {
+                _devicePath = value; // 直接赋值给后备字段,避免旧实现在 set 访问器内再次给 DevicePath 赋值导致的无限递归
}
}
}
diff --git a/modules/JiShe.CollectBus.IoTDB/Options/QueryCondition.cs b/modules/JiShe.CollectBus.IoTDB/Options/QueryCondition.cs
index f5249b0..bccf017 100644
--- a/modules/JiShe.CollectBus.IoTDB/Options/QueryCondition.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Options/QueryCondition.cs
@@ -56,7 +56,6 @@ namespace JiShe.CollectBus.IoTDB.Options
return declaredTypeName?.ToUpper() switch
{
"DATETIME" => v => v != null ? ((DateTime)v).ToUniversalTime().Ticks : null,
- "BOOLEAN" => v => v != null && (bool)v ? 1 : 0,
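+                // BOOLEAN 不再转成 1/0,改走默认分支透传(假设:IoTDB 查询条件可直接接受布尔字面量)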
"STRING" => v => v != null ? $"'{v}'" : "''",
_ => v => v
};
diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs
index bf752eb..043af46 100644
--- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs
+++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs
@@ -1,6 +1,7 @@
using Confluent.Kafka;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Consts;
+using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Serialization;
using Microsoft.AspNetCore.DataProtection.KeyManagement;
@@ -127,80 +128,88 @@ namespace JiShe.CollectBus.Kafka.Consumer
        /// </summary>
        public async Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
{
- await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
+ try
{
-
- var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
- var cts = new CancellationTokenSource();
-
- var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
- (
- CreateConsumer(groupId),
- cts
-                )).Consumer as IConsumer<TKey, TValue>;
-
- consumer!.Subscribe(topics);
-
- _ = Task.Run(async () =>
+ await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
- while (!cts.IsCancellationRequested)
- {
- try
- {
- //_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息....");
- var result = consumer.Consume(cts.Token);
- if (result == null || result.Message == null || result.Message.Value == null)
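+                // 同一 groupId+topics+泛型参数组合复用同一个消费者实例(见 _consumerStore.GetOrAdd)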
+ var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
+ var cts = new CancellationTokenSource();
+
+ var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
+ (
+ CreateConsumer(groupId),
+ cts
+                )).Consumer as IConsumer<TKey, TValue>;
+
+ consumer!.Subscribe(topics);
+
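+                // 消费循环运行在后台 Task;取消依赖存入 _consumerStore 的 CTS,由 Unsubscribe 触发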
+ _ = Task.Run(async () =>
+ {
+ while (!cts.IsCancellationRequested)
+ {
+ try
{
- await Task.Delay(DelayTime, cts.Token);
- continue;
- }
- if (result.IsPartitionEOF)
- {
-#if DEBUG
- _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
-#endif
- await Task.Delay(DelayTime, cts.Token);
- continue;
- }
- if (_kafkaOptionConfig.EnableFilter)
- {
- var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
- // 检查 Header 是否符合条件
- if (!headersFilter.Match(result.Message.Headers))
+ //_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息....");
+
+ var result = consumer.Consume(cts.Token);
+ if (result == null || result.Message == null || result.Message.Value == null)
{
- consumer.Commit(result); // 提交偏移量
- // 跳过消息
+ await Task.Delay(DelayTime, cts.Token);
continue;
}
+ if (result.IsPartitionEOF)
+ {
+#if DEBUG
+ _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
+#endif
+ await Task.Delay(DelayTime, cts.Token);
+ continue;
+ }
+ if (_kafkaOptionConfig.EnableFilter)
+ {
+ var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
+ // 检查 Header 是否符合条件
+ if (!headersFilter.Match(result.Message.Headers))
+ {
+ consumer.Commit(result); // 提交偏移量
+ // 跳过消息
+ continue;
+ }
+ }
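+                            // 仅在业务处理成功后提交偏移量,实现至少一次(at-least-once)投递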
+                            bool success = await messageHandler(result.Message.Key, result.Message.Value);
+                            if (success)
+ consumer.Commit(result); // 手动提交
+ }
+ catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
+ {
+ _logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}");
+ throw; // 抛出异常,以便重试
+ }
+ catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
+ {
+ _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}");
+ throw; // 抛出异常,以便重试
+ }
+ catch (OperationCanceledException)
+ {
+ //ignore
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "处理消息时发生未知错误");
}
- bool sucess = await messageHandler(result.Message.Key, result.Message.Value);
- if (sucess)
- consumer.Commit(result); // 手动提交
}
- catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
- {
- _logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}");
- throw; // 抛出异常,以便重试
- }
- catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
- {
- _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}");
- throw; // 抛出异常,以便重试
- }
- catch (OperationCanceledException)
- {
- //ignore
- }
- catch (Exception ex)
- {
- _logger.LogError(ex, "处理消息时发生未知错误");
- }
- }
- }, cts.Token);
- await Task.CompletedTask;
- });
+ }, cts.Token);
+ await Task.CompletedTask;
+ });
+ }
+ catch (Exception ex)
+ {
+            _logger.LogError(ex, $"{string.Join("、", topics)} 订阅启动失败");
+ throw;
+ }
}
@@ -215,76 +224,84 @@ namespace JiShe.CollectBus.Kafka.Consumer
        /// </summary>
        public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
{
- await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
+ try
{
- var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
- var cts = new CancellationTokenSource();
- var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
- (
- CreateConsumer(groupId),
- cts
-                )).Consumer as IConsumer<Ignore, TValue>;
-
- consumer!.Subscribe(topics);
-
- _ = Task.Run(async () =>
+ await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
- int count = 0;
- while (!cts.IsCancellationRequested)
- {
- try
- {
- //_logger.LogInformation($"Kafka消费: {string.Join("", topics)}_{count} 开始拉取消息....");
- count++;
- var result = consumer.Consume(cts.Token);
- if (result == null || result.Message == null || result.Message.Value == null)
- {
- await Task.Delay(DelayTime, cts.Token);
- continue;
- }
+ var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
+ var cts = new CancellationTokenSource();
+ var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
+ (
+ CreateConsumer(groupId),
+ cts
+                )).Consumer as IConsumer<Ignore, TValue>;
- if (result.IsPartitionEOF)
+ consumer!.Subscribe(topics);
+
+ _ = Task.Run(async () =>
+ {
+ int count = 0;
+ while (!cts.IsCancellationRequested)
+ {
+ try
{
-#if DEBUG
- _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
-#endif
- await Task.Delay(DelayTime, cts.Token);
- continue;
- }
- if (_kafkaOptionConfig.EnableFilter)
- {
- var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
- // 检查 Header 是否符合条件
- if (!headersFilter.Match(result.Message.Headers))
+ //_logger.LogInformation($"Kafka消费: {string.Join("", topics)}_{count} 开始拉取消息....");
+ count++;
+ var result = consumer.Consume(cts.Token);
+ if (result == null || result.Message == null || result.Message.Value == null)
{
- consumer.Commit(result); // 提交偏移量
- // 跳过消息
+ await Task.Delay(DelayTime, cts.Token);
continue;
}
+
+ if (result.IsPartitionEOF)
+ {
+#if DEBUG
+ _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
+#endif
+ await Task.Delay(DelayTime, cts.Token);
+ continue;
+ }
+ if (_kafkaOptionConfig.EnableFilter)
+ {
+ var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
+ // 检查 Header 是否符合条件
+ if (!headersFilter.Match(result.Message.Headers))
+ {
+ consumer.Commit(result); // 提交偏移量
+ // 跳过消息
+ continue;
+ }
+ }
+                            bool success = await messageHandler(result.Message.Value);
+                            if (success)
+ consumer.Commit(result); // 手动提交
+ //else
+ // consumer.StoreOffset(result);
+ }
+ catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
+ {
+ _logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}");
+ throw; // 抛出异常,以便重试
+ }
+ catch (OperationCanceledException)
+ {
+ //ignore
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "处理消息时发生未知错误");
}
- bool sucess = await messageHandler(result.Message.Value);
- if (sucess)
- consumer.Commit(result); // 手动提交
- //else
- // consumer.StoreOffset(result);
}
- catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
- {
- _logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}");
- throw; // 抛出异常,以便重试
- }
- catch (OperationCanceledException)
- {
- //ignore
- }
- catch (Exception ex)
- {
- _logger.LogError(ex, "处理消息时发生未知错误");
- }
- }
- }, cts.Token);
- await Task.CompletedTask;
- });
+ }, cts.Token);
+ await Task.CompletedTask;
+ });
+ }
+ catch (Exception ex)
+ {
+            _logger.LogError(ex, $"{string.Join("、", topics)} 订阅启动失败");
+ throw;
+ }
}
@@ -300,7 +317,15 @@ namespace JiShe.CollectBus.Kafka.Consumer
        /// <param name="batchTimeout">批次超时时间</param>
        public async Task SubscribeBatchAsync<TKey, TValue>(string topic, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
{
-            await SubscribeBatchAsync<TKey, TValue>(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout);
+ try
+ {
+                await SubscribeBatchAsync<TKey, TValue>(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout);
+ }
+ catch (Exception ex)
+ {
+                _logger.LogError(ex, $"{topic} 批量订阅启动失败");
+ throw;
+ }
}
        /// <summary>
@@ -315,117 +340,125 @@ namespace JiShe.CollectBus.Kafka.Consumer
        /// <param name="batchTimeout">批次超时时间</param>
        public async Task SubscribeBatchAsync<TKey, TValue>(string[] topics, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
{
- await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
+ try
{
-
- var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
- var cts = new CancellationTokenSource();
-
- var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
- (
- CreateConsumer(groupId),
- cts
-                )).Consumer as IConsumer<TKey, TValue>;
- consumer!.Subscribe(topics);
-
- var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
-
- _ = Task.Run(async () =>
+ await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
- var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
- var startTime = DateTime.UtcNow;
- while (!cts.IsCancellationRequested)
+ var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
+ var cts = new CancellationTokenSource();
+
+ var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
+ (
+ CreateConsumer(groupId),
+ cts
+                )).Consumer as IConsumer<TKey, TValue>;
+ consumer!.Subscribe(topics);
+
+ var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
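+                // 批次窗口:凑满 batchSize 或自 startTime 起超过 timeout 即触发一次批处理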
+
+ _ = Task.Run(async () =>
{
- try
- {
- // 非阻塞快速累积消息
- while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
- {
- var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
+ var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
+ var startTime = DateTime.UtcNow;
- if (result != null)
+ while (!cts.IsCancellationRequested)
+ {
+ try
+ {
+ // 非阻塞快速累积消息
+ while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
{
- if (result.IsPartitionEOF)
+ var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
+
+ if (result != null)
{
+ if (result.IsPartitionEOF)
+ {
#if DEBUG
- _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
+ _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
#endif
+ await Task.Delay(DelayTime, cts.Token);
+ }
+ else if (result.Message.Value != null)
+ {
+ if (_kafkaOptionConfig.EnableFilter)
+ {
+ var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
+ // 检查 Header 是否符合条件
+ if (!headersFilter.Match(result.Message.Headers))
+ {
+ consumer.Commit(result); // 提交偏移量
+ // 跳过消息
+ continue;
+ }
+ }
+ messages.Add((result.Message.Value, result.TopicPartitionOffset));
+ }
+ }
+ else
+ {
+ // 无消息时短暂等待
await Task.Delay(DelayTime, cts.Token);
}
- else if (result.Message.Value != null)
+ }
+
+ // 处理批次
+ if (messages.Count > 0)
+ {
+ bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
+ if (success)
{
- if (_kafkaOptionConfig.EnableFilter)
+                                var offsetsByPartition = new Dictionary<TopicPartition, Offset>();
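+                                // 记录每个分区的最大偏移;Kafka 提交的是“下一条待消费”位置,故提交时 +1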
+ foreach (var msg in messages)
{
- var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
- // 检查 Header 是否符合条件
- if (!headersFilter.Match(result.Message.Headers))
+ var tp = msg.Offset.TopicPartition;
+ var offset = msg.Offset.Offset;
+ if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
{
- consumer.Commit(result); // 提交偏移量
- // 跳过消息
- continue;
+ offsetsByPartition[tp] = offset;
}
}
- messages.Add((result.Message.Value, result.TopicPartitionOffset));
- }
- }
- else
- {
- // 无消息时短暂等待
- await Task.Delay(DelayTime, cts.Token);
- }
- }
- // 处理批次
- if (messages.Count > 0)
+ var offsetsToCommit = offsetsByPartition
+ .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
+ .ToList();
+ consumer.Commit(offsetsToCommit);
+ }
+ messages.Clear();
+ }
+
+ startTime = DateTime.UtcNow;
+ }
+ catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
- bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
- if (success)
- {
-                            var offsetsByPartition = new Dictionary<TopicPartition, Offset>();
- foreach (var msg in messages)
- {
- var tp = msg.Offset.TopicPartition;
- var offset = msg.Offset.Offset;
- if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
- {
- offsetsByPartition[tp] = offset;
- }
- }
-
- var offsetsToCommit = offsetsByPartition
- .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
- .ToList();
- consumer.Commit(offsetsToCommit);
- }
- messages.Clear();
+ _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}");
+ throw; // 抛出异常,以便重试
}
+ catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
+ {
+ _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}");
+ throw; // 抛出异常,以便重试
+ }
+ catch (OperationCanceledException)
+ {
+ //ignore
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "处理批量消息时发生未知错误");
+ }
+ }
+ }, cts.Token);
- startTime = DateTime.UtcNow;
- }
- catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
- {
- _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}");
- throw; // 抛出异常,以便重试
- }
- catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
- {
- _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}");
- throw; // 抛出异常,以便重试
- }
- catch (OperationCanceledException)
- {
- //ignore
- }
- catch (Exception ex)
- {
- _logger.LogError(ex, "处理批量消息时发生未知错误");
- }
- }
- }, cts.Token);
+ await Task.CompletedTask;
+ });
+ }
+ catch (Exception ex)
+ {
- await Task.CompletedTask;
- });
+                _logger.LogError(ex, $"{string.Join("、", topics)} 批量订阅启动失败");
+                throw;
+ }
}
@@ -441,7 +474,15 @@ namespace JiShe.CollectBus.Kafka.Consumer
        /// <param name="consumeTimeout">消费等待时间</param>
        public async Task SubscribeBatchAsync<TValue>(string topic, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class
{
- await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout);
+ try
+ {
+ await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout);
+ }
+ catch (Exception ex)
+ {
+                _logger.LogError(ex, $"{topic} 批量订阅启动失败");
+ throw;
+ }
}
@@ -458,116 +499,124 @@ namespace JiShe.CollectBus.Kafka.Consumer
        /// <param name="consumeTimeout">消费等待时间</param>
        public async Task SubscribeBatchAsync<TValue>(string[] topics, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class
{
- await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
+ try
{
-
- var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
- var cts = new CancellationTokenSource();
-
- var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
- (
- CreateConsumer(groupId),
- cts
-                )).Consumer as IConsumer<Ignore, TValue>;
-
- consumer!.Subscribe(topics);
-
- var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
-
- _ = Task.Run(async () =>
+ await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
- var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
- var startTime = DateTime.UtcNow;
- while (!cts.IsCancellationRequested)
+ var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
+ var cts = new CancellationTokenSource();
+
+ var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
+ (
+ CreateConsumer(groupId),
+ cts
+                )).Consumer as IConsumer<Ignore, TValue>;
+
+ consumer!.Subscribe(topics);
+
+ var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
+
+ _ = Task.Run(async () =>
{
- try
- {
- // 非阻塞快速累积消息
- while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
- {
- var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
+ var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
+ var startTime = DateTime.UtcNow;
- if (result != null)
+ while (!cts.IsCancellationRequested)
+ {
+ try
+ {
+ // 非阻塞快速累积消息
+ while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
{
- if (result.IsPartitionEOF)
+ var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
+
+ if (result != null)
{
- //_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
+ if (result.IsPartitionEOF)
+ {
+ //_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
+ await Task.Delay(DelayTime, cts.Token);
+ }
+ else if (result.Message.Value != null)
+ {
+ if (_kafkaOptionConfig.EnableFilter)
+ {
+ var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
+ // 检查 Header 是否符合条件
+ if (!headersFilter.Match(result.Message.Headers))
+ {
+ consumer.Commit(result); // 提交偏移量
+ // 跳过消息
+ continue;
+ }
+ }
+ messages.Add((result.Message.Value, result.TopicPartitionOffset));
+ }
+ }
+ else
+ {
+ // 无消息时短暂等待
await Task.Delay(DelayTime, cts.Token);
}
- else if (result.Message.Value != null)
+ }
+
+ // 处理批次
+ if (messages.Count > 0)
+ {
+ bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
+ if (success)
{
- if (_kafkaOptionConfig.EnableFilter)
+                                var offsetsByPartition = new Dictionary<TopicPartition, Offset>();
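+                                // 同前一重载:按分区聚合最大偏移,提交时 +1 指向下一条待消费消息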
+ foreach (var msg in messages)
{
- var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
- // 检查 Header 是否符合条件
- if (!headersFilter.Match(result.Message.Headers))
+ var tp = msg.Offset.TopicPartition;
+ var offset = msg.Offset.Offset;
+ if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
{
- consumer.Commit(result); // 提交偏移量
- // 跳过消息
- continue;
+ offsetsByPartition[tp] = offset;
}
}
- messages.Add((result.Message.Value, result.TopicPartitionOffset));
- }
- }
- else
- {
- // 无消息时短暂等待
- await Task.Delay(DelayTime, cts.Token);
- }
- }
- // 处理批次
- if (messages.Count > 0)
+ var offsetsToCommit = offsetsByPartition
+ .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
+ .ToList();
+ consumer.Commit(offsetsToCommit);
+ }
+ messages.Clear();
+ }
+
+ startTime = DateTime.UtcNow;
+ }
+ catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
- bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
- if (success)
- {
-                            var offsetsByPartition = new Dictionary<TopicPartition, Offset>();
- foreach (var msg in messages)
- {
- var tp = msg.Offset.TopicPartition;
- var offset = msg.Offset.Offset;
- if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
- {
- offsetsByPartition[tp] = offset;
- }
- }
-
- var offsetsToCommit = offsetsByPartition
- .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
- .ToList();
- consumer.Commit(offsetsToCommit);
- }
- messages.Clear();
+ _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
+ throw; // 抛出异常,以便重试
}
+ catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
+ {
+ _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}");
+ throw; // 抛出异常,以便重试
+ }
+ catch (OperationCanceledException)
+ {
+ //ignore
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "处理批量消息时发生未知错误");
+ }
+ }
+ }, cts.Token);
- startTime = DateTime.UtcNow;
- }
- catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
- {
- _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
- throw; // 抛出异常,以便重试
- }
- catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
- {
- _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}");
- throw; // 抛出异常,以便重试
- }
- catch (OperationCanceledException)
- {
- //ignore
- }
- catch (Exception ex)
- {
- _logger.LogError(ex, "处理批量消息时发生未知错误");
- }
- }
- }, cts.Token);
+ await Task.CompletedTask;
+ });
+ }
+ catch (Exception ex)
+ {
- await Task.CompletedTask;
- });
+                _logger.LogError(ex, $"{string.Join("、", topics)} 批量订阅启动失败");
+                throw;
+ }
}
@@ -578,12 +627,20 @@ namespace JiShe.CollectBus.Kafka.Consumer
        /// </summary>
        public void Unsubscribe<TKey, TValue>(string[] topics, string? groupId) where TKey : notnull where TValue : class
{
- var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
- if (_consumerStore.TryRemove(consumerKey, out var entry))
+ try
{
- entry.CTS.Cancel();
- (entry.Consumer as IDisposable)?.Dispose();
- entry.CTS.Dispose();
+ var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
+ if (_consumerStore.TryRemove(consumerKey, out var entry))
+ {
+ entry.CTS.Cancel();
+ (entry.Consumer as IDisposable)?.Dispose();
+ entry.CTS.Dispose();
+ }
+ }
+ catch (Exception ex)
+ {
+                _logger.LogError(ex, $"{string.Join("、", topics)} 取消订阅失败");
+ throw;
}
}
diff --git a/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs
index 028762f..ccbe540 100644
--- a/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs
+++ b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs
@@ -226,6 +226,11 @@ namespace JiShe.CollectBus.Kafka
// 处理消费错误
logger.LogError($"kafka批量消费异常:{ex.Message}");
}
+ catch (Exception ex)
+ {
+ // 处理消费错误
+                        logger.LogError(ex, $"kafka批量消费异常:{ex.Message}");
+ }
return await Task.FromResult(false);
}, attr.GroupId, attr.BatchSize, attr.BatchTimeout);
}
@@ -248,6 +253,11 @@ namespace JiShe.CollectBus.Kafka
// 处理消费错误
logger.LogError($"kafka消费异常:{ex.Message}");
}
+ catch (Exception ex)
+ {
+ // 处理消费错误
+                        logger.LogError(ex, $"kafka消费异常:{ex.Message}");
+ }
return await Task.FromResult(false);
}, attr.GroupId);
}
@@ -260,125 +270,133 @@ namespace JiShe.CollectBus.Kafka
        /// </summary>
        private static async Task<bool> ProcessMessageAsync(List<object> messages, MethodInfo method, object subscribe)
{
- var parameters = method.GetParameters();
- bool isGenericTask = method.ReturnType.IsGenericType
- && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>);
- bool existParameters = parameters.Length > 0;
- object[]? executeParameters = null;
-
- if (existParameters)
+ try
{
- IList? list = null;
-                Tuple<Type, Type?> tuple = method.GetParameterTypeInfo();
- bool isEnumerable = false;
- if (tuple.Item2 != null)
- {
- Type listType = typeof(List<>).MakeGenericType(tuple.Item2);
- list = (IList)Activator.CreateInstance(listType)!;
- isEnumerable = tuple.Item2.IsConvertType();
- }
- else
- {
- isEnumerable = tuple.Item1.IsConvertType();
- }
- #region 暂时
- //foreach (var msg in messages)
- //{
- // if (tuple.Item2 != null)
- // {
- // if (isEnumerable)
- // {
- // var parameterType = parameters[0].ParameterType;
- // var data=messages?.Serialize().Deserialize(parameterType);
- // messageObj = data!=null? new[] { data }:null;
- // break;
- // }
- // else
- // {
- // // 集合类型
- // var data = msg?.Serialize().Deserialize(tuple.Item2) /*isEnumerable ? Convert.ChangeType(msg, tuple.Item2) : msg?.Serialize().Deserialize(tuple.Item2)*/;
- // if (data != null)
- // list?.Add(data);
- // }
+ var parameters = method.GetParameters();
+ bool isGenericTask = method.ReturnType.IsGenericType
+ && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>);
+ bool existParameters = parameters.Length > 0;
+ object[]? executeParameters = null;
- // }
- // else
- // {
- // // (dynamic)Convert.ChangeType(msg, tuple.Item1)
- // using (var stream = new MemoryStream(msg))
- // {
- // var data1= System.Text.Json.JsonSerializer.Deserialize(stream, tuple.Item1);
- // }
- // var data = isEnumerable ? System.Text.Json.JsonSerializer.Deserialize(msg, tuple.Item1): msg?.ToString()?.Deserialize(tuple.Item1);
- // if (data != null)
- // messageObj = new[] { data };
- // }
- //}
- //if (tuple.Item2 != null && list != null && list.Count > 0)
- //{
- // messageObj = new[] { list };
- //}
- #endregion
- var parameterDescriptors = method.GetParameters();
- executeParameters = new object?[parameterDescriptors.Length];
- for (var i = 0; i < parameterDescriptors.Length; i++)
+ if (existParameters)
{
- foreach (var item in messages)
- {
-
- object? tempParameter=null;
- var parameterDescriptor = parameterDescriptors[i];
- if (KafkaSerialization.IsJsonType(item))
+ IList? list = null;
+                Tuple<Type, Type?> tuple = method.GetParameterTypeInfo();
+ bool isEnumerable = false;
+ if (tuple.Item2 != null)
+ {
+ Type listType = typeof(List<>).MakeGenericType(tuple.Item2);
+ list = (IList)Activator.CreateInstance(listType)!;
+ isEnumerable = tuple.Item2.IsConvertType();
+ }
+ else
+ {
+ isEnumerable = tuple.Item1.IsConvertType();
+ }
+ #region 暂时
+ //foreach (var msg in messages)
+ //{
+ // if (tuple.Item2 != null)
+ // {
+ // if (isEnumerable)
+ // {
+ // var parameterType = parameters[0].ParameterType;
+ // var data=messages?.Serialize().Deserialize(parameterType);
+ // messageObj = data!=null? new[] { data }:null;
+ // break;
+ // }
+ // else
+ // {
+ // // 集合类型
+ // var data = msg?.Serialize().Deserialize(tuple.Item2) /*isEnumerable ? Convert.ChangeType(msg, tuple.Item2) : msg?.Serialize().Deserialize(tuple.Item2)*/;
+ // if (data != null)
+ // list?.Add(data);
+ // }
+
+ // }
+ // else
+ // {
+ // // (dynamic)Convert.ChangeType(msg, tuple.Item1)
+ // using (var stream = new MemoryStream(msg))
+ // {
+ // var data1= System.Text.Json.JsonSerializer.Deserialize(stream, tuple.Item1);
+ // }
+ // var data = isEnumerable ? System.Text.Json.JsonSerializer.Deserialize(msg, tuple.Item1): msg?.ToString()?.Deserialize(tuple.Item1);
+ // if (data != null)
+ // messageObj = new[] { data };
+ // }
+ //}
+ //if (tuple.Item2 != null && list != null && list.Count > 0)
+ //{
+ // messageObj = new[] { list };
+ //}
+ #endregion
+ var parameterDescriptors = method.GetParameters();
+ executeParameters = new object?[parameterDescriptors.Length];
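+                // 将消息映射到订阅方法参数:JSON 负载按参数(或集合元素)类型反序列化,集合参数聚合为 List 后一次传入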
+ for (var i = 0; i < parameterDescriptors.Length; i++)
+ {
+ foreach (var item in messages)
{
- tempParameter = KafkaSerialization.Deserialize(item, tuple.Item2 != null? tuple.Item2: parameterDescriptor.ParameterType);
- }
- else
- {
-
- var converter = TypeDescriptor.GetConverter(parameterDescriptor.ParameterType);
- if (converter.CanConvertFrom(item.GetType()))
+
+ object? tempParameter = null;
+ var parameterDescriptor = parameterDescriptors[i];
+ if (KafkaSerialization.IsJsonType(item))
{
- tempParameter = converter.ConvertFrom(item);
+ tempParameter = KafkaSerialization.Deserialize(item, tuple.Item2 != null ? tuple.Item2 : parameterDescriptor.ParameterType);
}
else
{
- if (parameterDescriptor.ParameterType.IsInstanceOfType(item))
- tempParameter = item;
- else
- tempParameter =Convert.ChangeType(item, parameterDescriptor.ParameterType);
- }
- }
- if (tuple.Item2 == null)
- {
- executeParameters[i] = tempParameter;
- }
- else
- {
- list.Add(tempParameter);
- }
-
- }
- if(list!=null && list.Count>0)
- executeParameters[i] = list;
- }
- }
- var result = method.Invoke(subscribe, executeParameters);
-            if (result is Task<ISubscribeAck> genericTask)
- {
- await genericTask.ConfigureAwait(false);
- return genericTask.Result.Ack;
+ var converter = TypeDescriptor.GetConverter(parameterDescriptor.ParameterType);
+ if (converter.CanConvertFrom(item.GetType()))
+ {
+ tempParameter = converter.ConvertFrom(item);
+ }
+ else
+ {
+ if (parameterDescriptor.ParameterType.IsInstanceOfType(item))
+ tempParameter = item;
+ else
+ tempParameter = Convert.ChangeType(item, parameterDescriptor.ParameterType);
+ }
+ }
+ if (tuple.Item2 == null)
+ {
+ executeParameters[i] = tempParameter;
+ }
+ else
+ {
+ list.Add(tempParameter);
+ }
+
+ }
+ if (list != null && list.Count > 0)
+ executeParameters[i] = list;
+ }
+ }
+
+ var result = method.Invoke(subscribe, executeParameters);
+                if (result is Task<ISubscribeAck> genericTask)
+ {
+ await genericTask.ConfigureAwait(false);
+ return genericTask.Result.Ack;
+ }
+ else if (result is Task nonGenericTask)
+ {
+ await nonGenericTask.ConfigureAwait(false);
+ return true;
+ }
+ else if (result is ISubscribeAck ackResult)
+ {
+ return ackResult.Ack;
+ }
+ return false;
}
- else if (result is Task nonGenericTask)
+            catch (Exception)
{
- await nonGenericTask.ConfigureAwait(false);
- return true;
+                // 反射调用或参数转换失败时向上抛出,由订阅回调处的 catch 统一记录
+ throw;
}
- else if (result is ISubscribeAck ackResult)
- {
- return ackResult.Ack;
- }
- return false;
}
}
diff --git a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs
index 7f9a85d..72e9096 100644
--- a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs
+++ b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs
@@ -117,17 +117,25 @@ namespace JiShe.CollectBus.Kafka.Producer
        /// </summary>
        public async Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value) where TKey : notnull where TValue : class
{
- var typeKey = typeof(KafkaProducer);
- var producer = GetProducer(typeKey);
-            var message = new Message<TKey, TValue>
+ try
{
- Key = key,
- Value = value,
- Headers = new Headers{
+ var typeKey = typeof(KafkaProducer);
+ var producer = GetProducer(typeKey);
+                var message = new Message<TKey, TValue>
+ {
+ Key = key,
+ Value = value,
+ Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }
}
- };
- await producer.ProduceAsync(topic, message);
+ };
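+                // route-key 头与消费端 HeadersFilter 配合,在 EnableFilter 开启时按 ServerTagName 过滤路由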
+ await producer.ProduceAsync(topic, message);
+ }
+ catch (Exception ex)
+ {
+                _logger.LogError(ex, $"Kafka消息发送失败: {topic}");
+ throw;
+ }
}
        /// <summary>
@@ -139,16 +147,24 @@ namespace JiShe.CollectBus.Kafka.Producer
        /// </summary>
        public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class
{
- var typeKey = typeof(KafkaProducer);
- var producer = GetProducer(typeKey);
-            var message = new Message<Null, TValue>
+ try
{
- Value = value,
- Headers = new Headers{
+ var typeKey = typeof(KafkaProducer);
+ var producer = GetProducer(typeKey);
+                var message = new Message<Null, TValue>
+ {
+ Value = value,
+ Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }
}
- };
- await producer.ProduceAsync(topic, message);
+ };
+ await producer.ProduceAsync(topic, message);
+ }
+ catch (Exception ex)
+ {
+                _logger.LogError(ex, $"Kafka消息发送失败: {topic}");
+ throw;
+ }
}
        /// <summary>
@@ -164,26 +180,34 @@ namespace JiShe.CollectBus.Kafka.Producer
        /// </summary>
        public async Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value, int? partition = null, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) where TKey : notnull where TValue : class
{
-            var message = new Message<TKey, TValue>
+ try
{
- Key = key,
- Value = value,
- Headers = new Headers{
+                var message = new Message<TKey, TValue>
+ {
+ Key = key,
+ Value = value,
+ Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }
}
- };
- var typeKey = typeof(KafkaProducer);
- var producer = GetProducer(typeKey);
- if (partition.HasValue)
- {
- var topicPartition = new TopicPartition(topic, new Partition(partition.Value));
- producer.Produce(topicPartition, message, deliveryHandler);
+ };
+ var typeKey = typeof(KafkaProducer);
+ var producer = GetProducer(typeKey);
+ if (partition.HasValue)
+ {
+ var topicPartition = new TopicPartition(topic, new Partition(partition.Value));
+ producer.Produce(topicPartition, message, deliveryHandler);
+ }
+ else
+ {
+ producer.Produce(topic, message, deliveryHandler);
+ }
+ await Task.CompletedTask;
}
- else
+ catch (Exception ex)
{
- producer.Produce(topic, message, deliveryHandler);
+                _logger.LogError(ex, $"Kafka消息发送失败: {topic}");
+ throw;
}
- await Task.CompletedTask;
}
@@ -199,26 +223,34 @@ namespace JiShe.CollectBus.Kafka.Producer
        /// </summary>
        public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class
{
-            var message = new Message<Null, TValue>
+ try
{
- Value = value,
- Headers = new Headers{
+                var message = new Message<Null, TValue>
+ {
+ Value = value,
+ Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }
}
- };
- var typeKey = typeof(KafkaProducer);
- var producer = GetProducer(typeKey);
- if (partition.HasValue)
- {
- var topicPartition = new TopicPartition(topic,new Partition(partition.Value));
- //_logger.LogError($"push消息:{topic}-{partition.Value}");
- producer.Produce(topicPartition, message, deliveryHandler);
+ };
+ var typeKey = typeof(KafkaProducer);
+ var producer = GetProducer(typeKey);
+ if (partition.HasValue)
+ {
+ var topicPartition = new TopicPartition(topic, new Partition(partition.Value));
+ //_logger.LogError($"push消息:{topic}-{partition.Value}");
+ producer.Produce(topicPartition, message, deliveryHandler);
+ }
+ else
+ {
+ producer.Produce(topic, message, deliveryHandler);
+ }
+ await Task.CompletedTask;
}
- else
+ catch (Exception ex)
{
- producer.Produce(topic, message, deliveryHandler);
+                _logger.LogError(ex, $"Kafka消息发送失败: {topic}");
+ throw;
}
- await Task.CompletedTask;
}
public void Dispose()
diff --git a/modules/JiShe.CollectBus.Kafka/Serialization/JsonSerializer.cs b/modules/JiShe.CollectBus.Kafka/Serialization/JsonSerializer.cs
index 8034954..98fda49 100644
--- a/modules/JiShe.CollectBus.Kafka/Serialization/JsonSerializer.cs
+++ b/modules/JiShe.CollectBus.Kafka/Serialization/JsonSerializer.cs
@@ -19,6 +19,7 @@ namespace JiShe.CollectBus.Kafka.Serialization
{
DefaultIgnoreCondition = JsonIgnoreCondition.Never,
WriteIndented = false,// 设置格式化输出
+            IncludeFields = true,// 序列化/反序列化公共字段(注意:System.Text.Json 不会因此写入非公共 setter)
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,// 允许特殊字符
IgnoreReadOnlyFields = true,
IgnoreReadOnlyProperties = true,
@@ -53,7 +54,7 @@ namespace JiShe.CollectBus.Kafka.Serialization
{
if (data.IsEmpty)
return default;
- return JsonSerializer.Deserialize(data, _options)!;
+ return JsonSerializer.Deserialize(data, _options)!;
}
catch (Exception ex)
{
@@ -102,24 +103,37 @@ namespace JiShe.CollectBus.Kafka.Serialization
}
public static object? Deserialize(object value, Type valueType)
{
- var _jsonSerializerOptions = new JsonSerializerOptions
+ try
{
- DefaultIgnoreCondition = JsonIgnoreCondition.Never,
- WriteIndented = false,// 设置格式化输出
- Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,// 允许特殊字符
- IgnoreReadOnlyFields = true,
- IgnoreReadOnlyProperties = true,
- NumberHandling = JsonNumberHandling.AllowReadingFromString, // 允许数字字符串
- AllowTrailingCommas = true, // 忽略尾随逗号
- ReadCommentHandling = JsonCommentHandling.Skip, // 忽略注释
- PropertyNameCaseInsensitive = true, // 属性名称大小写不敏感
- PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // 属性名称使用驼峰命名规则
- Converters = { new DateTimeJsonConverter() } // 注册你的自定义转换器,
- };
+ var _jsonSerializerOptions = new JsonSerializerOptions
+ {
+ DefaultIgnoreCondition = JsonIgnoreCondition.Never,
+ WriteIndented = false,// 设置格式化输出
+                    IncludeFields = true,// 序列化/反序列化公共字段(注意:System.Text.Json 不会因此写入非公共 setter)
+ Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,// 允许特殊字符
+ IgnoreReadOnlyFields = true,
+ IgnoreReadOnlyProperties = true,
+ NumberHandling = JsonNumberHandling.AllowReadingFromString, // 允许数字字符串
+ AllowTrailingCommas = true, // 忽略尾随逗号
+ ReadCommentHandling = JsonCommentHandling.Skip, // 忽略注释
+ PropertyNameCaseInsensitive = true, // 属性名称大小写不敏感
+ PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // 属性名称使用驼峰命名规则
+ Converters = { new DateTimeJsonConverter() } // 注册你的自定义转换器,
+ };
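+                // 仅处理 JsonElement 输入;其他类型不再抛 NotSupportedException,而是返回 null 交由调用方处理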
- if (value is JsonElement jsonElement) return jsonElement.Deserialize(valueType, _jsonSerializerOptions);
+ if (value is JsonElement jsonElement)
+ {
+ //return jsonElement.Deserialize(valueType, _jsonSerializerOptions);
+ return JsonSerializer.Deserialize(jsonElement, valueType, _jsonSerializerOptions);
+ }
- throw new NotSupportedException("Type is not of type JsonElement");
+ return null;
+ }
+            catch (Exception)
+ {
+                // 反序列化失败时保留原始异常与堆栈向上抛出,由调用方记录
+ throw;
+ }
}
}
}
diff --git a/protocols/JiShe.CollectBus.Protocol.T1882018/SendData/Telemetry1882018PacketBuilder.cs b/protocols/JiShe.CollectBus.Protocol.T1882018/SendData/Telemetry1882018PacketBuilder.cs
index e5511f2..7dc8e43 100644
--- a/protocols/JiShe.CollectBus.Protocol.T1882018/SendData/Telemetry1882018PacketBuilder.cs
+++ b/protocols/JiShe.CollectBus.Protocol.T1882018/SendData/Telemetry1882018PacketBuilder.cs
@@ -46,7 +46,7 @@ namespace JiShe.CollectBus.Protocol.T1882018.SendData
{
var itemCodeArr = request.ItemCode.Split('_');
var c_data = itemCodeArr[0];//01
- var d_data = itemCodeArr[1];//91 或者 90
+ var d_data = itemCodeArr[2];//91 或者 90
            var dataUnit = new List<string>() { "1F", d_data, "00" };
var dataList = Build188SendData.Build188SendCommand(request.MeterAddress, c_data, dataUnit);
@@ -64,7 +64,7 @@ namespace JiShe.CollectBus.Protocol.T1882018.SendData
{
var itemCodeArr = request.ItemCode.Split('_');
var c_data = itemCodeArr[0];//01
- var d_data = itemCodeArr[1];//55 或者 99
+ var d_data = itemCodeArr[2];//55 或者 99
            var dataUnit = new List<string>() { "A0", "17", "00", d_data };
var dataList = Build188SendData.Build188SendCommand(request.MeterAddress, c_data, dataUnit);
diff --git a/protocols/JiShe.CollectBus.Protocol.T1882018/T1882018ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.T1882018/T1882018ProtocolPlugin.cs
index 7e9f3a3..5b3044a 100644
--- a/protocols/JiShe.CollectBus.Protocol.T1882018/T1882018ProtocolPlugin.cs
+++ b/protocols/JiShe.CollectBus.Protocol.T1882018/T1882018ProtocolPlugin.cs
@@ -92,7 +92,9 @@ namespace JiShe.CollectBus.Protocol.T1882018
            List<string> dataUnit = new List<string>();
//数据转发场景 10H_F1
if (request.ItemCode == T37612012PacketItemCodeConst.AFN10HFN01H && request.SubProtocolRequest != null && string.IsNullOrWhiteSpace(request.SubProtocolRequest.ItemCode) == false)
- {
+ {
+ //var subItemCodeArr = request.SubProtocolRequest.ItemCode.Split("_");
+
var t188PacketHandlerName = $"{T1882018PacketItemCodeConst.BasicT1882018}_{request.SubProtocolRequest.ItemCode}_Send";
Telemetry1882018PacketResponse t645PacketResponse = null;
diff --git a/protocols/JiShe.CollectBus.Protocol.T6452007/SendData/Telemetry6452007PacketBuilder.cs b/protocols/JiShe.CollectBus.Protocol.T6452007/SendData/Telemetry6452007PacketBuilder.cs
index 2d20272..05def67 100644
--- a/protocols/JiShe.CollectBus.Protocol.T6452007/SendData/Telemetry6452007PacketBuilder.cs
+++ b/protocols/JiShe.CollectBus.Protocol.T6452007/SendData/Telemetry6452007PacketBuilder.cs
@@ -47,7 +47,7 @@ namespace JiShe.CollectBus.Protocol.T6452007.SendData
{
var itemCodeArr = request.ItemCode.Split('_');
var c_data = itemCodeArr[0];
- var n_data = itemCodeArr[1];
+ var n_data = itemCodeArr[2];
string password = request.Password;
string pwdLevel = "02";
@@ -78,7 +78,7 @@ namespace JiShe.CollectBus.Protocol.T6452007.SendData
{
var itemCodeArr = request.ItemCode.Split('_');
var c_data = itemCodeArr[0];
- var n_data = itemCodeArr[1];
+ var n_data = itemCodeArr[2];
string password = request.Password;
if (!string.IsNullOrWhiteSpace(password) && password.Contains("|"))
diff --git a/protocols/JiShe.CollectBus.Protocol.T6452007/T6452007ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.T6452007/T6452007ProtocolPlugin.cs
index a4f820b..4f06f5a 100644
--- a/protocols/JiShe.CollectBus.Protocol.T6452007/T6452007ProtocolPlugin.cs
+++ b/protocols/JiShe.CollectBus.Protocol.T6452007/T6452007ProtocolPlugin.cs
@@ -93,7 +93,9 @@ namespace JiShe.CollectBus.Protocol.T6452007
//数据转发场景 10H_F1
if (request.ItemCode == T37612012PacketItemCodeConst.AFN10HFN01H && request.SubProtocolRequest != null && string.IsNullOrWhiteSpace(request.SubProtocolRequest.ItemCode) == false)
{
- var t645PacketHandlerName = $"C{request.SubProtocolRequest.ItemCode}_Send";
+ var subItemCodeArr = request.SubProtocolRequest.ItemCode.Split("_");
+ var t645PacketHandlerName = $"C{subItemCodeArr[0]}_{subItemCodeArr[1]}_Send";//C1C_01_Send
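+                // ItemCode 已改为三段式(如 1C_01_1C):前两段定位处理器,末段作为控制码参数(见 Telemetry6452007PacketBuilder 取 itemCodeArr[2])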
+
Telemetry6452007PacketResponse t645PacketResponse = null;
if (T645ControlHandlers != null && T645ControlHandlers.TryGetValue(t645PacketHandlerName
diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
index 05c3007..025c250 100644
--- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
+++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
@@ -122,10 +122,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
TypeName = 1,
DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
TimeDensity = 15,
- BrandType = "DDS1980",
+ BrandType = "DTS1980",
MeterType = MeterTypeEnum.Ammeter,
ProjectID = 1,
MeteringPort = 2,
+ Password = "000000",
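+                // 645 控制命令组帧需要表密码(Telemetry6452007PacketBuilder 按 request.Password 取密码与权限级别)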
});
ammeterInfos.Add(new DeviceInfo()
@@ -141,10 +142,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
TypeName = 1,
DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
TimeDensity = 15,
- BrandType = "DDS1980",
+ BrandType = "DTS1980",
MeterType = MeterTypeEnum.Ammeter,
ProjectID = 1,
MeteringPort = 2,
+ Password = "000000",
});
//ammeterInfos.Add(new DeviceInfo()
@@ -287,12 +289,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = "442400040",
FocusId = 95780,
ProjectID = 1,
- TripType = "off",
+ TripType = "on",
TripTime = $"{DateTime.Now:HH:mm}",
MeterId = 127035,
LoopType = "EachDay",
EachDayWithout = "周六,周日",
- TimeDensity = 15,
+ TimeDensity = 15,
});
settingInfos.Add(new AmmeterAutoValveControlSetting()
{
@@ -301,7 +303,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = "442400039",
FocusId = 69280,
ProjectID = 1,
- TripType = "off",
+ TripType = "on",
TripTime = $"{DateTime.Now:HH:mm}",
MeterId = 95594,
LoopType = "EachDay",
@@ -377,17 +379,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
ammeterInfo.TripState = 0;
tripStateResult = true;
- subItemCode = T6452007PacketItemCodeConst.C1C01C;
+ subItemCode = T6452007PacketItemCodeConst.C1C011C;
if (ammeterInfo.TypeName != 1)
{
- subItemCode = T6452007PacketItemCodeConst.C1C01B;
+ subItemCode = T6452007PacketItemCodeConst.C1C011B;
}
}
else if (settingInfo.TripType.Equals("off"))
{
ammeterInfo.TripState = 1;
tripStateResult = false;
- subItemCode = T6452007PacketItemCodeConst.C1C01A;
+ subItemCode = T6452007PacketItemCodeConst.C1C011A;
}
else
{
diff --git a/shared/JiShe.CollectBus.Common/Consts/T6452007PacketItemCodeConst.cs b/shared/JiShe.CollectBus.Common/Consts/T6452007PacketItemCodeConst.cs
index 92da5ba..6bdb750 100644
--- a/shared/JiShe.CollectBus.Common/Consts/T6452007PacketItemCodeConst.cs
+++ b/shared/JiShe.CollectBus.Common/Consts/T6452007PacketItemCodeConst.cs
@@ -16,37 +16,37 @@ namespace JiShe.CollectBus.Common.Consts
        /// <summary>
        /// 跳闸
        /// </summary>
- public const string C1C01A = "1C_1A";
+ public const string C1C011A = "1C_01_1A";
        /// <summary>
        /// 单相合闸
        /// </summary>
- public const string C1C01B = "1C_1B";
+ public const string C1C011B = "1C_01_1B";
        /// <summary>
        /// 三相合闸
        /// </summary>
- public const string C1C01C = "1C_1C";
+ public const string C1C011C = "1C_01_1C";
        /// <summary>
        /// 触发报警
        /// </summary>
- public const string C1C02A = "1C_2A";
+        public const string C1C012A = "1C_01_2A";
        /// <summary>
        /// 报警解除
        /// </summary>
- public const string C1C02B = "1C_2B";
+ public const string C1C012B = "1C_01_2B";
        /// <summary>
        /// 保电开始
        /// </summary>
- public const string C1C03A = "1C_3A";
+ public const string C1C033A = "1C_03_3A";
        /// <summary>
        /// 保电结束
        /// </summary>
- public const string C1C03B = "1C_3B";
+ public const string C1C033B = "1C_03_3B";
#endregion
#region 广播校时