This commit is contained in:
cli 2025-04-23 14:46:56 +08:00
commit 039a1a8b89
27 changed files with 1614 additions and 751 deletions

View File

@ -13,6 +13,12 @@ namespace JiShe.CollectBus.IoTDB.Interface
/// <returns></returns>
Task OpenAsync();
/// <summary>
/// 关闭连接池
/// </summary>
/// <returns></returns>
Task CloseAsync();
/// <summary>
/// 插入数据
/// </summary>

View File

@ -11,29 +11,29 @@ namespace JiShe.CollectBus.IoTDB.Model
/// 系统名称
/// </summary>
[TAGColumn]
public string SystemName { get; set; }
public string SystemName { get; set; }
/// <summary>
/// 项目编码
/// </summary>
[ATTRIBUTEColumn]
public string ProjectId { get; set; }
[TAGColumn]
public string ProjectId { get; set; }
/// <summary>
/// 设备类型集中器、电表、水表、流量计、传感器等
/// </summary>
[ATTRIBUTEColumn]
public string DeviceType { get; set; }
[TAGColumn]
public string DeviceType { get; set; }
/// <summary>
/// 设备ID,也就是通信设备的唯一标识符,例如集中器地址,或者其他传感器设备地址
/// 设备ID,数据生成者例如集中器ID,电表ID、水表ID、流量计ID、传感器ID等
/// </summary>
[TAGColumn]
public string DeviceId { get; set; }
public string DeviceId { get; set; }
/// <summary>
/// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取
/// </summary>
public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
}
}

View File

@ -47,5 +47,10 @@
/// 时区,默认为:"UTC+08:00"
/// </summary>
public string ZoneId { get; set; } = "UTC+08:00";
/// <summary>
/// 请求超时时间单位毫秒默认为50000
/// </summary>
public long Timeout { get; set; } = 50000;
}
}

View File

@ -15,7 +15,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <returns></returns>
public static string GetDevicePath<T>(T entity) where T : IoTEntity
{
return $"root.{entity.SystemName.ToLower()}.`{entity.DeviceId}`";
return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectId}`.`{entity.DeviceType}`.`{entity.DeviceId}`";
}
@ -39,7 +39,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <returns></returns>
public static string GetDeviceTableName<T>(T entity) where T : IoTEntity
{
return $"{entity.SystemName.ToLower()}.`{entity.DeviceId}`";
return $"{entity.SystemName.ToLower()}.`{entity.ProjectId}`.`{entity.DeviceType}`.`{entity.DeviceId}`";
}
}

View File

@ -1,6 +1,8 @@
using System;
using System.Collections.Concurrent;
using System.ComponentModel.DataAnnotations;
using System.Reflection;
using System.Reflection.Metadata.Ecma335;
using System.Text;
using System.Threading.Tasks;
using Apache.IoTDB;
@ -23,7 +25,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <summary>
/// IoTDB数据源
/// </summary>
public class IoTDbProvider : IIoTDbProvider, IScopedDependency
public class IoTDbProvider : IIoTDbProvider, ITransientDependency
{
private static readonly ConcurrentDictionary<Type, DeviceMetadata> MetadataCache = new();
private readonly ILogger<IoTDbProvider> _logger;
@ -195,7 +197,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
{
try
{
var query =await BuildQuerySQL<T>(options);
var query = await BuildQuerySQL<T>(options);
var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query);
@ -205,15 +207,16 @@ namespace JiShe.CollectBus.IoTDB.Provider
Items = await ParseResults<T>(sessionDataSet, options.PageSize),
PageIndex = options.PageIndex,
PageSize = options.PageSize,
};
result.HasNext = result.TotalCount > 0? result.TotalCount < result.PageSize : false;
result.HasNext = result.TotalCount > 0 ? result.TotalCount < result.PageSize : false;
return result;
}
catch (Exception ex)
{
CurrentSession.Dispose();
_logger.LogError(ex, $"{nameof(QueryAsync)} IoTDB查询数据时发生异常");
throw;
}
@ -414,7 +417,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
private async Task<string> BuildQuerySQL<T>(IoTDBQueryOptions options) where T : IoTEntity
{
var metadata = await GetMetadata<T>();
var sb = new StringBuilder("SELECT ");
var sb = new StringBuilder("SELECT TIME as Timestamps,");
sb.AppendJoin(", ", metadata.ColumnNames);
sb.Append($" FROM {options.TableNameOrTreePath}");
@ -471,7 +474,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
{
return condition.Operator switch
{
">" => condition.IsNumber ? $"{condition.Field} > {condition.Value}": $"{condition.Field} > '{condition.Value}'",
">" => condition.IsNumber ? $"{condition.Field} > {condition.Value}" : $"{condition.Field} > '{condition.Value}'",
"<" => condition.IsNumber ? $"{condition.Field} < {condition.Value}" : $"{condition.Field} < '{condition.Value}'",
"=" => condition.IsNumber ? $"{condition.Field} = {condition.Value}" : $"{condition.Field} = '{condition.Value}'",
_ => throw new NotSupportedException($"{nameof(TranslateCondition)} 将查询条件转换为SQL语句时操作符 {condition.Operator} 属于异常情况")
@ -493,7 +496,16 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
var result = await CurrentSession.ExecuteQueryStatementAsync(countQuery);
return result.HasNext() ? Convert.ToInt32(result.Next().Values[0]) : 0;
if (result.HasNext())
{
await result.Close();
return 0;
}
var count = Convert.ToInt32(result.Next().Values[0]);
await result.Close();
return count;
}
/// <summary>
@ -510,6 +522,13 @@ namespace JiShe.CollectBus.IoTDB.Provider
var properties = typeof(T).GetProperties();
var columns = new List<string>() { "Timestamps" };
var dataTypes = new List<TSDataType>() { TSDataType.TIMESTAMP };
columns.AddRange(metadata.ColumnNames);
dataTypes.AddRange(metadata.DataTypes);
//metadata.ColumnNames.Insert(0, "Timestamps");
//metadata.DataTypes.Insert(0, TSDataType.TIMESTAMP);
while (dataSet.HasNext() && results.Count < pageSize)
{
var record = dataSet.Next();
@ -518,30 +537,34 @@ namespace JiShe.CollectBus.IoTDB.Provider
Timestamps = record.Timestamps
};
foreach (var measurement in metadata.ColumnNames)
foreach (var measurement in columns)
{
int indexOf = metadata.ColumnNames.IndexOf(measurement);
int indexOf = columns.IndexOf(measurement);
var value = record.Values[indexOf];
TSDataType tSDataType = dataTypes[indexOf];
var prop = properties.FirstOrDefault(p =>
p.Name.Equals(measurement, StringComparison.OrdinalIgnoreCase));
if (prop != null)
if (prop != null && !(value is System.DBNull))
{
if (measurement.EndsWith("time"))
dynamic tempValue = GetTSDataValue(tSDataType, value);
if (measurement.ToLower().EndsWith("time"))
{
var tempValue = TimestampHelper.ConvertToDateTime(Convert.ToInt64(value), TimestampUnit.Nanoseconds);
typeof(T).GetProperty(measurement)?.SetValue(entity, value);
typeof(T).GetProperty(measurement)?.SetValue(entity, TimestampHelper.ConvertToDateTime(tempValue, TimestampUnit.Nanoseconds));
}
else
{
typeof(T).GetProperty(measurement)?.SetValue(entity, value);
typeof(T).GetProperty(measurement)?.SetValue(entity, tempValue);
}
}
}
results.Add(entity);
}
await dataSet.Close();
return results;
}
@ -759,5 +782,28 @@ namespace JiShe.CollectBus.IoTDB.Provider
["DECIMAL"] = "0.0",
["STRING"] = string.Empty
};
/// <summary>
/// IoTDB 数据类型与.net类型映射
/// </summary>
/// <param name="tSDataType"></param>
/// <param name="value"></param>
/// <returns></returns>
private dynamic GetTSDataValue(TSDataType tSDataType, object value) =>
tSDataType switch
{
TSDataType.BOOLEAN => Convert.ToBoolean(value),
TSDataType.INT32 => Convert.ToInt32(value),
TSDataType.INT64 => Convert.ToInt64(value),
TSDataType.FLOAT => Convert.ToDouble(value),
TSDataType.DOUBLE => Convert.ToDouble(value),
TSDataType.TEXT => Convert.ToString(value),
TSDataType.NONE => null,
TSDataType.TIMESTAMP => Convert.ToInt64(value),
TSDataType.DATE => Convert.ToDateTime(value),
TSDataType.BLOB => Convert.ToByte(value),
TSDataType.STRING => Convert.ToString(value),
_ => Convert.ToString(value)
};
}
}

View File

@ -47,6 +47,19 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
}
/// <summary>
/// 关闭连接池
/// </summary>
/// <returns></returns>
public async Task CloseAsync()
{
if (_sessionPool == null)
{
return;
}
await _sessionPool.Close();
}
/// <summary>
/// 批量插入对齐时间序列数据
/// </summary>
@ -59,7 +72,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
{
throw new Exception($"{nameof(SessionPoolAdapter)} Tree模型数据入库没有成功返回结果为{result}");
}
//await CloseAsync();
return result;
}
@ -70,7 +83,10 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <returns></returns>
public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql)
{
return await _sessionPool.ExecuteQueryStatementAsync(sql);
var result = await _sessionPool.ExecuteQueryStatementAsync(sql, _options.Timeout);
//await result.Close();
//await CloseAsync();
return result;
}
public void Dispose()

View File

@ -45,6 +45,19 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
}
/// <summary>
/// 关闭连接池
/// </summary>
/// <returns></returns>
public async Task CloseAsync()
{
if (_sessionPool == null)
{
return;
}
await _sessionPool.Close();
}
/// <summary>
/// 批量插入
/// </summary>
@ -58,6 +71,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
throw new Exception($"{nameof(TableSessionPoolAdapter)} table模型数据入库没有成功返回结果为{result}");
}
//await CloseAsync();
return result;
}
@ -68,7 +82,10 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <returns></returns>
public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql)
{
return await _sessionPool.ExecuteQueryStatementAsync(sql);
var result = await _sessionPool.ExecuteQueryStatementAsync(sql,_options.Timeout);
//await result.Close();
//await CloseAsync();
return result;
}
public void Dispose()

View File

@ -63,6 +63,7 @@ var host = Host.CreateDefaultBuilder(args)
services.AddSingleton<IAdminClientService, AdminClientService>();
services.AddSingleton<IProducerService, ProducerService>();
services.AddSingleton<IConsumerService, ConsumerService>();
services.AddSingleton<KafkaPollyPipeline>();
services.AddTransient<KafkaSubscribeTest>();
})

View File

@ -41,6 +41,9 @@ namespace JiShe.CollectBus.Kafka
// 注册Consumer
context.Services.AddSingleton<IConsumerService, ConsumerService>();
// 注册Polly
context.Services.AddSingleton<KafkaPollyPipeline>();
//context.Services.AddHostedService<HostedService>();
}

View File

@ -2,32 +2,46 @@
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Serialization;
using Microsoft.AspNetCore.DataProtection.KeyManagement;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Text;
using Volo.Abp.Timing;
using System.Text.RegularExpressions;
using System.Threading;
namespace JiShe.CollectBus.Kafka.Consumer
{
public class ConsumerService : IConsumerService, IDisposable
{
private readonly ILogger<ConsumerService> _logger;
private readonly ConcurrentDictionary<Type, (object Consumer, CancellationTokenSource CTS)>
/// <summary>
/// 消费者存储
/// Key 格式:{groupId}_{topic}_{TKey}_{TValue}
/// </summary>
private readonly ConcurrentDictionary<string, (object Consumer, CancellationTokenSource CTS)>
_consumerStore = new();
/// <summary>
/// 消费完或者无数据时的延迟时间
/// </summary>
private TimeSpan DelayTime => TimeSpan.FromMilliseconds(100);
private readonly KafkaOptionConfig _kafkaOptionConfig;
private class KafkaConsumer<TKey, TValue> where TKey : notnull where TValue : class { }
private readonly KafkaPollyPipeline _kafkaPollyPipeline;
/// <summary>
/// ConsumerService
/// </summary>
/// <param name="logger"></param>
/// <param name="kafkaOptionConfig"></param>
public ConsumerService(ILogger<ConsumerService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig)
public ConsumerService(ILogger<ConsumerService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline)
{
_logger = logger;
_kafkaOptionConfig = kafkaOptionConfig.Value;
_kafkaPollyPipeline = kafkaPollyPipeline;
}
#region private
@ -96,7 +110,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <returns></returns>
public async Task SubscribeAsync<TValue>(string topic, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class
{
await SubscribeAsync<TValue>(new[] { topic }, messageHandler,groupId);
await SubscribeAsync<TValue>(new[] { topic }, messageHandler, groupId);
}
/// <summary>
@ -109,59 +123,75 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <returns></returns>
public async Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
{
var consumerKey = typeof(KafkaConsumer<TKey, TValue>);
var cts = new CancellationTokenSource();
//var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
//(
// CreateConsumer<TKey, TValue>(groupId),
// cts
//)).Consumer as IConsumer<TKey, TValue>;
var consumer = CreateConsumer<TKey, TValue>(groupId);
consumer!.Subscribe(topics);
await Task.Run(async () =>
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
while (!cts.IsCancellationRequested)
{
try
{
//_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息....");
var result = consumer.Consume(cts.Token);
if (result == null || result.Message==null || result.Message.Value == null)
continue;
if (result.IsPartitionEOF)
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<TKey, TValue>(groupId),
cts
)).Consumer as IConsumer<TKey, TValue>;
consumer!.Subscribe(topics);
_ = Task.Run(async () =>
{
while (!cts.IsCancellationRequested)
{
try
{
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
await Task.Delay(TimeSpan.FromSeconds(1),cts.Token);
continue;
}
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
//_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息....");
var result = consumer.Consume(cts.Token);
if (result == null || result.Message == null || result.Message.Value == null)
{
//consumer.Commit(result); // 提交偏移量
// 跳过消息
await Task.Delay(DelayTime, cts.Token);
continue;
}
if (result.IsPartitionEOF)
{
#if DEBUG
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
#endif
await Task.Delay(DelayTime, cts.Token);
continue;
}
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
//consumer.Commit(result); // 提交偏移量
// 跳过消息
continue;
}
}
bool sucess = await messageHandler(result.Message.Key, result.Message.Value);
if (sucess)
consumer.Commit(result); // 手动提交
}
bool sucess= await messageHandler(result.Message.Key, result.Message.Value);
if (sucess)
catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
consumer.Commit(result); // 手动提交
_logger.LogError(ex, $"{string.Join("", topics)}消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (OperationCanceledException)
{
//ignore
}
catch (Exception ex)
{
_logger.LogError(ex, "处理消息时发生未知错误");
}
}
catch (ConsumeException ex)
{
_logger.LogError(ex, $"{string.Join("", topics)}消息消费失败: {ex.Error.Reason}");
}
}
}, cts.Token);
await Task.CompletedTask;
});
await Task.CompletedTask;
}
@ -176,20 +206,16 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <returns></returns>
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
{
try {
var consumerKey = typeof(KafkaConsumer<Ignore, TValue>);
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
var cts = new CancellationTokenSource();
//if (topics.Contains(ProtocolConst.SubscriberLoginReceivedEventName))
//{
// string ssss = "";
//}
//var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
//(
// CreateConsumer<string, TValue>(groupId),
// cts
//)).Consumer as IConsumer<string, TValue>;
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<Ignore, TValue>(groupId),
cts
)).Consumer as IConsumer<Ignore, TValue>;
var consumer = CreateConsumer<Ignore, TValue>(groupId);
consumer!.Subscribe(topics);
_ = Task.Run(async () =>
@ -204,14 +230,16 @@ namespace JiShe.CollectBus.Kafka.Consumer
var result = consumer.Consume(cts.Token);
if (result == null || result.Message == null || result.Message.Value == null)
{
await Task.Delay(500, cts.Token);
await Task.Delay(DelayTime, cts.Token);
continue;
}
if (result.IsPartitionEOF)
{
#if DEBUG
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
await Task.Delay(100, cts.Token);
#endif
await Task.Delay(DelayTime, cts.Token);
continue;
}
if (_kafkaOptionConfig.EnableFilter)
@ -220,7 +248,6 @@ namespace JiShe.CollectBus.Kafka.Consumer
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
await Task.Delay(500, cts.Token);
//consumer.Commit(result); // 提交偏移量
// 跳过消息
continue;
@ -229,22 +256,26 @@ namespace JiShe.CollectBus.Kafka.Consumer
bool sucess = await messageHandler(result.Message.Value);
if (sucess)
consumer.Commit(result); // 手动提交
else
consumer.StoreOffset(result);
//else
// consumer.StoreOffset(result);
}
catch (ConsumeException ex)
catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
_logger.LogError(ex, $"{string.Join("", topics)}消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (OperationCanceledException)
{
//ignore
}
catch (Exception ex)
{
_logger.LogError(ex, "处理消息时发生未知错误");
}
}
});
} catch (Exception ex)
{
_logger.LogWarning($"Kafka消费异常: {ex.Message}");
}
await Task.CompletedTask;
}, cts.Token);
await Task.CompletedTask;
});
}
@ -273,109 +304,114 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <param name="groupId">消费组ID</param>
/// <param name="batchSize">批次大小</param>
/// <param name="batchTimeout">批次超时时间</param>
public async Task SubscribeBatchAsync<TKey, TValue>(string[] topics,Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null,int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
public async Task SubscribeBatchAsync<TKey, TValue>(string[] topics, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
{
var consumerKey = typeof(KafkaConsumer<TKey, TValue>);
var cts = new CancellationTokenSource();
//var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
// (
// CreateConsumer<TKey, TValue>(groupId),
// cts
// )).Consumer as IConsumer<TKey, TValue>;
var consumer = CreateConsumer<string, TValue>(groupId);
consumer!.Subscribe(topics);
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
_ = Task.Run(async () =>
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
var startTime = DateTime.UtcNow;
while (!cts.IsCancellationRequested)
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<TKey, TValue>(groupId),
cts
)).Consumer as IConsumer<TKey, TValue>;
consumer!.Subscribe(topics);
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
_ = Task.Run(async () =>
{
try
{
// 非阻塞快速累积消息
while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
{
var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
var startTime = DateTime.UtcNow;
if (result != null)
while (!cts.IsCancellationRequested)
{
try
{
// 非阻塞快速累积消息
while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
{
if (result.IsPartitionEOF)
var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
if (result != null)
{
//_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
await Task.Delay(10, cts.Token);
}
else if (result.Message.Value != null)
{
if (_kafkaOptionConfig.EnableFilter)
if (result.IsPartitionEOF)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
#if DEBUG
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
#endif
await Task.Delay(DelayTime, cts.Token);
}
else if (result.Message.Value != null)
{
if (_kafkaOptionConfig.EnableFilter)
{
//consumer.Commit(result); // 提交偏移量
// 跳过消息
continue;
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
//consumer.Commit(result); // 提交偏移量
// 跳过消息
continue;
}
}
messages.Add((result.Message.Value, result.TopicPartitionOffset));
}
}
else
{
// 无消息时短暂等待
await Task.Delay(DelayTime, cts.Token);
}
}
// 处理批次
if (messages.Count > 0)
{
bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
if (success)
{
var offsetsByPartition = new Dictionary<TopicPartition, long>();
foreach (var msg in messages)
{
var tp = msg.Offset.TopicPartition;
var offset = msg.Offset.Offset;
if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
{
offsetsByPartition[tp] = offset;
}
}
messages.Add((result.Message.Value, result.TopicPartitionOffset));
//messages.Add(result.Message.Value);
}
}
else
{
// 无消息时短暂等待
await Task.Delay(10, cts.Token);
}
}
// 处理批次
if (messages.Count > 0)
var offsetsToCommit = offsetsByPartition
.Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
.ToList();
consumer.Commit(offsetsToCommit);
}
messages.Clear();
}
startTime = DateTime.UtcNow;
}
catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
if (success)
{
var offsetsByPartition = new Dictionary<TopicPartition, long>();
foreach (var msg in messages)
{
var tp = msg.Offset.TopicPartition;
var offset = msg.Offset.Offset;
if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
{
offsetsByPartition[tp] = offset;
}
}
var offsetsToCommit = offsetsByPartition
.Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
.ToList();
consumer.Commit(offsetsToCommit);
}
messages.Clear();
_logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (OperationCanceledException)
{
//ignore
}
catch (Exception ex)
{
_logger.LogError(ex, "处理批量消息时发生未知错误");
}
}
}, cts.Token);
startTime = DateTime.UtcNow;
}
catch (ConsumeException ex)
{
_logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}");
}
catch (OperationCanceledException)
{
// 任务取消,正常退出
}
catch (Exception ex)
{
_logger.LogError(ex, "处理批量消息时发生未知错误");
}
}
}, cts.Token);
await Task.CompletedTask;
await Task.CompletedTask;
});
}
@ -406,111 +442,113 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <param name="batchSize">批次大小</param>
/// <param name="batchTimeout">批次超时时间</param>
/// <param name="consumeTimeout">消费等待时间</param>
public async Task SubscribeBatchAsync<TValue>(string[] topics,Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class
public async Task SubscribeBatchAsync<TValue>(string[] topics, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class
{
var consumerKey = typeof(KafkaConsumer<string, TValue>);
var cts = new CancellationTokenSource();
//var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
// (
// CreateConsumer<string, TValue>(groupId),
// cts
// )).Consumer as IConsumer<string, TValue>;
var consumer= CreateConsumer<string, TValue> (groupId);
consumer!.Subscribe(topics);
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
_ = Task.Run(async () =>
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
//var messages = new List<ConsumeResult<TKey, TValue>>();
var startTime = DateTime.UtcNow;
while (!cts.IsCancellationRequested)
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<Ignore, TValue>(groupId),
cts
)).Consumer as IConsumer<Ignore, TValue>;
consumer!.Subscribe(topics);
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
_ = Task.Run(async () =>
{
try
{
// 非阻塞快速累积消息
while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
{
var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
var startTime = DateTime.UtcNow;
if (result != null)
while (!cts.IsCancellationRequested)
{
try
{
// 非阻塞快速累积消息
while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
{
if (result.IsPartitionEOF)
var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
if (result != null)
{
//_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
await Task.Delay(10, cts.Token);
}
else if (result.Message.Value != null)
{
if (_kafkaOptionConfig.EnableFilter)
if (result.IsPartitionEOF)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
//_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
await Task.Delay(DelayTime, cts.Token);
}
else if (result.Message.Value != null)
{
if (_kafkaOptionConfig.EnableFilter)
{
//consumer.Commit(result); // 提交偏移量
// 跳过消息
continue;
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
//consumer.Commit(result); // 提交偏移量
// 跳过消息
continue;
}
}
messages.Add((result.Message.Value, result.TopicPartitionOffset));
}
}
else
{
// 无消息时短暂等待
await Task.Delay(DelayTime, cts.Token);
}
}
// 处理批次
if (messages.Count > 0)
{
bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
if (success)
{
var offsetsByPartition = new Dictionary<TopicPartition, long>();
foreach (var msg in messages)
{
var tp = msg.Offset.TopicPartition;
var offset = msg.Offset.Offset;
if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
{
offsetsByPartition[tp] = offset;
}
}
messages.Add((result.Message.Value, result.TopicPartitionOffset));
//messages.Add(result.Message.Value);
}
}
else
{
// 无消息时短暂等待
await Task.Delay(10, cts.Token);
}
}
// 处理批次
if (messages.Count > 0)
var offsetsToCommit = offsetsByPartition
.Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
.ToList();
consumer.Commit(offsetsToCommit);
}
messages.Clear();
}
startTime = DateTime.UtcNow;
}
catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
if (success)
{
var offsetsByPartition = new Dictionary<TopicPartition, long>();
foreach (var msg in messages)
{
var tp = msg.Offset.TopicPartition;
var offset = msg.Offset.Offset;
if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
{
offsetsByPartition[tp] = offset;
}
}
var offsetsToCommit = offsetsByPartition
.Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
.ToList();
consumer.Commit(offsetsToCommit);
}
messages.Clear();
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (OperationCanceledException)
{
//ignore
}
catch (Exception ex)
{
_logger.LogError(ex, "处理批量消息时发生未知错误");
}
}
}, cts.Token);
startTime = DateTime.UtcNow;
}
catch (ConsumeException ex)
{
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
}
catch (OperationCanceledException)
{
// 任务取消,正常退出
}
catch (Exception ex)
{
_logger.LogError(ex, "处理批量消息时发生未知错误");
}
}
}, cts.Token);
await Task.CompletedTask;
await Task.CompletedTask;
});
}
@ -519,9 +557,9 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
public void Unsubscribe<TKey, TValue>() where TKey : notnull where TValue : class
public void Unsubscribe<TKey, TValue>(string[] topics, string? groupId) where TKey : notnull where TValue : class
{
var consumerKey = typeof((TKey, TValue));
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
if (_consumerStore.TryRemove(consumerKey, out var entry))
{
entry.CTS.Cancel();

View File

@ -46,5 +46,5 @@ public interface IConsumerService
string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null)
where TValue : class;
void Unsubscribe<TKey, TValue>() where TKey : notnull where TValue : class;
void Unsubscribe<TKey, TValue>(string[] topics, string groupId) where TKey : notnull where TValue : class;
}

View File

@ -0,0 +1,111 @@
using Confluent.Kafka;
using Polly.CircuitBreaker;
using Polly.Retry;
using Polly;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Polly.Contrib.WaitAndRetry;
using Volo.Abp.DependencyInjection;
using Microsoft.Extensions.Logging;
using JiShe.CollectBus.Kafka.Producer;
namespace JiShe.CollectBus.Kafka.Internal
{
public class KafkaPollyPipeline
{
private readonly ILogger<KafkaPollyPipeline> _logger;
public KafkaPollyPipeline(ILogger<KafkaPollyPipeline> logger)
{
_logger= logger;
}
/// <summary>
/// 判断是否可恢复的异常
/// </summary>
/// <param name="ex"></param>
/// <returns></returns>
public static bool IsRecoverableError(Exception ex)
{
var errorList= new List<ErrorCode>
{
ErrorCode.GroupLoadInProgress,
ErrorCode.Local_Retry,
ErrorCode.Local_MaxPollExceeded,
ErrorCode.RequestTimedOut,
ErrorCode.LeaderNotAvailable,
ErrorCode.NotLeaderForPartition,
ErrorCode.RebalanceInProgress,
ErrorCode.NotCoordinatorForGroup,
ErrorCode.NetworkException,
ErrorCode.GroupCoordinatorNotAvailable
};
return ex switch
{
ConsumeException kafkaEx => errorList.Contains(kafkaEx.Error.Code),
KafkaException kafkaEx =>kafkaEx.Error.IsFatal && errorList.Contains(kafkaEx.Error.Code),
_ => false
};
}
/// <summary>
/// 创建重试 + 断路器
/// </summary>
/// <returns></returns>
public ResiliencePipeline KafkaPipeline
{
get
{
// 组合重试 + 断路器
ResiliencePipeline pipeline = new ResiliencePipelineBuilder()
.AddRetry(new RetryStrategyOptions
{
ShouldHandle = args => args.Outcome.Exception switch
{
not null when IsRecoverableError(args.Outcome.Exception) =>
PredicateResult.True(),
_ => PredicateResult.False()
},
Delay = TimeSpan.FromSeconds(2),
OnRetry = args =>
{
_logger.LogWarning($"重试中... 第 {args.AttemptNumber} 次,原因: {args.Outcome.Exception?.Message}");
return default;
}
})
.AddCircuitBreaker(new CircuitBreakerStrategyOptions
{
ShouldHandle = args => args.Outcome.Exception switch
{
not null when IsRecoverableError(args.Outcome.Exception) =>
PredicateResult.True(),
_ => PredicateResult.False()
},
FailureRatio = 0.8, // 80% 失败触发熔断
SamplingDuration = TimeSpan.FromSeconds(10),
MinimumThroughput = 4, // 至少4次调用才计算失败率
BreakDuration = TimeSpan.FromSeconds(10),
OnOpened = args =>
{
_logger.LogWarning($"熔断器开启,等待 {args.BreakDuration} 后重试");
return default;
},
OnClosed = _ =>
{
_logger.LogWarning("熔断器关闭,再次开始重试");
return default;
}
})
.Build();
return pipeline;
}
}
}
}

View File

@ -8,6 +8,8 @@
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.9.0" />
<PackageReference Include="Polly.Contrib.WaitAndRetry" Version="1.1.1" />
<PackageReference Include="Polly.Core" Version="8.5.2" />
<PackageReference Include="Volo.Abp.AspNetCore" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
</ItemGroup>

View File

@ -9,11 +9,9 @@ using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts
{
public class AnalysisStrategyContext
public class AnalysisStrategyContext(IServiceProvider provider)
{
private readonly IServiceProvider _provider;
public AnalysisStrategyContext(IServiceProvider provider) => _provider = provider;
private readonly IServiceProvider _provider = provider;
/// <summary>
/// 执行策略

View File

@ -8,6 +8,6 @@ namespace JiShe.CollectBus.Protocol.Dto
{
public class AFN0_F1_AnalysisDto: UnitDataDto
{
public bool Verify { get; set; } = true;
}
}

View File

@ -12,6 +12,9 @@ using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.AnalysisData.AFN_00H
{
/// <summary>
/// 全部确认:对收到报文中的全部数据单元标识进行确认
/// </summary>
public class AFN0_F1_Analysis: IAnalysisStrategy<TB3761, AFN0_F1_AnalysisDto>
{
private readonly ILogger<AFN0_F1_Analysis> _logger;

View File

@ -50,13 +50,13 @@ namespace JiShe.CollectBus.Protocol
var assembly = existingAssembly ?? Assembly.LoadFrom(file);
// 实现IAnalysisStrategy接口
var analysisStrategyTypes = assembly.GetTypes().Where(t => !t.IsAbstract && !t.IsInterface && t.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IAnalysisStrategy<,>)));
if (analysisStrategyTypes.Count() == 0)
if (!analysisStrategyTypes.Any())
continue;
foreach (var analysisStrategyType in analysisStrategyTypes)
{
// 通过反射获取静态元数据
var strategyType = analysisStrategyType.Name;
var genericArgs = analysisStrategyType.GetInterface("IAnalysisStrategy`2")!.GetGenericArguments();
var genericArgs = analysisStrategyType.GetInterface($"IAnalysisStrategy`2")!.GetGenericArguments();
var inputType = genericArgs[0];
var resultType = genericArgs[1];
// 注册策略实现

View File

@ -28,16 +28,18 @@ namespace JiShe.CollectBus.Protocol
private readonly IProducerService _producerService;
private readonly IRepository<Device, Guid> _deviceRepository;
private readonly ITcpService _tcpService;
/// <summary>
/// Initializes a new instance of the <see cref="StandardProtocolPlugin"/> class.
/// </summary>
/// <param name="serviceProvider">The service provider.</param>
public StandardProtocolPlugin(IServiceProvider serviceProvider,ILogger<StandardProtocolPlugin> logger) : base(serviceProvider, logger)
public StandardProtocolPlugin(IServiceProvider serviceProvider,ILogger<StandardProtocolPlugin> logger, ITcpService tcpService) : base(serviceProvider, logger)
{
_logger= logger;
_logger = logger;
//_logger = serviceProvider.GetRequiredService<ILogger<StandardProtocolPlugin>>();
_producerService = serviceProvider.GetRequiredService<IProducerService>();
_deviceRepository = serviceProvider.GetRequiredService<IRepository<Device, Guid>>();
_tcpService = tcpService;
}
public sealed override ProtocolInfo Info => new(nameof(StandardProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980");
@ -146,9 +148,21 @@ namespace JiShe.CollectBus.Protocol
Fn = 1
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
var issuedEventMessage = new IssuedEventMessage
{
ClientId = messageReceivedLoginEvent.ClientId,
DeviceNo = messageReceivedLoginEvent.DeviceNo,
Message = bytes, Type = IssuedEventType.Login,
MessageId = messageReceivedLoginEvent.MessageId
};
//await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
if (_tcpService.ClientExists(issuedEventMessage.ClientId))
{
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
_logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{Convert.ToHexString(bytes)}");
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, issuedEventMessage);
}
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceivedLoginEvent.ClientId, DeviceNo = messageReceivedLoginEvent.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceivedLoginEvent.MessageId });
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
}
@ -226,11 +240,24 @@ namespace JiShe.CollectBus.Protocol
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
//await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceivedHeartbeatEvent.ClientId, DeviceNo = messageReceivedHeartbeatEvent.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceivedHeartbeatEvent.MessageId });
IssuedEventMessage issuedEventMessage = new IssuedEventMessage
{
ClientId = messageReceivedHeartbeatEvent.ClientId,
DeviceNo = messageReceivedHeartbeatEvent.DeviceNo,
Message = bytes,
Type = IssuedEventType.Heartbeat,
MessageId = messageReceivedHeartbeatEvent.MessageId
};
if (_tcpService.ClientExists(issuedEventMessage.ClientId))
{
await _tcpService.SendAsync(issuedEventMessage.ClientId, bytes);
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{Convert.ToHexString(bytes)}");
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, issuedEventMessage);
}
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
}

View File

@ -168,7 +168,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
{
SystemName = "energy",
DeviceId = "402440506",
DeviceType = "Ammeter",
DeviceType = "1",
ProjectId = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleMeasuring = new Tuple<string, string>(measuring, value)

View File

@ -101,6 +101,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// TypeName = 3,
// DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
// TimeDensity = 15,
// BrandType = "",
//});
//ammeterInfos.Add(new AmmeterInfo()
//{
@ -115,6 +116,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// TypeName = 1,
// DataTypes = "581,589,592,597,601",
// TimeDensity = 15,
// BrandType = "",
//});
//return ammeterInfos;
@ -127,10 +129,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
WHERE 1=1 and C.Special = 0 ";
//TODO 记得移除特殊表过滤
//if (!string.IsNullOrWhiteSpace(gatherCode))
//{
// sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
//}
if (!string.IsNullOrWhiteSpace(gatherCode))
{
sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
}
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
.Ado
.QueryAsync<AmmeterInfo>(sql);
@ -186,30 +188,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
.Ado
.QueryAsync<WatermeterInfo>(sql);
}
/// <summary>
/// 测试设备分组均衡控制算法
/// </summary>
/// <param name="deviceCount"></param>
/// <returns></returns>
[HttpGet]
public async Task TestDeviceGroupBalanceControl(int deviceCount = 200000)
{
var deviceList = new List<string>();
for (int i = 0; i < deviceCount; i++)
{
deviceList.Add($"Device_{Guid.NewGuid()}");
}
// 初始化缓存
DeviceGroupBalanceControl.InitializeCache(deviceList);
// 打印分布统计
DeviceGroupBalanceControl.PrintDistributionStats();
await Task.CompletedTask;
}
}
}
}

View File

@ -76,14 +76,11 @@ namespace JiShe.CollectBus.Subscribers
isAck=false;
break;
}
_logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}");
loginEntity.AckTime = Clock.Now;
loginEntity.IsAck = true;
await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
if (_tcpService.ClientExists(issuedEventMessage.ClientId))
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
isAck = true;
isAck = true;
}
// TODO:暂时ACK等后续处理是否放到私信队列中
@ -102,19 +99,10 @@ namespace JiShe.CollectBus.Subscribers
isAck = false;
break;
}
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}");
heartbeatEntity.AckTime = Clock.Now;
heartbeatEntity.IsAck = true;
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
//var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo);
//if (device != null)
//{
// await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message);
//}
if(_tcpService.ClientExists(issuedEventMessage.ClientId))
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
}
}
// TODO:暂时ACK等后续处理是否放到私信队列中
return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
}
@ -176,19 +164,6 @@ namespace JiShe.CollectBus.Subscribers
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName, EnableBatch = true)]
public async Task<ISubscribeAck> ReceivedHeartbeatEvent(List<MessageReceivedHeartbeat> receivedHeartbeatMessages)
{
//foreach (var receivedHeartbeatMessage in receivedHeartbeatMessages)
//{
// var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
// if (protocolPlugin == null)
// {
// _logger.LogError("协议不存在!");
// }
// else
// {
// //await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage);
// await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage);
// }
//}
await _messageReceivedHeartbeatEventRepository.InsertManyAsync(receivedHeartbeatMessages);
return SubscribeAck.Success();
@ -197,20 +172,6 @@ namespace JiShe.CollectBus.Subscribers
[KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName,EnableBatch =true)]
public async Task<ISubscribeAck> ReceivedLoginEvent(List<MessageReceivedLogin> receivedLoginMessages)
{
//foreach (var receivedLoginMessage in receivedLoginMessages)
//{
//var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
//if (protocolPlugin == null)
//{
// _logger.LogError("协议不存在!");
//}
//else
//{
// //await protocolPlugin.LoginAsync(receivedLoginMessage);
// await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage);
//}
//}
await _messageReceivedLoginEventRepository.InsertManyAsync(receivedLoginMessages);
return SubscribeAck.Success();
}

View File

@ -24,13 +24,7 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// 排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳、或者某一个固定的标识1
/// </summary>
[FIELDColumn]
public string ScoreValue
{
get
{
return $"{DeviceId}.{TaskMark}".Md5Fun();
}
}
public string ScoreValue { get; set; }
/// <summary>
/// 是否手动操作

View File

@ -18,7 +18,7 @@ namespace JiShe.CollectBus.IotSystems.Watermeter
/// 关系映射标识用于ZSet的Member字段和Set的Value字段具体值可以根据不同业务场景进行定义
/// </summary>
[Column(IsIgnore = true)]
public override string MemberId => $"{FocusId}:{MeterId}";
public override string MemberId => $"{FocusAddress}:{MeteringCode}";
/// <summary>
/// ZSet排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳
@ -90,6 +90,7 @@ namespace JiShe.CollectBus.IotSystems.Watermeter
//// 电表= 1,水表= 2,燃气表= 3,热能表= 4,水表流量计=5燃气表流量计=6,特殊电表=7
/// </summary>
public MeterTypeEnum MeterType { get; set; }
/// <summary>
/// 设备品牌;
/// (当 MeterType = 水表, 如 威铭、捷先 等)
@ -138,12 +139,17 @@ namespace JiShe.CollectBus.IotSystems.Watermeter
/// 采集器编号
/// </summary>
public string GatherCode { get; set; }
/// <summary>
/// 项目ID
/// </summary>
public int ProjectID { get; set; }
/// <summary>
/// 数据库业务ID
/// </summary>
public int DatabaseBusiID { get; set; }
/// <summary>
/// 是否异常集中器 0:正常1异常
/// </summary>

View File

@ -0,0 +1,593 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Enums
{
/// <summary>
/// 水表\流量计\特殊电表品牌
/// </summary>
public enum BrandTypeEnum
{
/// <summary>
/// 默认OC_129
/// </summary>
//None = 0,
/// <summary>
/// 冻结数据0D_101
/// </summary>
//Freeze = 1,
/// <summary>
/// 默认OC_129or 冻结数据0D_101
/// </summary>
NoneOrFreeze = 0,
/// <summary>
/// 188-良禾
/// </summary>
LiangHe188 = 1,
/// <summary>
/// 188-威铭
/// </summary>
WeiMing188 = 2,
/// <summary>
/// 188-宁波
/// </summary>
NingBo188 = 3,
/// <summary>
/// 西恩电磁流量计
/// </summary>
XEDC = 4,
/// <summary>
/// 西恩超声波流量计
/// </summary>
XECSB = 5,
/// <summary>
/// 电磁流量计(LDG-DN200)
/// </summary>
DCLDGDN200 = 6,
/// <summary>
/// 燃气表抄读
/// </summary>
Gasmeter = 7,
/// <summary>
/// 涡街流量计
/// </summary>
WJFlowmeter = 8,
/// <summary>
/// 流量计
/// </summary>
Flowmeter = 9,
/// <summary>
/// 涡街流量计(LUGBDN100)
/// </summary>
WJFlowmeterLUGBDN100 = 10,
/// <summary>
/// 涡街流量计(ZC-LUGB-232ZDNNY)
/// </summary>
WJFlowmeterZCLUGB232ZDNNY = 11,
/// <summary>
/// SB2100蒸汽表
/// </summary>
ZQBSB2100Flowmeter = 12,
/// <summary>
/// (HD)热式流量计
/// </summary>
RSHDFlowmeter = 13,
/// <summary>
/// (HDWMNDN300)热式流量计
/// </summary>
RSHDWMNDN300 = 14,
/// <summary>
/// 热式流量计(FLRS110-C100)
/// </summary>
RSFLRS110C100 = 15,
/// <summary>
/// 通用188协议
/// </summary>
Universal188 = 16,
#region
DTZ719 = 17,
AKKJMC800PY = 18,
HRKJ001 = 19,
THYB9D1 = 20,
DTSD342 = 21,
/// <summary>
/// 谐波
/// </summary>
AFN16_F109 = 22,
/// <summary>
/// DL/T 645—2007 规约时采用该类
/// </summary>
CustomItemCode_93 = 23,
/// <summary>
/// 电表组合有功示值透抄CustomItemCode_95
/// </summary>
AFN16_F95 = 24,
#endregion
/// <summary>
/// 非特殊表
/// </summary>
None = 25,
/// <summary>
/// SDM630MCT 导轨电表
/// </summary>
SDM630MCT = 26,
/// <summary>
/// 水表通过0C_129采集
/// </summary>
Watermeter0C_129 = 27,
/// <summary>
/// YYD电磁流量计
/// </summary>
YYDFlowmeter = 28,
/// <summary>
/// 透明转发 跳合闸(水表)
/// </summary>
AFN16_F99 = 29,
/// <summary>
/// 透明转发 跳合闸(气表)
/// </summary>
AFN16_F100 = 30,
/// <summary>
/// 温度压力补偿涡街流量计(TPC1001)涡街流量计
/// </summary>
WJTPC1001 = 31,
/// <summary>
/// (LDG-SP25)树普电磁流量计
/// </summary>
ShupuLDGSP25Flowmeter = 32,
/// <summary>
/// 西恩涡街流量计(LUGBC-100)
/// </summary>
XEWJLUGBC100 = 33,
/// <summary>
/// 智能涡街流量计(UG-1132A)
/// </summary>
WJUG1132A = 34,
/// <summary>
/// 水表通过0D_101采集
/// </summary>
Watermeter0D_101 = 35,
/// <summary>
/// 读取SIM卡号
/// </summary>
AFN16_F101 = 36,
/// <summary>
/// 恒瑞科技三相导轨式电能表 or 恒瑞科技嵌入式电表测试
/// </summary>
DTS600 = 37,
/// <summary>
/// 恒瑞科技单相导轨式电能表DDS600
/// </summary>
DDS600 = 38,
/// <summary>
/// 旋进漩涡流量计(LUXB) 天津凯隆仪表科技有限公司
/// </summary>
XJXWLUXB = 39,
/// <summary>
/// DDSD720-L科陆单相导轨表
/// </summary>
DDSD720L = 40,
/// <summary>
/// 东久电磁流量计DJLD
/// </summary>
DJLD = 41,
/// <summary>
/// DTSD720-L科陆三相导轨表
/// </summary>
DTSD720L = 42,
/// <summary>
/// 世成(社为表计)涡街流量计
/// </summary>
SCLUGB = 43,
/// <summary>
/// CL7339MN-ZY科陆三相表
/// </summary>
CL7339MNZY = 44,
/// <summary>
/// 江森智能SNY723MC数显表
/// </summary>
SNY723MC = 45,
/// <summary>
/// 珠海派诺科技PMAC770三相数显表
/// </summary>
PMAC770 = 46,
/// <summary>
/// 北京中科涡街流量计ZKTD-LUCBY
/// </summary>
ZKTD_LUGBY = 47,
/// <summary>
/// 夏仪股份蒸汽流量计(LUGB-DN
/// </summary>
LUGB_DN = 48,
/// <summary>
/// LWQ-D2型气体涡轮流量计
/// </summary>
LWQ_D2 = 49,
/// <summary>
/// 西恩涡街流量计分体式流量积算仪32FC系列
/// </summary>
XEJSY32FC = 50,
/// <summary>
/// 寺崎科技PD652E-9S4电表
/// </summary>
PD652E9S4 = 51,
/// <summary>
/// 液体涡轮流量计(LWGY)
/// </summary>
LWGY = 52,
/// <summary>
/// 多功能积算仪(RW-A)
/// </summary>
DGNRWA = 53,
/// <summary>
/// 杭梅电气DTS804导轨表
/// </summary>
DTS804 = 54,
/// <summary>
/// 杭梅电气HG194-D93数显表
/// </summary>
HG194D93 = 55,
/// <summary>
/// 连水超声波水表188
/// </summary>
Lianshui188 = 56,
/// <summary>
/// 湖北回盛生物科技有限公司EZT96Y数显表
/// </summary>
EZT96Y,
/// <summary>
/// 上海肖月智能流量积算仪
/// </summary>
ZNLLJ,
/// <summary>
/// 西安诚通电磁流量计
/// </summary>
CTLDE250SC31GM8FB,
/// <summary>
/// 雅达YD2040
/// </summary>
YD2040,
/// <summary>
/// EVC智能体积修正仪
/// </summary>
EVC,
/// <summary>
/// 气体超声流量计IGSM-TS
/// </summary>
IGSMTS,
/// <summary>
/// YX-9SYE三相多功能表
/// </summary>
YX9SYE,
/// <summary>
/// 世成液体涡轮流量计(SCLWGY-DN50)
/// </summary>
SCLWGYDN50,
/// <summary>
/// 杭州盘古积算仪(FX6000F)
/// </summary>
FX6000F,
/// <summary>
/// "盘古电磁流量计(PMF-GM4.0A1-50M11K1F1T0C3)
/// </summary>
PFMGM40A150M11K1F1T0C3,
/// <summary>
/// 西恩液体涡轮流量计(SEAN LWGY-50)
/// </summary>
SeanLWGY50,
/// <summary>
/// 雷泰电磁流量计LD-LDE-DN50
/// </summary>
LDLDEDN50,
/// <summary>
/// 雷泰涡街流量计(LT-LUGB-DN50)
/// </summary>
LTLUGBDN50,
/// <summary>
/// 珠海派诺科技股份有限公司SPM33电力仪表
/// </summary>
SPM33,
/// <summary>
/// 株洲斯瑞克电气有限公司三相数显多功能电力仪表PD369E-AS4
/// </summary>
PD369EAS4,
/// <summary>
/// 湖北回盛生物科技有限公司-涡街流量计(10VTEAD03A200C1A2HOAG)
/// </summary>
WJ10VTEAD03A200C1A2HOAG,
/// <summary>
/// 世成旋进旋涡流量计SCLUX-DN25
/// </summary>
SCLUXDN25,
/// <summary>
/// 世成气体涡轮流量计(SCLWGQ-DN50)
/// </summary>
SCLWGQDN50,
/// <summary>
/// 智能电磁流量计(MDL210)
/// </summary>
MDL210,
/// <summary>
/// 江苏华海涡街流量计Focvor4202
/// </summary>
Focvor4202,
/// <summary>
/// 华凯电力HK194E-9S4
/// </summary>
HK194E9S4,
/// <summary>
/// 威胜测试-DTSD342_9N
/// </summary>
DTSD342Test,
/// <summary>
///科迈捷涡街流量计VFM-60
/// </summary>
VFM60,
/// <summary>
///江苏华海涡街流量计积算仪
/// </summary>
HHJSY,
/// <summary>
///宏江4G水表
/// </summary>
HJDN15,
/// <summary>
///世成4G涡街流量计
/// </summary>
LPV2,
/// <summary>
///浙江正泰DTSU666
/// </summary>
DTSU666,
/// <summary>
/// 浙江启唯电气-数码三相多功能电表QV194E-9S4
/// </summary>
QV194E9S4,
/// <summary>
/// 施耐德PM2100
/// </summary>
PM2100,
/// <summary>
/// 天康电磁流量计
/// </summary>
TK1100FT,
/// <summary>
/// 西恩气体涡轮流量计(SEANLWQ)
/// </summary>
SEANLWQ,
/// <summary>
/// V880BR涡街流量计
/// </summary>
V880BR,
/// <summary>
/// 大导SDD194E-9
/// </summary>
SDD194E_9,
/// <summary>
///泉高阀门科技有限公司-超声波水表
/// </summary>
QGFMCSB,
#region
SensorMeter,
#endregion
/// <summary>
/// 分体式超声波明渠流量计SC-6000F
/// </summary>
SC6000F,
/// <summary>
/// 江苏京仪JLWQ型气体流量计JLWQ型气体流量计
/// </summary>
JLWQ,
/// <summary>
/// 广州智光SMC200
/// </summary>
SMC200,
/// <summary>
/// 左拓ZFM2-621
/// </summary>
ZFM2621,
/// <summary>
/// 江苏华尔威涡街旋进流量计
/// </summary>
HRW520,
/// <summary>
/// 施耐德PM5350P
/// </summary>
PM5350P,
/// <summary>
/// 施耐德PM810MG
/// </summary>
PM810MG,
/// <summary>
/// 浙江之高ZL96-3E
/// </summary>
ZL96_3E,
/// <summary>
/// 拓普电子PD284Z-9S4
/// </summary>
PD284Z_9S4,
/// <summary>
/// 上海普川DTSU5886
/// </summary>
DTSU5886,
/// <summary>
/// 安德利SC194E-9S4
/// </summary>
SC194E9S4,
/// <summary>
/// 浙江天电电气TD700E-AS3
/// </summary>
TD700EAS3,
/// <summary>
/// 世成分体式涡街流量计SW-SCLUGB-DN
/// </summary>
SWSCLUGBDN,
/// <summary>
/// 东久电磁冷热量计SW-DJLD
/// </summary>
SWDJLD,
/// <summary>
/// 北京中科拓达ZKTD-LUGB
/// </summary>
ZKTD_LUGB,
/// <summary>
/// 江苏英美迪自动化有限公司三相液晶多功能仪表YMD96A-E4
/// </summary>
YMD96A_E4,
/// <summary>
/// 金湖盛元LWQ气体涡轮流量计
/// </summary>
JHSYLWQ,
/// <summary>
/// 天康涡街流量计TK2000
/// </summary>
TK2000,
/// <summary>
/// 浙江迈拓三相导轨电表DTSF1709
/// </summary>
DTSF1709,
/// <summary>
/// 杭州逸控科技超声波流量计ECUL30B-L2C1NSVC
/// </summary>
ECUL30BL2C1NSVC,
/// <summary>
/// 数字电测表HTD288-DM44/R
/// </summary>
HTD288,
/// <summary>
/// 杭州逸控科技有限公司ECLUGB2305W3C2N
/// </summary>
ECLUGB2305W3C2N,
/// <summary>
/// 江苏华海测控科技有限公司温压补偿流量积算仪
/// </summary>
XMJA9000,
/// <summary>
/// 湖南佳一机电设备有限公司精致型蒸汽热能积算仪
/// </summary>
F3200H,
/// <summary>
/// 合兴加能电梯能量回馈装置
/// </summary>
IPCPFE04MNDC,
/// <summary>
/// 宁波创盛旋涡流量计
/// </summary>
CX25,
/// <summary>
/// 群力电气QDCZY1N
/// </summary>
QDCZY1N,
/// <summary>
///深圳中电PMCS963C
/// </summary>
PMCS963C,
/// <summary>
/// 迅尔燃气表D2SD2ET3D4S
/// </summary>
D2SD2ET3D4S,
/// <summary>
/// INTELLIGENT积算仪F2000X
/// </summary>
F2000X,
/// <summary>
///多盟-DM194Z-9SY
/// </summary>
DM194Z9SY,
/// <summary>
/// 纳宇PD760
/// </summary>
PD760,
/// <summary>
/// 纳宇DTS90031LPD
/// </summary>
DTS90031LPD,
/// <summary>
/// 上海施易克SEIK680数显表
/// </summary>
SEIK680,
/// <summary>
/// 中灵电气ZL125SC
/// </summary>
ZL125SC,
/// <summary>
/// 江苏京仪气体涡轮流量计JLWQE
/// </summary>
JLWQE,
/// <summary>
/// HART智能转换器
/// </summary>
SM100,
/// <summary>
/// 拓思特涡街流量计H880BR
/// </summary>
H880BR,
/// <summary>
/// DDSD720-L-单相电子式导轨表
/// </summary>
DDSD720L2,
/// <summary>
/// 浙江智电三相三线大功率有功电能表
/// </summary>
ZJZDSXSX,
/// <summary>
/// 山水翔水表LXSY
/// </summary>
LXSY,
/// <summary>
/// 衡水多元仪表有限公司气体涡轮流量计DYWQ
/// </summary>
DYWQ,
/// <summary>
/// 安徽聚积电子
/// </summary>
DDS2052,
/// <summary>
/// 湖南中麦
/// </summary>
ZMDTSD3429N,
/// <summary>
///DTS2377三相导轨式多功能智能电表
/// </summary>
DTS2377
}
}

View File

@ -0,0 +1,54 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Enums
{
/// <summary>
/// 表计连接通讯协议--表计与集中器的通讯协议
/// </summary>
public enum MeterLinkProtocol
{
/// <summary>
/// 无
/// </summary>
None = 0,
/// <summary>
/// DL/T 645—1997
/// </summary>
DLT_645_1997 = 1,
/// <summary>
/// 交流采样装置通信协议(电表)
/// </summary>
ACSamplingDevice = 2,
/// <summary>
/// DL/T 645—2007
/// </summary>
DLT_645_2007 = 30,
/// <summary>
/// 载波通信
/// </summary>
Carrierwave = 31,
/// <summary>
/// CJ/T 188—2018协议(水表)
/// </summary>
CJT_188_2018 = 32,
/// <summary>
/// CJ/T 188—2004协议
/// </summary>
CJT_188_2004 = 33,
/// <summary>
/// MODBUS-RTU
/// </summary>
MODBUS_RTU = 34,
}
}

View File

@ -88,7 +88,7 @@
"ClusterList": [ "192.168.1.9:6667" ],
"PoolSize": 2,
"DataBaseName": "energy",
"OpenDebugMode": false,
"OpenDebugMode": true,
"UseTableSessionPoolByDefault": false
},
"Cassandra": {