This commit is contained in:
zenghongyao 2025-05-15 09:53:43 +08:00
commit 84bb1a9563
18 changed files with 742 additions and 590 deletions

View File

@ -44,7 +44,11 @@ namespace JiShe.CollectBus.IoTDB.Model
/// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取 /// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取
/// </summary> /// </summary>
public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
/// <summary>
/// 设备路径
/// </summary>
private string _devicePath;
/// <summary> /// <summary>
/// 设备路径 /// 设备路径
/// </summary> /// </summary>
@ -52,18 +56,16 @@ namespace JiShe.CollectBus.IoTDB.Model
{ {
get get
{ {
return $"root.{SystemName.ToLower()}.`{ProjectId}`.`{DeviceType}`.{DataType}.`{DeviceId}`"; // 如果未手动设置路径,则自动生成
if (string.IsNullOrWhiteSpace(_devicePath))
{
return $"root.{SystemName.ToLower()}.`{ProjectId}`.`{DeviceType}`.{DataType}.`{DeviceId}`";
}
return _devicePath;
} }
set set
{ {
if (string.IsNullOrWhiteSpace(value)) _devicePath = value; // 直接赋值给支持字段,避免递归
{
DevicePath = $"root.{SystemName.ToLower()}.`{ProjectId}`.`{DeviceType}`.{DataType}.`{DeviceId}`";
}
else
{
DevicePath = value;
}
} }
} }
} }

View File

@ -56,7 +56,6 @@ namespace JiShe.CollectBus.IoTDB.Options
return declaredTypeName?.ToUpper() switch return declaredTypeName?.ToUpper() switch
{ {
"DATETIME" => v => v != null ? ((DateTime)v).ToUniversalTime().Ticks : null, "DATETIME" => v => v != null ? ((DateTime)v).ToUniversalTime().Ticks : null,
"BOOLEAN" => v => v != null && (bool)v ? 1 : 0,
"STRING" => v => v != null ? $"'{v}'" : "''", "STRING" => v => v != null ? $"'{v}'" : "''",
_ => v => v _ => v => v
}; };

View File

@ -1,6 +1,7 @@
using Confluent.Kafka; using Confluent.Kafka;
using JiShe.CollectBus.Common; using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Serialization; using JiShe.CollectBus.Kafka.Serialization;
using Microsoft.AspNetCore.DataProtection.KeyManagement; using Microsoft.AspNetCore.DataProtection.KeyManagement;
@ -127,80 +128,88 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <returns></returns> /// <returns></returns>
public async Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class public async Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
{ {
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => try
{ {
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<TKey, TValue>(groupId),
cts
)).Consumer as IConsumer<TKey, TValue>;
consumer!.Subscribe(topics);
_ = Task.Run(async () =>
{ {
while (!cts.IsCancellationRequested)
{
try
{
//_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息....");
var result = consumer.Consume(cts.Token); var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
if (result == null || result.Message == null || result.Message.Value == null) var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<TKey, TValue>(groupId),
cts
)).Consumer as IConsumer<TKey, TValue>;
consumer!.Subscribe(topics);
_ = Task.Run(async () =>
{
while (!cts.IsCancellationRequested)
{
try
{ {
await Task.Delay(DelayTime, cts.Token); //_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息....");
continue;
} var result = consumer.Consume(cts.Token);
if (result.IsPartitionEOF) if (result == null || result.Message == null || result.Message.Value == null)
{
#if DEBUG
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
#endif
await Task.Delay(DelayTime, cts.Token);
continue;
}
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{ {
consumer.Commit(result); // 提交偏移量 await Task.Delay(DelayTime, cts.Token);
// 跳过消息
continue; continue;
} }
if (result.IsPartitionEOF)
{
#if DEBUG
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
#endif
await Task.Delay(DelayTime, cts.Token);
continue;
}
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
consumer.Commit(result); // 提交偏移量
// 跳过消息
continue;
}
}
bool sucess = await messageHandler(result.Message.Key, result.Message.Value);
if (sucess)
consumer.Commit(result); // 手动提交
}
catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
_logger.LogError(ex, $"{string.Join("", topics)}消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
_logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (OperationCanceledException)
{
//ignore
}
catch (Exception ex)
{
_logger.LogError(ex, "处理消息时发生未知错误");
} }
bool sucess = await messageHandler(result.Message.Key, result.Message.Value);
if (sucess)
consumer.Commit(result); // 手动提交
} }
catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) }, cts.Token);
{ await Task.CompletedTask;
_logger.LogError(ex, $"{string.Join("", topics)}消息消费失败: {ex.Error.Reason}"); });
throw; // 抛出异常,以便重试
}
catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
_logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (OperationCanceledException)
{
//ignore
}
catch (Exception ex)
{
_logger.LogError(ex, "处理消息时发生未知错误");
}
}
}, cts.Token);
await Task.CompletedTask;
});
}
catch (Exception ex)
{
throw;
}
} }
@ -215,76 +224,84 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <returns></returns> /// <returns></returns>
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
{ {
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => try
{ {
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<Ignore, TValue>(groupId),
cts
)).Consumer as IConsumer<Ignore, TValue>;
consumer!.Subscribe(topics);
_ = Task.Run(async () =>
{ {
int count = 0; var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
while (!cts.IsCancellationRequested) var cts = new CancellationTokenSource();
{ var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
try (
{ CreateConsumer<Ignore, TValue>(groupId),
//_logger.LogInformation($"Kafka消费: {string.Join("", topics)}_{count} 开始拉取消息...."); cts
count++; )).Consumer as IConsumer<Ignore, TValue>;
var result = consumer.Consume(cts.Token);
if (result == null || result.Message == null || result.Message.Value == null)
{
await Task.Delay(DelayTime, cts.Token);
continue;
}
if (result.IsPartitionEOF) consumer!.Subscribe(topics);
_ = Task.Run(async () =>
{
int count = 0;
while (!cts.IsCancellationRequested)
{
try
{ {
#if DEBUG //_logger.LogInformation($"Kafka消费: {string.Join("", topics)}_{count} 开始拉取消息....");
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); count++;
#endif var result = consumer.Consume(cts.Token);
await Task.Delay(DelayTime, cts.Token); if (result == null || result.Message == null || result.Message.Value == null)
continue;
}
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{ {
consumer.Commit(result); // 提交偏移量 await Task.Delay(DelayTime, cts.Token);
// 跳过消息
continue; continue;
} }
if (result.IsPartitionEOF)
{
#if DEBUG
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
#endif
await Task.Delay(DelayTime, cts.Token);
continue;
}
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
consumer.Commit(result); // 提交偏移量
// 跳过消息
continue;
}
}
bool sucess = await messageHandler(result.Message.Value);
if (sucess)
consumer.Commit(result); // 手动提交
//else
// consumer.StoreOffset(result);
}
catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
_logger.LogError(ex, $"{string.Join("", topics)}消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (OperationCanceledException)
{
//ignore
}
catch (Exception ex)
{
_logger.LogError(ex, "处理消息时发生未知错误");
} }
bool sucess = await messageHandler(result.Message.Value);
if (sucess)
consumer.Commit(result); // 手动提交
//else
// consumer.StoreOffset(result);
} }
catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) }, cts.Token);
{ await Task.CompletedTask;
_logger.LogError(ex, $"{string.Join("", topics)}消息消费失败: {ex.Error.Reason}"); });
throw; // 抛出异常,以便重试 }
} catch (Exception ex)
catch (OperationCanceledException) {
{
//ignore throw;
} }
catch (Exception ex)
{
_logger.LogError(ex, "处理消息时发生未知错误");
}
}
}, cts.Token);
await Task.CompletedTask;
});
} }
@ -300,7 +317,15 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <param name="batchTimeout">批次超时时间</param> /// <param name="batchTimeout">批次超时时间</param>
public async Task SubscribeBatchAsync<TKey, TValue>(string topic, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class public async Task SubscribeBatchAsync<TKey, TValue>(string topic, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
{ {
await SubscribeBatchAsync<TKey, TValue>(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout); try
{
await SubscribeBatchAsync<TKey, TValue>(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout);
}
catch (Exception ex)
{
throw;
}
} }
/// <summary> /// <summary>
@ -315,117 +340,125 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <param name="batchTimeout">批次超时时间</param> /// <param name="batchTimeout">批次超时时间</param>
public async Task SubscribeBatchAsync<TKey, TValue>(string[] topics, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class public async Task SubscribeBatchAsync<TKey, TValue>(string[] topics, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
{ {
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => try
{ {
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<TKey, TValue>(groupId),
cts
)).Consumer as IConsumer<TKey, TValue>;
consumer!.Subscribe(topics);
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
_ = Task.Run(async () =>
{ {
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
var startTime = DateTime.UtcNow;
while (!cts.IsCancellationRequested) var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<TKey, TValue>(groupId),
cts
)).Consumer as IConsumer<TKey, TValue>;
consumer!.Subscribe(topics);
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
_ = Task.Run(async () =>
{ {
try var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
{ var startTime = DateTime.UtcNow;
// 非阻塞快速累积消息
while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
{
var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
if (result != null) while (!cts.IsCancellationRequested)
{
try
{
// 非阻塞快速累积消息
while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
{ {
if (result.IsPartitionEOF) var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
if (result != null)
{ {
if (result.IsPartitionEOF)
{
#if DEBUG #if DEBUG
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
#endif #endif
await Task.Delay(DelayTime, cts.Token);
}
else if (result.Message.Value != null)
{
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
consumer.Commit(result); // 提交偏移量
// 跳过消息
continue;
}
}
messages.Add((result.Message.Value, result.TopicPartitionOffset));
}
}
else
{
// 无消息时短暂等待
await Task.Delay(DelayTime, cts.Token); await Task.Delay(DelayTime, cts.Token);
} }
else if (result.Message.Value != null) }
// 处理批次
if (messages.Count > 0)
{
bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
if (success)
{ {
if (_kafkaOptionConfig.EnableFilter) var offsetsByPartition = new Dictionary<TopicPartition, long>();
foreach (var msg in messages)
{ {
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } }; var tp = msg.Offset.TopicPartition;
// 检查 Header 是否符合条件 var offset = msg.Offset.Offset;
if (!headersFilter.Match(result.Message.Headers)) if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
{ {
consumer.Commit(result); // 提交偏移量 offsetsByPartition[tp] = offset;
// 跳过消息
continue;
} }
} }
messages.Add((result.Message.Value, result.TopicPartitionOffset));
}
}
else
{
// 无消息时短暂等待
await Task.Delay(DelayTime, cts.Token);
}
}
// 处理批次 var offsetsToCommit = offsetsByPartition
if (messages.Count > 0) .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
.ToList();
consumer.Commit(offsetsToCommit);
}
messages.Clear();
}
startTime = DateTime.UtcNow;
}
catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{ {
bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList()); _logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}");
if (success) throw; // 抛出异常,以便重试
{
var offsetsByPartition = new Dictionary<TopicPartition, long>();
foreach (var msg in messages)
{
var tp = msg.Offset.TopicPartition;
var offset = msg.Offset.Offset;
if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
{
offsetsByPartition[tp] = offset;
}
}
var offsetsToCommit = offsetsByPartition
.Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
.ToList();
consumer.Commit(offsetsToCommit);
}
messages.Clear();
} }
catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
_logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (OperationCanceledException)
{
//ignore
}
catch (Exception ex)
{
_logger.LogError(ex, "处理批量消息时发生未知错误");
}
}
}, cts.Token);
startTime = DateTime.UtcNow; await Task.CompletedTask;
} });
catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) }
{ catch (Exception ex)
_logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}"); {
throw; // 抛出异常,以便重试
}
catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
_logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (OperationCanceledException)
{
//ignore
}
catch (Exception ex)
{
_logger.LogError(ex, "处理批量消息时发生未知错误");
}
}
}, cts.Token);
await Task.CompletedTask; throw;
}); }
} }
@ -441,7 +474,15 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <param name="consumeTimeout">消费等待时间</param> /// <param name="consumeTimeout">消费等待时间</param>
public async Task SubscribeBatchAsync<TValue>(string topic, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class public async Task SubscribeBatchAsync<TValue>(string topic, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class
{ {
await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout); try
{
await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout);
}
catch (Exception ex)
{
throw;
}
} }
@ -458,116 +499,124 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <param name="consumeTimeout">消费等待时间</param> /// <param name="consumeTimeout">消费等待时间</param>
public async Task SubscribeBatchAsync<TValue>(string[] topics, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class public async Task SubscribeBatchAsync<TValue>(string[] topics, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class
{ {
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => try
{ {
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<Ignore, TValue>(groupId),
cts
)).Consumer as IConsumer<Ignore, TValue>;
consumer!.Subscribe(topics);
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
_ = Task.Run(async () =>
{ {
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
var startTime = DateTime.UtcNow;
while (!cts.IsCancellationRequested) var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<Ignore, TValue>(groupId),
cts
)).Consumer as IConsumer<Ignore, TValue>;
consumer!.Subscribe(topics);
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
_ = Task.Run(async () =>
{ {
try var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
{ var startTime = DateTime.UtcNow;
// 非阻塞快速累积消息
while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
{
var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
if (result != null) while (!cts.IsCancellationRequested)
{
try
{
// 非阻塞快速累积消息
while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
{ {
if (result.IsPartitionEOF) var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
if (result != null)
{ {
//_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); if (result.IsPartitionEOF)
{
//_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
await Task.Delay(DelayTime, cts.Token);
}
else if (result.Message.Value != null)
{
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
consumer.Commit(result); // 提交偏移量
// 跳过消息
continue;
}
}
messages.Add((result.Message.Value, result.TopicPartitionOffset));
}
}
else
{
// 无消息时短暂等待
await Task.Delay(DelayTime, cts.Token); await Task.Delay(DelayTime, cts.Token);
} }
else if (result.Message.Value != null) }
// 处理批次
if (messages.Count > 0)
{
bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
if (success)
{ {
if (_kafkaOptionConfig.EnableFilter) var offsetsByPartition = new Dictionary<TopicPartition, long>();
foreach (var msg in messages)
{ {
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } }; var tp = msg.Offset.TopicPartition;
// 检查 Header 是否符合条件 var offset = msg.Offset.Offset;
if (!headersFilter.Match(result.Message.Headers)) if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
{ {
consumer.Commit(result); // 提交偏移量 offsetsByPartition[tp] = offset;
// 跳过消息
continue;
} }
} }
messages.Add((result.Message.Value, result.TopicPartitionOffset));
}
}
else
{
// 无消息时短暂等待
await Task.Delay(DelayTime, cts.Token);
}
}
// 处理批次 var offsetsToCommit = offsetsByPartition
if (messages.Count > 0) .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
.ToList();
consumer.Commit(offsetsToCommit);
}
messages.Clear();
}
startTime = DateTime.UtcNow;
}
catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{ {
bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList()); _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
if (success) throw; // 抛出异常,以便重试
{
var offsetsByPartition = new Dictionary<TopicPartition, long>();
foreach (var msg in messages)
{
var tp = msg.Offset.TopicPartition;
var offset = msg.Offset.Offset;
if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
{
offsetsByPartition[tp] = offset;
}
}
var offsetsToCommit = offsetsByPartition
.Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
.ToList();
consumer.Commit(offsetsToCommit);
}
messages.Clear();
} }
catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
_logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (OperationCanceledException)
{
//ignore
}
catch (Exception ex)
{
_logger.LogError(ex, "处理批量消息时发生未知错误");
}
}
}, cts.Token);
startTime = DateTime.UtcNow; await Task.CompletedTask;
} });
catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) }
{ catch (Exception ex)
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); {
throw; // 抛出异常,以便重试
}
catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
_logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (OperationCanceledException)
{
//ignore
}
catch (Exception ex)
{
_logger.LogError(ex, "处理批量消息时发生未知错误");
}
}
}, cts.Token);
await Task.CompletedTask; throw;
}); }
} }
@ -578,12 +627,20 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <typeparam name="TValue"></typeparam> /// <typeparam name="TValue"></typeparam>
public void Unsubscribe<TKey, TValue>(string[] topics, string? groupId) where TKey : notnull where TValue : class public void Unsubscribe<TKey, TValue>(string[] topics, string? groupId) where TKey : notnull where TValue : class
{ {
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; try
if (_consumerStore.TryRemove(consumerKey, out var entry))
{ {
entry.CTS.Cancel(); var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
(entry.Consumer as IDisposable)?.Dispose(); if (_consumerStore.TryRemove(consumerKey, out var entry))
entry.CTS.Dispose(); {
entry.CTS.Cancel();
(entry.Consumer as IDisposable)?.Dispose();
entry.CTS.Dispose();
}
}
catch (Exception ex)
{
throw;
} }
} }

View File

@ -226,6 +226,11 @@ namespace JiShe.CollectBus.Kafka
// 处理消费错误 // 处理消费错误
logger.LogError($"kafka批量消费异常:{ex.Message}"); logger.LogError($"kafka批量消费异常:{ex.Message}");
} }
catch (Exception ex)
{
// 处理消费错误
logger.LogError($"kafka批量消费异常:{ex.Message}");
}
return await Task.FromResult(false); return await Task.FromResult(false);
}, attr.GroupId, attr.BatchSize, attr.BatchTimeout); }, attr.GroupId, attr.BatchSize, attr.BatchTimeout);
} }
@ -248,6 +253,11 @@ namespace JiShe.CollectBus.Kafka
// 处理消费错误 // 处理消费错误
logger.LogError($"kafka消费异常:{ex.Message}"); logger.LogError($"kafka消费异常:{ex.Message}");
} }
catch (Exception ex)
{
// 处理消费错误
logger.LogError($"kafka批量消费异常:{ex.Message}");
}
return await Task.FromResult(false); return await Task.FromResult(false);
}, attr.GroupId); }, attr.GroupId);
} }
@ -260,125 +270,133 @@ namespace JiShe.CollectBus.Kafka
/// </summary> /// </summary>
private static async Task<bool> ProcessMessageAsync(List<dynamic> messages, MethodInfo method, object subscribe) private static async Task<bool> ProcessMessageAsync(List<dynamic> messages, MethodInfo method, object subscribe)
{ {
var parameters = method.GetParameters(); try
bool isGenericTask = method.ReturnType.IsGenericType
&& method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>);
bool existParameters = parameters.Length > 0;
object[]? executeParameters = null;
if (existParameters)
{ {
IList? list = null; var parameters = method.GetParameters();
Tuple<Type, Type?> tuple = method.GetParameterTypeInfo(); bool isGenericTask = method.ReturnType.IsGenericType
bool isEnumerable = false; && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>);
if (tuple.Item2 != null) bool existParameters = parameters.Length > 0;
{ object[]? executeParameters = null;
Type listType = typeof(List<>).MakeGenericType(tuple.Item2);
list = (IList)Activator.CreateInstance(listType)!;
isEnumerable = tuple.Item2.IsConvertType();
}
else
{
isEnumerable = tuple.Item1.IsConvertType();
}
#region
//foreach (var msg in messages)
//{
// if (tuple.Item2 != null)
// {
// if (isEnumerable)
// {
// var parameterType = parameters[0].ParameterType;
// var data=messages?.Serialize().Deserialize(parameterType);
// messageObj = data!=null? new[] { data }:null;
// break;
// }
// else
// {
// // 集合类型
// var data = msg?.Serialize().Deserialize(tuple.Item2) /*isEnumerable ? Convert.ChangeType(msg, tuple.Item2) : msg?.Serialize().Deserialize(tuple.Item2)*/;
// if (data != null)
// list?.Add(data);
// }
// } if (existParameters)
// else
// {
// // (dynamic)Convert.ChangeType(msg, tuple.Item1)
// using (var stream = new MemoryStream(msg))
// {
// var data1= System.Text.Json.JsonSerializer.Deserialize(stream, tuple.Item1);
// }
// var data = isEnumerable ? System.Text.Json.JsonSerializer.Deserialize(msg, tuple.Item1): msg?.ToString()?.Deserialize(tuple.Item1);
// if (data != null)
// messageObj = new[] { data };
// }
//}
//if (tuple.Item2 != null && list != null && list.Count > 0)
//{
// messageObj = new[] { list };
//}
#endregion
var parameterDescriptors = method.GetParameters();
executeParameters = new object?[parameterDescriptors.Length];
for (var i = 0; i < parameterDescriptors.Length; i++)
{ {
foreach (var item in messages) IList? list = null;
{ Tuple<Type, Type?> tuple = method.GetParameterTypeInfo();
bool isEnumerable = false;
object? tempParameter=null; if (tuple.Item2 != null)
var parameterDescriptor = parameterDescriptors[i]; {
if (KafkaSerialization.IsJsonType(item)) Type listType = typeof(List<>).MakeGenericType(tuple.Item2);
list = (IList)Activator.CreateInstance(listType)!;
isEnumerable = tuple.Item2.IsConvertType();
}
else
{
isEnumerable = tuple.Item1.IsConvertType();
}
#region
//foreach (var msg in messages)
//{
// if (tuple.Item2 != null)
// {
// if (isEnumerable)
// {
// var parameterType = parameters[0].ParameterType;
// var data=messages?.Serialize().Deserialize(parameterType);
// messageObj = data!=null? new[] { data }:null;
// break;
// }
// else
// {
// // 集合类型
// var data = msg?.Serialize().Deserialize(tuple.Item2) /*isEnumerable ? Convert.ChangeType(msg, tuple.Item2) : msg?.Serialize().Deserialize(tuple.Item2)*/;
// if (data != null)
// list?.Add(data);
// }
// }
// else
// {
// // (dynamic)Convert.ChangeType(msg, tuple.Item1)
// using (var stream = new MemoryStream(msg))
// {
// var data1= System.Text.Json.JsonSerializer.Deserialize(stream, tuple.Item1);
// }
// var data = isEnumerable ? System.Text.Json.JsonSerializer.Deserialize(msg, tuple.Item1): msg?.ToString()?.Deserialize(tuple.Item1);
// if (data != null)
// messageObj = new[] { data };
// }
//}
//if (tuple.Item2 != null && list != null && list.Count > 0)
//{
// messageObj = new[] { list };
//}
#endregion
var parameterDescriptors = method.GetParameters();
executeParameters = new object?[parameterDescriptors.Length];
for (var i = 0; i < parameterDescriptors.Length; i++)
{
foreach (var item in messages)
{ {
tempParameter = KafkaSerialization.Deserialize(item, tuple.Item2 != null? tuple.Item2: parameterDescriptor.ParameterType);
} object? tempParameter = null;
else var parameterDescriptor = parameterDescriptors[i];
{ if (KafkaSerialization.IsJsonType(item))
var converter = TypeDescriptor.GetConverter(parameterDescriptor.ParameterType);
if (converter.CanConvertFrom(item.GetType()))
{ {
tempParameter = converter.ConvertFrom(item); tempParameter = KafkaSerialization.Deserialize(item, tuple.Item2 != null ? tuple.Item2 : parameterDescriptor.ParameterType);
} }
else else
{ {
if (parameterDescriptor.ParameterType.IsInstanceOfType(item))
tempParameter = item;
else
tempParameter =Convert.ChangeType(item, parameterDescriptor.ParameterType);
}
}
if (tuple.Item2 == null)
{
executeParameters[i] = tempParameter;
}
else
{
list.Add(tempParameter);
}
}
if(list!=null && list.Count>0)
executeParameters[i] = list;
}
}
var result = method.Invoke(subscribe, executeParameters); var converter = TypeDescriptor.GetConverter(parameterDescriptor.ParameterType);
if (result is Task<ISubscribeAck> genericTask) if (converter.CanConvertFrom(item.GetType()))
{ {
await genericTask.ConfigureAwait(false); tempParameter = converter.ConvertFrom(item);
return genericTask.Result.Ack; }
else
{
if (parameterDescriptor.ParameterType.IsInstanceOfType(item))
tempParameter = item;
else
tempParameter = Convert.ChangeType(item, parameterDescriptor.ParameterType);
}
}
if (tuple.Item2 == null)
{
executeParameters[i] = tempParameter;
}
else
{
list.Add(tempParameter);
}
}
if (list != null && list.Count > 0)
executeParameters[i] = list;
}
}
var result = method.Invoke(subscribe, executeParameters);
if (result is Task<ISubscribeAck> genericTask)
{
await genericTask.ConfigureAwait(false);
return genericTask.Result.Ack;
}
else if (result is Task nonGenericTask)
{
await nonGenericTask.ConfigureAwait(false);
return true;
}
else if (result is ISubscribeAck ackResult)
{
return ackResult.Ack;
}
return false;
} }
else if (result is Task nonGenericTask) catch (Exception ex)
{ {
await nonGenericTask.ConfigureAwait(false);
return true; throw;
} }
else if (result is ISubscribeAck ackResult)
{
return ackResult.Ack;
}
return false;
} }
} }

View File

@ -117,17 +117,25 @@ namespace JiShe.CollectBus.Kafka.Producer
/// <returns></returns> /// <returns></returns>
public async Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value)where TKey : notnull where TValue : class public async Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value)where TKey : notnull where TValue : class
{ {
var typeKey = typeof(KafkaProducer<TKey, TValue>); try
var producer = GetProducer<TKey, TValue>(typeKey);
var message = new Message<TKey, TValue>
{ {
Key = key, var typeKey = typeof(KafkaProducer<TKey, TValue>);
Value = value, var producer = GetProducer<TKey, TValue>(typeKey);
Headers = new Headers{ var message = new Message<TKey, TValue>
{
Key = key,
Value = value,
Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }
} }
}; };
await producer.ProduceAsync(topic, message); await producer.ProduceAsync(topic, message);
}
catch (Exception ex)
{
throw;
}
} }
/// <summary> /// <summary>
@ -139,16 +147,24 @@ namespace JiShe.CollectBus.Kafka.Producer
/// <returns></returns> /// <returns></returns>
public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class
{ {
var typeKey = typeof(KafkaProducer<string, TValue>); try
var producer = GetProducer<Null, TValue>(typeKey);
var message = new Message<Null, TValue>
{ {
Value = value, var typeKey = typeof(KafkaProducer<string, TValue>);
Headers = new Headers{ var producer = GetProducer<Null, TValue>(typeKey);
var message = new Message<Null, TValue>
{
Value = value,
Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }
} }
}; };
await producer.ProduceAsync(topic, message); await producer.ProduceAsync(topic, message);
}
catch (Exception ex)
{
throw;
}
} }
/// <summary> /// <summary>
@ -164,26 +180,34 @@ namespace JiShe.CollectBus.Kafka.Producer
/// <returns></returns> /// <returns></returns>
public async Task ProduceAsync<TKey, TValue>(string topic,TKey key,TValue value,int? partition=null, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null)where TKey : notnull where TValue : class public async Task ProduceAsync<TKey, TValue>(string topic,TKey key,TValue value,int? partition=null, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null)where TKey : notnull where TValue : class
{ {
var message = new Message<TKey, TValue> try
{ {
Key = key, var message = new Message<TKey, TValue>
Value = value, {
Headers = new Headers{ Key = key,
Value = value,
Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }
} }
}; };
var typeKey = typeof(KafkaProducer<TKey, TValue>); var typeKey = typeof(KafkaProducer<TKey, TValue>);
var producer = GetProducer<TKey, TValue>(typeKey); var producer = GetProducer<TKey, TValue>(typeKey);
if (partition.HasValue) if (partition.HasValue)
{ {
var topicPartition = new TopicPartition(topic, new Partition(partition.Value)); var topicPartition = new TopicPartition(topic, new Partition(partition.Value));
producer.Produce(topicPartition, message, deliveryHandler); producer.Produce(topicPartition, message, deliveryHandler);
}
else
{
producer.Produce(topic, message, deliveryHandler);
}
await Task.CompletedTask;
} }
else catch (Exception ex)
{ {
producer.Produce(topic, message, deliveryHandler);
throw;
} }
await Task.CompletedTask;
} }
@ -199,26 +223,34 @@ namespace JiShe.CollectBus.Kafka.Producer
/// <returns></returns> /// <returns></returns>
public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class
{ {
var message = new Message<Null, TValue> try
{ {
Value = value, var message = new Message<Null, TValue>
Headers = new Headers{ {
Value = value,
Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }
} }
}; };
var typeKey = typeof(KafkaProducer<Null, TValue>); var typeKey = typeof(KafkaProducer<Null, TValue>);
var producer = GetProducer<Null, TValue>(typeKey); var producer = GetProducer<Null, TValue>(typeKey);
if (partition.HasValue) if (partition.HasValue)
{ {
var topicPartition = new TopicPartition(topic,new Partition(partition.Value)); var topicPartition = new TopicPartition(topic, new Partition(partition.Value));
//_logger.LogError($"push消息{topic}-{partition.Value}"); //_logger.LogError($"push消息{topic}-{partition.Value}");
producer.Produce(topicPartition, message, deliveryHandler); producer.Produce(topicPartition, message, deliveryHandler);
}
else
{
producer.Produce(topic, message, deliveryHandler);
}
await Task.CompletedTask;
} }
else catch (Exception ex)
{ {
producer.Produce(topic, message, deliveryHandler);
throw;
} }
await Task.CompletedTask;
} }
public void Dispose() public void Dispose()

View File

@ -19,6 +19,7 @@ namespace JiShe.CollectBus.Kafka.Serialization
{ {
DefaultIgnoreCondition = JsonIgnoreCondition.Never, DefaultIgnoreCondition = JsonIgnoreCondition.Never,
WriteIndented = false,// 设置格式化输出 WriteIndented = false,// 设置格式化输出
IncludeFields = true,// 允许反序列化到非公共 setter 和字段
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,// 允许特殊字符 Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,// 允许特殊字符
IgnoreReadOnlyFields = true, IgnoreReadOnlyFields = true,
IgnoreReadOnlyProperties = true, IgnoreReadOnlyProperties = true,
@ -53,7 +54,7 @@ namespace JiShe.CollectBus.Kafka.Serialization
{ {
if (data.IsEmpty) if (data.IsEmpty)
return default; return default;
return JsonSerializer.Deserialize<T>(data, _options)!; return JsonSerializer.Deserialize<T>(data, _options)!;
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -102,24 +103,37 @@ namespace JiShe.CollectBus.Kafka.Serialization
} }
public static object? Deserialize(object value, Type valueType) public static object? Deserialize(object value, Type valueType)
{ {
var _jsonSerializerOptions = new JsonSerializerOptions try
{ {
DefaultIgnoreCondition = JsonIgnoreCondition.Never, var _jsonSerializerOptions = new JsonSerializerOptions
WriteIndented = false,// 设置格式化输出 {
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,// 允许特殊字符 DefaultIgnoreCondition = JsonIgnoreCondition.Never,
IgnoreReadOnlyFields = true, WriteIndented = false,// 设置格式化输出
IgnoreReadOnlyProperties = true, IncludeFields = true,// 允许反序列化到非公共 setter 和字段
NumberHandling = JsonNumberHandling.AllowReadingFromString, // 允许数字字符串 Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,// 允许特殊字符
AllowTrailingCommas = true, // 忽略尾随逗号 IgnoreReadOnlyFields = true,
ReadCommentHandling = JsonCommentHandling.Skip, // 忽略注释 IgnoreReadOnlyProperties = true,
PropertyNameCaseInsensitive = true, // 属性名称大小写不敏感 NumberHandling = JsonNumberHandling.AllowReadingFromString, // 允许数字字符串
PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // 属性名称使用驼峰命名规则 AllowTrailingCommas = true, // 忽略尾随逗号
Converters = { new DateTimeJsonConverter() } // 注册你的自定义转换器, ReadCommentHandling = JsonCommentHandling.Skip, // 忽略注释
}; PropertyNameCaseInsensitive = true, // 属性名称大小写不敏感
PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // 属性名称使用驼峰命名规则
Converters = { new DateTimeJsonConverter() } // 注册你的自定义转换器,
};
if (value is JsonElement jsonElement) return jsonElement.Deserialize(valueType, _jsonSerializerOptions); if (value is JsonElement jsonElement)
{
//return jsonElement.Deserialize(valueType, _jsonSerializerOptions);
return JsonSerializer.Deserialize(jsonElement, valueType, _jsonSerializerOptions);
}
throw new NotSupportedException("Type is not of type JsonElement"); return null;
}
catch (Exception ex)
{
throw;
}
} }
} }
} }

View File

@ -46,7 +46,7 @@ namespace JiShe.CollectBus.Protocol.T1882018.SendData
{ {
var itemCodeArr = request.ItemCode.Split('_'); var itemCodeArr = request.ItemCode.Split('_');
var c_data = itemCodeArr[0];//01 var c_data = itemCodeArr[0];//01
var d_data = itemCodeArr[1];//91 或者 90 var d_data = itemCodeArr[2];//91 或者 90
var dataUnit = new List<string>() { "1F", d_data, "00" }; var dataUnit = new List<string>() { "1F", d_data, "00" };
var dataList = Build188SendData.Build188SendCommand(request.MeterAddress, c_data, dataUnit); var dataList = Build188SendData.Build188SendCommand(request.MeterAddress, c_data, dataUnit);
@ -64,7 +64,7 @@ namespace JiShe.CollectBus.Protocol.T1882018.SendData
{ {
var itemCodeArr = request.ItemCode.Split('_'); var itemCodeArr = request.ItemCode.Split('_');
var c_data = itemCodeArr[0];//01 var c_data = itemCodeArr[0];//01
var d_data = itemCodeArr[1];//55 或者 99 var d_data = itemCodeArr[2];//55 或者 99
var dataUnit = new List<string>() { "A0", "17", "00", d_data }; var dataUnit = new List<string>() { "A0", "17", "00", d_data };
var dataList = Build188SendData.Build188SendCommand(request.MeterAddress, c_data, dataUnit); var dataList = Build188SendData.Build188SendCommand(request.MeterAddress, c_data, dataUnit);

View File

@ -92,7 +92,9 @@ namespace JiShe.CollectBus.Protocol.T1882018
List<string> dataUnit = new List<string>(); List<string> dataUnit = new List<string>();
//数据转发场景 10H_F1 //数据转发场景 10H_F1
if (request.ItemCode == T37612012PacketItemCodeConst.AFN10HFN01H && request.SubProtocolRequest != null && string.IsNullOrWhiteSpace(request.SubProtocolRequest.ItemCode) == false) if (request.ItemCode == T37612012PacketItemCodeConst.AFN10HFN01H && request.SubProtocolRequest != null && string.IsNullOrWhiteSpace(request.SubProtocolRequest.ItemCode) == false)
{ {
//var subItemCodeArr = request.SubProtocolRequest.ItemCode.Split("_");
var t188PacketHandlerName = $"{T1882018PacketItemCodeConst.BasicT1882018}_{request.SubProtocolRequest.ItemCode}_Send"; var t188PacketHandlerName = $"{T1882018PacketItemCodeConst.BasicT1882018}_{request.SubProtocolRequest.ItemCode}_Send";
Telemetry1882018PacketResponse t645PacketResponse = null; Telemetry1882018PacketResponse t645PacketResponse = null;

View File

@ -1,4 +1,5 @@
using System.Reflection; using System.Data;
using System.Reflection;
using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
@ -259,6 +260,12 @@ namespace JiShe.CollectBus.Protocol.T37612012.SendData
#region AFN10H #region AFN10H
public static Telemetry3761PacketResponse AFN10_Fn_Send(Telemetry3761PacketRequest request) public static Telemetry3761PacketResponse AFN10_Fn_Send(Telemetry3761PacketRequest request)
{ {
var baudRateValue = Build3761SendData.GetBaudreate($"{request.SubRequest.Baudrate}");
var dataUnit = Build3761SendData.BuildTransparentForwardingSendDataUnit(request.SubRequest.MeteringPort, baudRateValue, request.DataUnit);
dataUnit.AddRange(Build3761SendData.GetPW());
var reqParameter = new ReqParameter2() var reqParameter = new ReqParameter2()
{ {
AFN = AFN., AFN = AFN.,
@ -275,7 +282,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.SendData
Pn = request.Pn, Pn = request.Pn,
Fn = request.Fn Fn = request.Fn
}; };
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter, request.DataUnit); var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter, dataUnit);
return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, }; return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, };
} }

View File

@ -1,4 +1,6 @@
namespace JiShe.CollectBus.Protocol.T37612012.SendData using JiShe.CollectBus.Protocol.Models;
namespace JiShe.CollectBus.Protocol.T37612012.SendData
{ {
/// <summary> /// <summary>
/// 构建3761报文参数 /// 构建3761报文参数
@ -20,8 +22,13 @@
/// </summary> /// </summary>
public int Pn { get; set; } public int Pn { get; set; }
/// <summary>
/// 子协议请求
/// </summary>
public SubProtocolBuildRequest SubRequest { get; set; }
/// <summary> /// <summary>
/// 透明转发单元 /// 透明转发数据单元
/// </summary> /// </summary>
public List<string> DataUnit { get; set; } public List<string> DataUnit { get; set; }
} }

View File

@ -47,7 +47,7 @@ namespace JiShe.CollectBus.Protocol.T6452007.SendData
{ {
var itemCodeArr = request.ItemCode.Split('_'); var itemCodeArr = request.ItemCode.Split('_');
var c_data = itemCodeArr[0]; var c_data = itemCodeArr[0];
var n_data = itemCodeArr[1]; var n_data = itemCodeArr[2];
string password = request.Password; string password = request.Password;
string pwdLevel = "02"; string pwdLevel = "02";
@ -78,7 +78,7 @@ namespace JiShe.CollectBus.Protocol.T6452007.SendData
{ {
var itemCodeArr = request.ItemCode.Split('_'); var itemCodeArr = request.ItemCode.Split('_');
var c_data = itemCodeArr[0]; var c_data = itemCodeArr[0];
var n_data = itemCodeArr[1]; var n_data = itemCodeArr[2];
string password = request.Password; string password = request.Password;
if (!string.IsNullOrWhiteSpace(password) && password.Contains("|")) if (!string.IsNullOrWhiteSpace(password) && password.Contains("|"))

View File

@ -1,23 +1,12 @@
namespace JiShe.CollectBus.Protocol.T6452007.SendData using JiShe.CollectBus.Protocol.Models;
namespace JiShe.CollectBus.Protocol.T6452007.SendData
{ {
/// <summary> /// <summary>
/// 构建645报文参数 /// 构建645报文参数
/// </summary> /// </summary>
public class Telemetry6452007PacketRequest public class Telemetry6452007PacketRequest: SubProtocolBuildRequest
{ {
/// <summary>
/// 表地址
/// </summary>
public required string MeterAddress { get; set; }
/// <summary>
/// 密码
/// </summary>
public required string Password { get; set; }
/// <summary>
/// 操作码
/// </summary>
public required string ItemCode { get; set; }
} }
} }

View File

@ -93,7 +93,9 @@ namespace JiShe.CollectBus.Protocol.T6452007
//数据转发场景 10H_F1 //数据转发场景 10H_F1
if (request.ItemCode == T37612012PacketItemCodeConst.AFN10HFN01H && request.SubProtocolRequest != null && string.IsNullOrWhiteSpace(request.SubProtocolRequest.ItemCode) == false) if (request.ItemCode == T37612012PacketItemCodeConst.AFN10HFN01H && request.SubProtocolRequest != null && string.IsNullOrWhiteSpace(request.SubProtocolRequest.ItemCode) == false)
{ {
var t645PacketHandlerName = $"C{request.SubProtocolRequest.ItemCode}_Send"; var subItemCodeArr = request.SubProtocolRequest.ItemCode.Split("_");
var t645PacketHandlerName = $"C{subItemCodeArr[0]}_{subItemCodeArr[1]}_Send";//C1C_01_Send
Telemetry6452007PacketResponse t645PacketResponse = null; Telemetry6452007PacketResponse t645PacketResponse = null;
if (T645ControlHandlers != null && T645ControlHandlers.TryGetValue(t645PacketHandlerName if (T645ControlHandlers != null && T645ControlHandlers.TryGetValue(t645PacketHandlerName
@ -127,6 +129,7 @@ namespace JiShe.CollectBus.Protocol.T6452007
FocusAddress = request.FocusAddress, FocusAddress = request.FocusAddress,
Fn = fn, Fn = fn,
Pn = request.Pn, Pn = request.Pn,
SubRequest = request.SubProtocolRequest,
DataUnit = dataUnit, DataUnit = dataUnit,
}); });
} }

View File

@ -10,6 +10,16 @@
/// </summary> /// </summary>
public required string MeterAddress { get; set; } public required string MeterAddress { get; set; }
/// <summary>
/// 波特率 default(2400)
/// </summary>
public int Baudrate { get; set; }
/// <summary>
/// 计量端口
/// </summary>
public int MeteringPort { get; set; }
/// <summary> /// <summary>
/// 密码 /// 密码
/// </summary> /// </summary>

View File

@ -122,9 +122,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
TypeName = 1, TypeName = 1,
DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679", DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
TimeDensity = 15, TimeDensity = 15,
BrandType = "DDS1980", BrandType = "DTS1980",
MeterType = MeterTypeEnum.Ammeter, MeterType = MeterTypeEnum.Ammeter,
ProjectID = 1, ProjectID = 1,
MeteringPort = MeteringPortConst.MeteringPortTwo,
Password = "000000",
}); });
ammeterInfos.Add(new DeviceInfo() ammeterInfos.Add(new DeviceInfo()
@ -140,9 +142,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
TypeName = 1, TypeName = 1,
DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679", DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
TimeDensity = 15, TimeDensity = 15,
BrandType = "DDS1980", BrandType = "DTS1980",
MeterType = MeterTypeEnum.Ammeter, MeterType = MeterTypeEnum.Ammeter,
ProjectID = 1, ProjectID = 1,
MeteringPort = MeteringPortConst.MeteringPortTwo,
Password = "000000",
}); });
//ammeterInfos.Add(new DeviceInfo() //ammeterInfos.Add(new DeviceInfo()
@ -275,67 +279,49 @@ namespace JiShe.CollectBus.ScheduledMeterReading
string currentTimeStr = $"{currentTime:HH:mm}"; string currentTimeStr = $"{currentTime:HH:mm}";
try try
{ {
#if DEBUG #if DEBUG
var settingInfos = new List<AmmeterAutoValveControlSetting>(); //电表自动阀控缓存
settingInfos.Add(new AmmeterAutoValveControlSetting() string redisCacheDeviceSettingInfoHashKey = $"redisCacheDeviceSettingInfoHashKey_{SystemType}_{ServerTagName}";
{
MeterType = MeterTypeEnum.Ammeter,
AmmerterAddress = "442405000040",
FocusAddress = "442400040",
FocusId = 95780,
ProjectID = 1,
TripType = "off",
TripTime = "14:02",
MeterId = 127035,
LoopType = "EachDay",
EachDayWithout = "周六,周日",
TimeDensity = 15,
});
settingInfos.Add(new AmmeterAutoValveControlSetting()
{
MeterType = MeterTypeEnum.Ammeter,
AmmerterAddress = "442405000039",
FocusAddress = "442400039",
FocusId = 69280,
ProjectID = 1,
TripType = "off",
TripTime = $"{DateTime.Now:HH:mm}",
MeterId = 95594,
LoopType = "EachDay",
EachDayWithout = "周六,周日",
TimeDensity = 15,
});
settingInfos.Add(new AmmeterAutoValveControlSetting() var settingInfos = FreeRedisProvider.Instance.Get< List<AmmeterAutoValveControlSetting>> (redisCacheDeviceSettingInfoHashKey);
if (settingInfos == null || settingInfos.Count <= 0)
{ {
MeterType = MeterTypeEnum.Ammeter, settingInfos = new List<AmmeterAutoValveControlSetting>();
AmmerterAddress = "321410010270", settingInfos.Add(new AmmeterAutoValveControlSetting()
FocusAddress = "322011149", {
FocusId = 333333, MeterType = MeterTypeEnum.Ammeter,
ProjectID = 1, AmmerterAddress = "442405000040",
TripType = "on", FocusAddress = "442400040",
TripTime = $"{DateTime.Now:HH:mm}", FocusId = 95780,
MeterId = 333333, ProjectID = 1,
LoopType = "EachDay", TripType = "on",
EachDayWithout = "周六,周日", TripTime = $"{DateTime.Now:HH:mm}",
TimeDensity = 15, MeterId = 127035,
}); LoopType = "EachDay",
EachDayWithout = "周六,周日",
TimeDensity = 15,
});
settingInfos.Add(new AmmeterAutoValveControlSetting()
{
MeterType = MeterTypeEnum.Ammeter,
AmmerterAddress = "442405000039",
FocusAddress = "442400039",
FocusId = 69280,
ProjectID = 1,
TripType = "on",
TripTime = $"{DateTime.Now:HH:mm}",
MeterId = 95594,
LoopType = "EachDay",
EachDayWithout = "周六,周日",
TimeDensity = 15,
});
FreeRedisProvider.Instance.Set(redisCacheDeviceSettingInfoHashKey, settingInfos);
}
settingInfos.Add(new AmmeterAutoValveControlSetting()
{
MeterType = MeterTypeEnum.Ammeter,
AmmerterAddress = "112410027787",
FocusAddress = "322011149",
FocusId = 222222,
ProjectID = 1,
TripType = "on",
TripTime = $"{DateTime.Now:HH:mm}",
MeterId = 222222,
LoopType = "EachDay",
EachDayWithout = "周六,周日",
TimeDensity = 15,
});
#else #else
//获取电表阀控配置 //获取电表阀控配置
var settingInfos = await GetAmmeterAutoValveControlSetting(currentTimeStr); var settingInfos = await GetAmmeterAutoValveControlSetting(currentTimeStr);
@ -401,21 +387,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading
bool tripStateResult = false; bool tripStateResult = false;
string itemCode = T37612012PacketItemCodeConst.AFN10HFN01H; string itemCode = T37612012PacketItemCodeConst.AFN10HFN01H;
string subItemCode = string.Empty; string subItemCode = string.Empty;
if (settingInfo.TripType.Equals("on")) if (settingInfo.TripType.Equals("on"))//当前电表断闸,需要合闸
{ {
ammeterInfo.TripState = 0; ammeterInfo.TripState = 0;
tripStateResult = true; tripStateResult = true;
subItemCode = T6452007PacketItemCodeConst.C1C01C; subItemCode = T6452007PacketItemCodeConst.C1C011C;
if (ammeterInfo.TypeName != 1) if (ammeterInfo.TypeName != 1)
{ {
subItemCode = T6452007PacketItemCodeConst.C1C01B; subItemCode = T6452007PacketItemCodeConst.C1C011B;
} }
} }
else if (settingInfo.TripType.Equals("off")) else if (settingInfo.TripType.Equals("off"))//当前电表合闸,需要断闸
{ {
ammeterInfo.TripState = 1; ammeterInfo.TripState = 1;
tripStateResult = false; tripStateResult = false;
subItemCode = T6452007PacketItemCodeConst.C1C01A; subItemCode = T6452007PacketItemCodeConst.C1C011A;
} }
else else
{ {
@ -442,6 +428,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
MeterAddress = ammeterInfo.MeterAddress, MeterAddress = ammeterInfo.MeterAddress,
Password = ammeterInfo.Password, Password = ammeterInfo.Password,
ItemCode = subItemCode, ItemCode = subItemCode,
Baudrate = ammeterInfo.Baudrate,
MeteringPort = ammeterInfo.MeteringPort
} }
}); });

View File

@ -1362,7 +1362,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
/// <param name="waitContentTimeout">等待报文超时时间/s</param> /// <param name="waitContentTimeout">等待报文超时时间/s</param>
/// <param name="waitByteTimeout">等待字节超时时间/ms</param> /// <param name="waitByteTimeout">等待字节超时时间/ms</param>
/// <returns></returns> /// <returns></returns>
private static List<string> BuildTransparentForwardingSendDataUnit(int port, int baudRate, List<string> datas, StopBit stopBit = StopBit.Stop1, Parity parity = Parity.Even, DataBit dataBit = DataBit.D8, public static List<string> BuildTransparentForwardingSendDataUnit(int port, int baudRate, List<string> datas, StopBit stopBit = StopBit.Stop1, Parity parity = Parity.Even, DataBit dataBit = DataBit.D8,
int waitContentTimeout = 100, int waitByteTimeout = 100) int waitContentTimeout = 100, int waitByteTimeout = 100)
{ {
var dataUnit = new List<string>(); var dataUnit = new List<string>();
@ -1591,7 +1591,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
//AUX=消息认证码字段PW,16个字节 //AUX=消息认证码字段PW,16个字节
private static List<string> GetPW() public static List<string> GetPW()
{ {
var str = "00"; var str = "00";
var pWList = Enumerable.Repeat(str, pWLen).ToList(); var pWList = Enumerable.Repeat(str, pWLen).ToList();

View File

@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Consts
{
/// <summary>
/// 计量端口常量
/// </summary>
public class MeteringPortConst
{
/// <summary>
/// 计量端口1
/// </summary>
public const int MeteringPortOne = 1;
/// <summary>
/// 计量端口2
/// </summary>
public const int MeteringPortTwo = 2;
}
}

View File

@ -16,37 +16,37 @@ namespace JiShe.CollectBus.Common.Consts
/// <summary> /// <summary>
/// 跳闸 /// 跳闸
/// </summary> /// </summary>
public const string C1C01A = "1C_1A"; public const string C1C011A = "1C_01_1A";
/// <summary> /// <summary>
/// 单相合闸 /// 单相合闸
/// </summary> /// </summary>
public const string C1C01B = "1C_1B"; public const string C1C011B = "1C_01_1B";
/// <summary> /// <summary>
/// 三相合闸 /// 三相合闸
/// </summary> /// </summary>
public const string C1C01C = "1C_1C"; public const string C1C011C = "1C_01_1C";
/// <summary> /// <summary>
/// 触发报警 /// 触发报警
/// </summary> /// </summary>
public const string C1C02A = "1C_2A"; public const string C1C021A = "1C_01_2A";
/// <summary> /// <summary>
/// 报警解除 /// 报警解除
/// </summary> /// </summary>
public const string C1C02B = "1C_2B"; public const string C1C012B = "1C_01_2B";
/// <summary> /// <summary>
/// 保电开始 /// 保电开始
/// </summary> /// </summary>
public const string C1C03A = "1C_3A"; public const string C1C033A = "1C_03_3A";
/// <summary> /// <summary>
/// 保电结束 /// 保电结束
/// </summary> /// </summary>
public const string C1C03B = "1C_3B"; public const string C1C033B = "1C_03_3B";
#endregion #endregion
#region 广 #region 广