From 5647385582ef155ec77841a3fc741b361fffe3e7 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Tue, 22 Apr 2025 17:58:14 +0800 Subject: [PATCH 1/8] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=97=B6=E9=97=B4?= =?UTF-8?q?=E6=8E=A7=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Options/IoTDBOptions.cs | 5 +++++ .../Provider/IoTDBProvider.cs | 3 ++- .../Provider/SessionPoolAdapter.cs | 2 +- .../Provider/TableSessionPoolAdapter.cs | 2 +- .../BasicScheduledMeterReadingService.cs | 22 ++++++++++++------- .../Pages/Monitor.cshtml | 2 +- web/JiShe.CollectBus.Host/appsettings.json | 2 +- 7 files changed, 25 insertions(+), 13 deletions(-) diff --git a/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs index 0d01f81..251e48b 100644 --- a/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs +++ b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs @@ -47,5 +47,10 @@ /// 时区,默认为:"UTC+08:00" /// public string ZoneId { get; set; } = "UTC+08:00"; + + /// + /// 请求超时时间,单位毫秒,默认为:50000 + /// + public long Timeout { get; set; } = 50000; } } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index 7cfaf32..4615053 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -214,6 +214,7 @@ namespace JiShe.CollectBus.IoTDB.Provider } catch (Exception ex) { + CurrentSession.Dispose(); _logger.LogError(ex, $"{nameof(QueryAsync)} IoTDB查询数据时发生异常"); throw; } @@ -414,7 +415,7 @@ namespace JiShe.CollectBus.IoTDB.Provider private async Task BuildQuerySQL(IoTDBQueryOptions options) where T : IoTEntity { var metadata = await GetMetadata(); - var sb = new StringBuilder("SELECT "); + var sb = new StringBuilder("SELECT TIME,"); sb.AppendJoin(", ", metadata.ColumnNames); sb.Append($" FROM {options.TableNameOrTreePath}"); diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs index eacf246..c8c36ee 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs @@ -70,7 +70,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public async Task ExecuteQueryStatementAsync(string sql) { - return await _sessionPool.ExecuteQueryStatementAsync(sql); + return await _sessionPool.ExecuteQueryStatementAsync(sql, _options.Timeout); } public void Dispose() diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs index 137f5a8..a6e0f2a 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs @@ -68,7 +68,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public async Task ExecuteQueryStatementAsync(string sql) { - return await _sessionPool.ExecuteQueryStatementAsync(sql); + return await _sessionPool.ExecuteQueryStatementAsync(sql,_options.Timeout); } public void Dispose() diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 3437f4b..40dfeba 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -65,6 +65,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading _producerService = producerService; _redisDataCacheService = redisDataCacheService; _kafkaOptions = kafkaOptions.Value; + + _runtimeContext.UseTableSessionPool = true; } /// @@ -138,15 +140,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading } var meterTypes = EnumExtensions.ToEnumDictionary(); + + var currentTaskTime = tasksToBeIssueModel.NextTaskTime;//程序启动缓存电表的时候,NextTaskTime已经格式化到下一个采集点时间。 if (meteryType == MeterTypeEnum.Ammeter.ToString()) { //List pushTaskInfos = new(); - _runtimeContext.UseTableSessionPool = true; + //_runtimeContext.UseTableSessionPool = true; var metadata = await _dbProvider.GetMetadata(); _ = CreateMeterPublishTask( timeDensity: timeDensity, - nextTaskTime: tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity), + nextTaskTime: currentTaskTime, meterType: MeterTypeEnum.Ammeter, taskCreateAction: (timeDensity, data, groupIndex, timestamps) => { @@ -166,7 +170,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _ = CreateMeterPublishTask( timeDensity: timeDensity, - nextTaskTime: tasksToBeIssueModel.NextTaskTime, + nextTaskTime: currentTaskTime, meterType: MeterTypeEnum.Ammeter, taskCreateAction: (timeDensity, data, groupIndex, taskBatch) => { @@ -183,8 +187,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading //根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。 - tasksToBeIssueModel.LastTaskTime = tasksToBeIssueModel.NextTaskTime; - tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity); + tasksToBeIssueModel.LastTaskTime = currentTaskTime; + tasksToBeIssueModel.NextTaskTime = currentTaskTime.CalculateNextCollectionTime(timeDensity); await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel); } } @@ -248,8 +252,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading timer1.Stop(); _logger.LogError($"读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒"); - //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); - //return; + DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); + return; #else var meterInfos = await GetAmmeterInfoList(gatherCode); #endif @@ -460,7 +464,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading //获取缓存中的电表信息 int timeDensity = 15; //var currentTime = DateTime.Now.CalculateNextCollectionTime(timeDensity); - var currentTime = Convert.ToDateTime("2025-04-21 17:42:00").CalculateNextCollectionTime(timeDensity); var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName,MeterTypeEnum.Ammeter,timeDensity); var taskInfo = await FreeRedisProvider.Instance.GetAsync(redisCacheKey); @@ -1031,6 +1034,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading int pageNumber = 0; bool hasNext; var stopwatch = Stopwatch.StartNew(); + + var ddd = _runtimeContext.UseTableSessionPool; + do { options.PageIndex = pageNumber++; diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml index 88209c2..b25f9f0 100644 --- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -17,7 +17,7 @@ 后端服务 - +
[FIELDColumn] - public string ScoreValue - { - get - { - return $"{DeviceId}.{TaskMark}".Md5Fun(); - } - } + public string ScoreValue { get; set; } /// /// 是否手动操作 From 60aad0032bdd7237782316dbd2db477a63a91e4a Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Tue, 22 Apr 2025 23:44:37 +0800 Subject: [PATCH 3/8] =?UTF-8?q?=E8=A7=A3=E5=86=B3IoTDB=E6=97=A0=E6=B3=95?= =?UTF-8?q?=E9=87=8D=E5=A4=8D=E8=B0=83=E7=94=A8=E6=9F=A5=E8=AF=A2=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Interface/IIoTDBSessionPool.cs | 6 ++++ .../Provider/IoTDBProvider.cs | 34 ++++++++++++++----- .../Provider/SessionPoolAdapter.cs | 20 +++++++++-- .../Provider/TableSessionPoolAdapter.cs | 19 ++++++++++- 4 files changed, 67 insertions(+), 12 deletions(-) diff --git a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs index 9587549..6bcbc5c 100644 --- a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs +++ b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs @@ -13,6 +13,12 @@ namespace JiShe.CollectBus.IoTDB.Interface /// Task OpenAsync(); + /// + /// 关闭连接池 + /// + /// + Task CloseAsync(); + /// /// 插入数据 /// diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index 15d2df1..9c7c602 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -25,7 +25,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// IoTDB数据源 /// - public class IoTDbProvider : IIoTDbProvider, IScopedDependency + public class IoTDbProvider : IIoTDbProvider, ITransientDependency { private static readonly ConcurrentDictionary MetadataCache = new(); private readonly ILogger _logger; @@ -496,7 +496,16 @@ namespace JiShe.CollectBus.IoTDB.Provider } var result = await CurrentSession.ExecuteQueryStatementAsync(countQuery); - return result.HasNext() ? Convert.ToInt32(result.Next().Values[0]) : 0; + if (result.HasNext()) + { + await result.Close(); + return 0; + } + + var count = Convert.ToInt32(result.Next().Values[0]); + await result.Close(); + + return count; } /// @@ -512,8 +521,13 @@ namespace JiShe.CollectBus.IoTDB.Provider var metadata = await GetMetadata(); var properties = typeof(T).GetProperties(); - metadata.ColumnNames.Insert(0, "Timestamps"); - metadata.DataTypes.Insert(0, TSDataType.TIMESTAMP); + + var columns = new List() { "Timestamps" }; + var dataTypes = new List() { TSDataType.TIMESTAMP }; + columns.AddRange(metadata.ColumnNames); + dataTypes.AddRange(metadata.DataTypes); + //metadata.ColumnNames.Insert(0, "Timestamps"); + //metadata.DataTypes.Insert(0, TSDataType.TIMESTAMP); while (dataSet.HasNext() && results.Count < pageSize) { @@ -523,17 +537,17 @@ namespace JiShe.CollectBus.IoTDB.Provider Timestamps = record.Timestamps }; - foreach (var measurement in metadata.ColumnNames) + foreach (var measurement in columns) { - int indexOf = metadata.ColumnNames.IndexOf(measurement); + int indexOf = columns.IndexOf(measurement); var value = record.Values[indexOf]; - TSDataType tSDataType = metadata.DataTypes[indexOf]; + TSDataType tSDataType = dataTypes[indexOf]; var prop = properties.FirstOrDefault(p => p.Name.Equals(measurement, StringComparison.OrdinalIgnoreCase)); if (prop != null && !(value is System.DBNull)) - { - dynamic tempValue = GetTSDataValue(tSDataType,value); + { + dynamic tempValue = GetTSDataValue(tSDataType, value); if (measurement.ToLower().EndsWith("time")) { @@ -548,7 +562,9 @@ namespace JiShe.CollectBus.IoTDB.Provider } results.Add(entity); + } + await dataSet.Close(); return results; } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs index c8c36ee..ee71cf1 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs @@ -47,6 +47,19 @@ namespace JiShe.CollectBus.IoTDB.Provider } } + /// + /// 关闭连接池 + /// + /// + public async Task CloseAsync() + { + if (_sessionPool == null) + { + return; + } + await _sessionPool.Close(); + } + /// /// 批量插入对齐时间序列数据 /// @@ -59,7 +72,7 @@ namespace JiShe.CollectBus.IoTDB.Provider { throw new Exception($"{nameof(SessionPoolAdapter)} Tree模型数据入库没有成功,返回结果为:{result}"); } - + //await CloseAsync(); return result; } @@ -70,7 +83,10 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public async Task ExecuteQueryStatementAsync(string sql) { - return await _sessionPool.ExecuteQueryStatementAsync(sql, _options.Timeout); + var result = await _sessionPool.ExecuteQueryStatementAsync(sql, _options.Timeout); + //await result.Close(); + //await CloseAsync(); + return result; } public void Dispose() diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs index a6e0f2a..dc4f0ee 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs @@ -45,6 +45,19 @@ namespace JiShe.CollectBus.IoTDB.Provider } } + /// + /// 关闭连接池 + /// + /// + public async Task CloseAsync() + { + if (_sessionPool == null) + { + return; + } + await _sessionPool.Close(); + } + /// /// 批量插入 /// @@ -58,6 +71,7 @@ namespace JiShe.CollectBus.IoTDB.Provider throw new Exception($"{nameof(TableSessionPoolAdapter)} table模型数据入库没有成功,返回结果为:{result}"); } + //await CloseAsync(); return result; } @@ -68,7 +82,10 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public async Task ExecuteQueryStatementAsync(string sql) { - return await _sessionPool.ExecuteQueryStatementAsync(sql,_options.Timeout); + var result = await _sessionPool.ExecuteQueryStatementAsync(sql,_options.Timeout); + //await result.Close(); + //await CloseAsync(); + return result; } public void Dispose() From c5364f4a9564ab78c4ea0ab97f168b08ee8a5368 Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Tue, 22 Apr 2025 23:45:08 +0800 Subject: [PATCH 4/8] =?UTF-8?q?=E4=BC=98=E5=8C=96kafka=E6=B6=88=E8=B4=B9?= =?UTF-8?q?=E8=80=85=E5=AE=9E=E4=BE=8B=E5=A4=8D=E7=94=A8=20=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E7=99=BB=E5=BD=95=E5=BF=83=E8=B7=B3=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Consumer/ConsumerService.cs | 69 +++++++++---------- .../Consumer/IConsumerService.cs | 2 +- .../AnalysisStrategyContext.cs | 6 +- .../Protocol/Dto/AFN0_F1_AnalysisDto.cs | 2 +- .../AnalysisData/AFN_00H/AFN0_F1_Analysis.cs | 3 + .../StandardProtocolPlugin.cs | 39 +++++++++-- .../Samples/SampleAppService.cs | 2 +- .../Subscribers/SubscriberAppService.cs | 45 +----------- .../Pages/Monitor.cshtml | 1 - 9 files changed, 77 insertions(+), 92 deletions(-) diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs index 5313833..32df748 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs @@ -7,16 +7,20 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System.Collections.Concurrent; using System.Text; +using System.Text.RegularExpressions; namespace JiShe.CollectBus.Kafka.Consumer { public class ConsumerService : IConsumerService, IDisposable { private readonly ILogger _logger; - private readonly ConcurrentDictionary + /// + /// 消费者存储 + /// Key 格式:{groupId}_{topic}_{TKey}_{TValue} + /// + private readonly ConcurrentDictionary _consumerStore = new(); private readonly KafkaOptionConfig _kafkaOptionConfig; - private class KafkaConsumer where TKey : notnull where TValue : class { } /// /// ConsumerService @@ -108,15 +112,14 @@ namespace JiShe.CollectBus.Kafka.Consumer /// public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null) where TKey : notnull where TValue : class { - var consumerKey = typeof(KafkaConsumer); + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; var cts = new CancellationTokenSource(); - //var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - //( - // CreateConsumer(groupId), - // cts - //)).Consumer as IConsumer; - var consumer = CreateConsumer(groupId); + var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; consumer!.Subscribe(topics); await Task.Run(async () => @@ -176,19 +179,14 @@ namespace JiShe.CollectBus.Kafka.Consumer public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) where TValue : class { try { - var consumerKey = typeof(KafkaConsumer); + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; var cts = new CancellationTokenSource(); - //if (topics.Contains(ProtocolConst.SubscriberLoginReceivedEventName)) - //{ - // string ssss = ""; - //} - //var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - //( - // CreateConsumer(groupId), - // cts - //)).Consumer as IConsumer; + var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; - var consumer = CreateConsumer(groupId); consumer!.Subscribe(topics); _ = Task.Run(async () => @@ -274,15 +272,14 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 批次超时时间 public async Task SubscribeBatchAsync(string[] topics,Func, Task> messageBatchHandler, string? groupId = null,int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class { - var consumerKey = typeof(KafkaConsumer); + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; var cts = new CancellationTokenSource(); - //var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - // ( - // CreateConsumer(groupId), - // cts - // )).Consumer as IConsumer; - var consumer = CreateConsumer(groupId); + var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; consumer!.Subscribe(topics); var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 @@ -366,6 +363,7 @@ namespace JiShe.CollectBus.Kafka.Consumer catch (OperationCanceledException) { // 任务取消,正常退出 + } catch (Exception ex) { @@ -407,16 +405,15 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 消费等待时间 public async Task SubscribeBatchAsync(string[] topics,Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class { - var consumerKey = typeof(KafkaConsumer); + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; var cts = new CancellationTokenSource(); - //var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - // ( - // CreateConsumer(groupId), - // cts - // )).Consumer as IConsumer; + var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; - var consumer= CreateConsumer (groupId); consumer!.Subscribe(topics); var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 @@ -518,9 +515,9 @@ namespace JiShe.CollectBus.Kafka.Consumer /// /// /// - public void Unsubscribe() where TKey : notnull where TValue : class + public void Unsubscribe(string[] topics, string groupId) where TKey : notnull where TValue : class { - var consumerKey = typeof((TKey, TValue)); + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; if (_consumerStore.TryRemove(consumerKey, out var entry)) { entry.CTS.Cancel(); diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs index 32ade01..b4b4274 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs @@ -46,5 +46,5 @@ public interface IConsumerService string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class; - void Unsubscribe() where TKey : notnull where TValue : class; + void Unsubscribe(string[] topics, string groupId) where TKey : notnull where TValue : class; } \ No newline at end of file diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/AnalysisStrategyContext.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/AnalysisStrategyContext.cs index 8fe8f70..628934a 100644 --- a/protocols/JiShe.CollectBus.Protocol.Contracts/AnalysisStrategyContext.cs +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/AnalysisStrategyContext.cs @@ -9,11 +9,9 @@ using System.Threading.Tasks; namespace JiShe.CollectBus.Protocol.Contracts { - public class AnalysisStrategyContext + public class AnalysisStrategyContext(IServiceProvider provider) { - private readonly IServiceProvider _provider; - - public AnalysisStrategyContext(IServiceProvider provider) => _provider = provider; + private readonly IServiceProvider _provider = provider; /// /// 执行策略 diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Protocol/Dto/AFN0_F1_AnalysisDto.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Protocol/Dto/AFN0_F1_AnalysisDto.cs index d6ab85a..ede07c1 100644 --- a/protocols/JiShe.CollectBus.Protocol.Contracts/Protocol/Dto/AFN0_F1_AnalysisDto.cs +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Protocol/Dto/AFN0_F1_AnalysisDto.cs @@ -8,6 +8,6 @@ namespace JiShe.CollectBus.Protocol.Dto { public class AFN0_F1_AnalysisDto: UnitDataDto { - + public bool Verify { get; set; } = true; } } diff --git a/protocols/JiShe.CollectBus.Protocol/AnalysisData/AFN_00H/AFN0_F1_Analysis.cs b/protocols/JiShe.CollectBus.Protocol/AnalysisData/AFN_00H/AFN0_F1_Analysis.cs index f5ba2af..3ff0882 100644 --- a/protocols/JiShe.CollectBus.Protocol/AnalysisData/AFN_00H/AFN0_F1_Analysis.cs +++ b/protocols/JiShe.CollectBus.Protocol/AnalysisData/AFN_00H/AFN0_F1_Analysis.cs @@ -12,6 +12,9 @@ using System.Threading.Tasks; namespace JiShe.CollectBus.Protocol.AnalysisData.AFN_00H { + /// + /// 全部确认:对收到报文中的全部数据单元标识进行确认 + /// public class AFN0_F1_Analysis: IAnalysisStrategy { private readonly ILogger _logger; diff --git a/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs index 6a5e3a5..27f7466 100644 --- a/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs @@ -28,16 +28,18 @@ namespace JiShe.CollectBus.Protocol private readonly IProducerService _producerService; private readonly IRepository _deviceRepository; + private readonly ITcpService _tcpService; /// /// Initializes a new instance of the class. /// /// The service provider. - public StandardProtocolPlugin(IServiceProvider serviceProvider,ILogger logger) : base(serviceProvider, logger) + public StandardProtocolPlugin(IServiceProvider serviceProvider,ILogger logger, ITcpService tcpService) : base(serviceProvider, logger) { - _logger= logger; + _logger = logger; //_logger = serviceProvider.GetRequiredService>(); _producerService = serviceProvider.GetRequiredService(); _deviceRepository = serviceProvider.GetRequiredService>(); + _tcpService = tcpService; } public sealed override ProtocolInfo Info => new(nameof(StandardProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980"); @@ -146,9 +148,21 @@ namespace JiShe.CollectBus.Protocol Fn = 1 }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParam); + var issuedEventMessage = new IssuedEventMessage + { + ClientId = messageReceivedLoginEvent.ClientId, + DeviceNo = messageReceivedLoginEvent.DeviceNo, + Message = bytes, Type = IssuedEventType.Login, + MessageId = messageReceivedLoginEvent.MessageId + }; //await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); + if (_tcpService.ClientExists(issuedEventMessage.ClientId)) + { + await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); + _logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{Convert.ToHexString(bytes)}"); + await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, issuedEventMessage); + } - await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceivedLoginEvent.ClientId, DeviceNo = messageReceivedLoginEvent.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceivedLoginEvent.MessageId }); //await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); } @@ -226,11 +240,24 @@ namespace JiShe.CollectBus.Protocol }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParam); //await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }); - - await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceivedHeartbeatEvent.ClientId, DeviceNo = messageReceivedHeartbeatEvent.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceivedHeartbeatEvent.MessageId }); + + IssuedEventMessage issuedEventMessage = new IssuedEventMessage + { + ClientId = messageReceivedHeartbeatEvent.ClientId, + DeviceNo = messageReceivedHeartbeatEvent.DeviceNo, + Message = bytes, + Type = IssuedEventType.Heartbeat, + MessageId = messageReceivedHeartbeatEvent.MessageId + }; + if (_tcpService.ClientExists(issuedEventMessage.ClientId)) + { + await _tcpService.SendAsync(issuedEventMessage.ClientId, bytes); + _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{Convert.ToHexString(bytes)}"); + await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, issuedEventMessage); + } //await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }); - + } diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 86582af..d871ecd 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -168,7 +168,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS { SystemName = "energy", DeviceId = "402440506", - DeviceType = "Ammeter", + DeviceType = "1", ProjectId = "10059", Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), SingleMeasuring = new Tuple(measuring, value) diff --git a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index cebb2e7..32a9fc3 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -75,14 +75,11 @@ namespace JiShe.CollectBus.Subscribers isAck=false; break; } - _logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}"); - + loginEntity.AckTime = Clock.Now; loginEntity.IsAck = true; await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity); - if (_tcpService.ClientExists(issuedEventMessage.ClientId)) - await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); - isAck = true; + isAck = true; } // TODO:暂时ACK,等后续处理是否放到私信队列中 @@ -102,19 +99,10 @@ namespace JiShe.CollectBus.Subscribers isAck = false; break; } - _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}"); - heartbeatEntity.AckTime = Clock.Now; heartbeatEntity.IsAck = true; await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity); - //var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo); - //if (device != null) - //{ - // await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message); - //} - if(_tcpService.ClientExists(issuedEventMessage.ClientId)) - await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); - } + } // TODO:暂时ACK,等后续处理是否放到私信队列中 return isAck ? SubscribeAck.Success() : SubscribeAck.Fail(); } @@ -178,19 +166,6 @@ namespace JiShe.CollectBus.Subscribers //[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] public async Task ReceivedHeartbeatEvent(List receivedHeartbeatMessages) { - //foreach (var receivedHeartbeatMessage in receivedHeartbeatMessages) - //{ - // var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - // if (protocolPlugin == null) - // { - // _logger.LogError("协议不存在!"); - // } - // else - // { - // //await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage); - // await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage); - // } - //} await _messageReceivedHeartbeatEventRepository.InsertManyAsync(receivedHeartbeatMessages); return SubscribeAck.Success(); @@ -200,20 +175,6 @@ namespace JiShe.CollectBus.Subscribers //[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] public async Task ReceivedLoginEvent(List receivedLoginMessages) { - //foreach (var receivedLoginMessage in receivedLoginMessages) - //{ - //var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - //if (protocolPlugin == null) - //{ - // _logger.LogError("协议不存在!"); - //} - //else - //{ - // //await protocolPlugin.LoginAsync(receivedLoginMessage); - // await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage); - //} - - //} await _messageReceivedLoginEventRepository.InsertManyAsync(receivedLoginMessages); return SubscribeAck.Success(); } diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml index 0193df3..88209c2 100644 --- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -16,7 +16,6 @@ 后端服务 - From d91fc5b35f1d84fee95be1c4b18dd1341b95f8be Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Wed, 23 Apr 2025 09:42:09 +0800 Subject: [PATCH 5/8] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=B0=B4=E8=A1=A8?= =?UTF-8?q?=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../BasicScheduledMeterReadingService.cs | 732 +++++++++--------- ...nergySystemScheduledMeterReadingService.cs | 36 +- .../IotSystems/Watermeter/WatermeterInfo.cs | 10 +- .../Enums/BrandTypeEnum.cs | 593 ++++++++++++++ .../Enums/MeterLinkProtocol.cs | 54 ++ 5 files changed, 1025 insertions(+), 400 deletions(-) create mode 100644 shared/JiShe.CollectBus.Common/Enums/BrandTypeEnum.cs create mode 100644 shared/JiShe.CollectBus.Common/Enums/MeterLinkProtocol.cs diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index f1c69ee..92ae2ee 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -1,4 +1,5 @@ -using Confluent.Kafka; +using Amazon.Runtime.Internal.Endpoints.StandardLibrary; +using Confluent.Kafka; using DnsClient.Protocol; using FreeSql; using JiShe.CollectBus.Ammeters; @@ -11,6 +12,7 @@ using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; +using JiShe.CollectBus.EnergySystems.Entities; using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; @@ -33,10 +35,13 @@ using Microsoft.Extensions.Options; using System; using System.Collections.Generic; using System.Diagnostics; +using System.Diagnostics.Metrics; using System.Linq; using System.Threading; using System.Threading.Tasks; using static FreeSql.Internal.GlobalFilter; +using static Microsoft.AspNetCore.Razor.Language.TagHelperMetadata; +using static Thrift.Protocol.Utilities.TJSONProtocolConstants; namespace JiShe.CollectBus.ScheduledMeterReading { @@ -114,7 +119,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading } var currentTime = DateTime.Now; - foreach (var item in taskInfos) { var tasksToBeIssueModel = await FreeRedisProvider.Instance.GetAsync(item); @@ -124,7 +128,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading continue; } - //item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBus,tempArryay[1]=>SystemTypeConst,tempArryay[2]=>TaskInfo,tempArryay[3]=>表计类别,tempArryay[4]=>采集频率 + //item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBus,tempArryay[1]=>SystemTypeConst,tempArryay[2]=>ServerTagName,tempArryay[3]=>TaskInfo,tempArryay[4]=>表计类别,tempArryay[5]=>采集频率 var tempArryay = item.Split(":"); string meteryType = tempArryay[4];//表计类别 int timeDensity = Convert.ToInt32(tempArryay[5]);//采集频率 @@ -141,14 +145,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading } var meterTypes = EnumExtensions.ToEnumDictionary(); - + var currentTaskTime = tasksToBeIssueModel.NextTaskTime;//程序启动缓存电表的时候,NextTaskTime已经格式化到下一个采集点时间。 + var metadata = await _dbProvider.GetMetadata(); if (meteryType == MeterTypeEnum.Ammeter.ToString()) { - //List pushTaskInfos = new(); - //_runtimeContext.UseTableSessionPool = true; - var metadata = await _dbProvider.GetMetadata(); _ = CreateMeterPublishTask( timeDensity: timeDensity, nextTaskTime: currentTaskTime, @@ -158,7 +160,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading var tempTask = AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps); if (tempTask == null || tempTask.Count <= 0) { - _logger.LogWarning($"{data.Name} 任务数据构建失败:{data.Serialize()}"); + _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}"); return; } _dbProvider.BatchInsertAsync(metadata, tempTask); @@ -166,16 +168,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading } else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) { - //todo 水表任务创建待处理 - //await WatermeterScheduledMeterReadingIssued(timeDensity, meterInfos); _ = CreateMeterPublishTask( timeDensity: timeDensity, nextTaskTime: currentTaskTime, meterType: MeterTypeEnum.Ammeter, - taskCreateAction: (timeDensity, data, groupIndex, taskBatch) => + taskCreateAction: (timeDensity, data, groupIndex, timestamps) => { - //AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch); + var tempTask = WatermeterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps); + + if (tempTask == null || tempTask.Count <= 0) + { + _logger.LogWarning($"水表 {data.Name} 任务数据构建失败:{data.Serialize()}"); + return; + } + _dbProvider.BatchInsertAsync(metadata, tempTask); }); } else @@ -213,52 +220,53 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task InitAmmeterCacheData(string gatherCode = "") { -#if DEBUG - var timeDensity = "15"; - var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; + //此处代码不要删除 + //#if DEBUG + // var timeDensity = "15"; + // var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; + // var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; + // var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; - List meterInfos = new List(); - List focusAddressDataLista = new List(); - var timer1 = Stopwatch.StartNew(); + // List meterInfos = new List(); + // List focusAddressDataLista = new List(); + // var timer1 = Stopwatch.StartNew(); - var allIds = new HashSet(); - decimal? score = null; - string member = null; + // var allIds = new HashSet(); + // decimal? score = null; + // string member = null; - while (true) - { - var page = await _redisDataCacheService.GetAllPagedData( - redisCacheMeterInfoHashKeyTemp, - redisCacheMeterInfoZSetScoresIndexKeyTemp, - pageSize: 1000, - lastScore: score, - lastMember: member); + // while (true) + // { + // var page = await _redisDataCacheService.GetAllPagedData( + // redisCacheMeterInfoHashKeyTemp, + // redisCacheMeterInfoZSetScoresIndexKeyTemp, + // pageSize: 1000, + // lastScore: score, + // lastMember: member); - meterInfos.AddRange(page.Items); - focusAddressDataLista.AddRange(page.Items.Select(d => d.FocusAddress)); - foreach (var item in page.Items) - { - if (!allIds.Add(item.MemberId)) - { - _logger.LogError($"{item.MemberId}Duplicate data found!"); - } - } - if (!page.HasNext) break; - score = page.NextScore; - member = page.NextMember; - } + // meterInfos.AddRange(page.Items); + // focusAddressDataLista.AddRange(page.Items.Select(d => d.FocusAddress)); + // foreach (var item in page.Items) + // { + // if (!allIds.Add(item.MemberId)) + // { + // _logger.LogError($"{item.MemberId}Duplicate data found!"); + // } + // } + // if (!page.HasNext) break; + // score = page.NextScore; + // member = page.NextMember; + // } - timer1.Stop(); - _logger.LogError($"读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒"); - DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); - return; -#else + // timer1.Stop(); + // _logger.LogError($"读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒"); + // DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); + // return; + //#else + // var meterInfos = await GetAmmeterInfoList(gatherCode); + //#endif var meterInfos = await GetAmmeterInfoList(gatherCode); -#endif - if (meterInfos == null || meterInfos.Count <= 0) { throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空"); @@ -315,17 +323,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading focusAddressDataList.Add(item.Key); - // var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}"; - -#if DEBUG - //每次缓存时,删除缓存,避免缓存数据有不准确的问题 - //await FreeRedisProvider.Instance.DelAsync(redisCacheKey); -#else - //每次缓存时,删除缓存,避免缓存数据有不准确的问题 - //await FreeRedisProvider.Instance.DelAsync(redisCacheKey); -#endif - - //Dictionary keyValuePairs = new Dictionary(); foreach (var ammeter in item) { //处理ItemCode @@ -402,71 +399,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task AmmeterScheduledMeterOneMinuteReading() { - - //获取缓存中的电表信息 int timeDensity = 5; - var currentTime = DateTime.Now; - - // 自动计算最佳并发度 - int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount(); - - var options = new ParallelOptions - { - MaxDegreeOfParallelism = recommendedThreads, - }; - var taskBatch = $"{currentTime:yyyyMMddHHmm00}"; - - //Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => - //{ - // var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - // var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - - // _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); - //}); - - await Task.CompletedTask; - - } - - /// - /// 5分钟采集电表数据 - /// - /// - public virtual async Task AmmeterScheduledMeterFiveMinuteReading() - { - //获取缓存中的电表信息 - int timeDensity = 5; - var currentTime = DateTime.Now; - - // 自动计算最佳并发度 - int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount(); - - var options = new ParallelOptions - { - MaxDegreeOfParallelism = recommendedThreads, - }; - var taskBatch = $"{currentTime:yyyyMMddHHmm00}"; - - //Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => - //{ - // var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - // var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - - // _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); - //}); - } - - /// - /// 15分钟采集电表数据 - /// - /// - public virtual async Task AmmeterScheduledMeterFifteenMinuteReading() - { - //获取缓存中的电表信息 - int timeDensity = 15; - //var currentTime = DateTime.Now.CalculateNextCollectionTime(timeDensity); - - var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName,MeterTypeEnum.Ammeter,timeDensity); + var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity); var taskInfo = await FreeRedisProvider.Instance.GetAsync(redisCacheKey); if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false) @@ -474,14 +408,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败,请检查Redis中是否有对应的任务下发信息"); return; } - - // 自动计算最佳并发度 - int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount(); - var options = new ParallelOptions - { - MaxDegreeOfParallelism = recommendedThreads, - }; var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds(); var conditions = new List(); @@ -499,65 +426,81 @@ namespace JiShe.CollectBus.ScheduledMeterReading PageIndex = 1, PageSize = 3000, Conditions = conditions, - }); + }); + } + /// + /// 5分钟采集电表数据 + /// + /// + public virtual async Task AmmeterScheduledMeterFiveMinuteReading() + { + int timeDensity = 5; + var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity); + var taskInfo = await FreeRedisProvider.Instance.GetAsync(redisCacheKey); - ///// - ///// 创建电表待发送的任务数据 - ///// - ///// 采集频率 - ///// 时间格式的任务批次名称 - ///// - //private async Task AmmerterCreatePublishTask(int timeDensity, string taskBatch) - //{ - // var timer = Stopwatch.StartNew(); + if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false) + { + _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败,请检查Redis中是否有对应的任务下发信息"); + return; + } - // //获取对应频率中的所有电表信息 - // var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - // var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - // var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds(); - // List meterInfos = new List(); - // decimal? cursor = null; - // string member = null; - // bool hasNext; - // do - // { - // var page = await _redisDataCacheService.GetAllPagedData( - // redisCacheMeterInfoHashKeyTemp, - // redisCacheMeterInfoZSetScoresIndexKeyTemp, - // pageSize: 1000, - // lastScore: cursor, - // lastMember: member); + var conditions = new List(); + conditions.Add(new QueryCondition() + { + Field = "PendingCopyReadTime", + Operator = "=", + IsNumber = true, + Value = pendingCopyReadTime + }); - // meterInfos.AddRange(page.Items); - // cursor = page.HasNext ? page.NextScore : null; - // member = page.HasNext ? page.NextMember : null; - // hasNext = page.HasNext; - // } while (hasNext); + _ = CreateMeterKafkaTaskMessage(timeDensity, new IoTDBQueryOptions() + { + TableNameOrTreePath = DevicePathBuilder.GetTableName(), + PageIndex = 1, + PageSize = 3000, + Conditions = conditions, + }); + } - // if (meterInfos == null || meterInfos.Count <= 0) - // { - // timer.Stop(); - // _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); - // return; - // } + /// + /// 15分钟采集电表数据 + /// + /// + public virtual async Task AmmeterScheduledMeterFifteenMinuteReading() + { + int timeDensity = 15; + var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity); + var taskInfo = await FreeRedisProvider.Instance.GetAsync(redisCacheKey); + if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false) + { + _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败,请检查Redis中是否有对应的任务下发信息"); + return; + } - // await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - // items: meterInfos, - // deviceIdSelector: data => data.FocusAddress, - // processor: (data, groupIndex) => - // { - // AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch); - // } - // ); + var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds(); - // timer.Stop(); - // _logger.LogInformation($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息"); - //} + var conditions = new List(); + conditions.Add(new QueryCondition() + { + Field = "PendingCopyReadTime", + Operator = "=", + IsNumber = true, + Value = pendingCopyReadTime + }); + _ = CreateMeterKafkaTaskMessage(timeDensity, new IoTDBQueryOptions() + { + TableNameOrTreePath = DevicePathBuilder.GetTableName(), + PageIndex = 1, + PageSize = 3000, + Conditions = conditions, + }); + } /// /// 创建电表待发送的任务数据 @@ -567,8 +510,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 集中器所在分组 /// 采集频率对应的时间戳 /// - private List AmmerterCreatePublishTaskAction(int timeDensity - , AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) + private List AmmerterCreatePublishTaskAction(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) { var currentTime = DateTime.Now; @@ -577,20 +519,20 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) { - // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); + _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); return null; } //载波的不处理 if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave) { - //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102"); + _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102"); return null; } if (ammeterInfo.State.Equals(2)) { - //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理"); + _logger.LogWarning($"{nameof(AmmerterCreatePublishTaskAction)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理"); return null; } @@ -603,22 +545,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode)) { - // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空"); + _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信区号为空"); return null; } if (string.IsNullOrWhiteSpace(ammeterInfo.Address)) { - //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空"); + _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信地址为空"); return null; } if (Convert.ToInt32(ammeterInfo.Address) > 65535) { - //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535"); + _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信地址无效,确保大于65535"); return null; } if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33) { - //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})"); + _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},非有效测量点号({ammeterInfo.MeteringCode})"); return null; } @@ -781,10 +723,35 @@ namespace JiShe.CollectBus.ScheduledMeterReading throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据时,采集项类型数据为空"); } + List focusAddressDataList = new List();//用于处理Kafka主题分区数据的分发和处理。 + //根据采集频率分组,获得采集频率分组 var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); + if (_kafkaOptions.FirstCollectionTime.HasValue == false) + { + _kafkaOptions.FirstCollectionTime = DateTime.Now; + } + + //先处理采集频率任务缓存 + foreach (var item in meterInfoGroupByTimeDensity) + { + TasksToBeIssueModel nextTask = new TasksToBeIssueModel() + { + LastTaskTime = null, + TimeDensity = item.Key, + NextTaskTime = _kafkaOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间 + }; + + //todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。 + + var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, item.Key); + await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask); + } + foreach (var itemTimeDensity in meterInfoGroupByTimeDensity) { + List watermeterInfo = new List(); + //将表计信息根据集中器分组,获得集中器号 var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList(); foreach (var item in meterInfoGroup) @@ -794,25 +761,33 @@ namespace JiShe.CollectBus.ScheduledMeterReading continue; } - var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}{item.Key}"; - Dictionary keyValuePairs = new Dictionary(); + focusAddressDataList.Add(item.Key); + + var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; + var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; + var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; + foreach (var subItem in item) { - - keyValuePairs.TryAdd($"{subItem.MeterId}", subItem); + watermeterInfo.Add(subItem); } - await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); + + await _redisDataCacheService.BatchInsertDataAsync( + redisCacheMeterInfoHashKey, + redisCacheMeterInfoSetIndexKey, + redisCacheMeterInfoZSetScoresIndexKey, watermeterInfo); } + } - //在缓存表信息数据的时候,新增下一个时间的自动处理任务,1分钟后执行 - TasksToBeIssueModel nextTask = new TasksToBeIssueModel() - { - TimeDensity = itemTimeDensity.Key, - NextTaskTime = DateTime.Now.AddMinutes(1) - }; + //初始化设备组负载控制 + if (focusAddressDataList == null || focusAddressDataList.Count <= 0) + { + _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息"); - var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key); - await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask); + } + else + { + DeviceGroupBalanceControl.InitializeCache(focusAddressDataList, _kafkaOptions.NumPartitions); } @@ -826,107 +801,186 @@ namespace JiShe.CollectBus.ScheduledMeterReading public virtual async Task WatermeterScheduledMeterAutoReading() { //获取缓存中的水表信息 - int timeDensity = 1; - var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.WaterMeter); - var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) + int timeDensity = 60;//水表目前只有一个采集频率 60分钟 + var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, timeDensity); + var taskInfo = await FreeRedisProvider.Instance.GetAsync(redisCacheKey); + + if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false) { - _logger.LogError($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101"); + _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败,请检查Redis中是否有对应的任务下发信息"); return; } - //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(oneMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.WaterMeter); - if (meterTaskInfos == null || meterTaskInfos.Count <= 0) + var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds(); + + var conditions = new List(); + conditions.Add(new QueryCondition() { - _logger.LogError($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102"); - return; - } + Field = "PendingCopyReadTime", + Operator = "=", + IsNumber = true, + Value = pendingCopyReadTime + }); - List meterTaskInfosList = new List(); - - //将取出的缓存任务数据发送到Kafka消息队列中 - foreach (var focusItem in meterTaskInfos) + _ = CreateMeterKafkaTaskMessage(timeDensity, new IoTDBQueryOptions() { - foreach (var ammerterItem in focusItem.Value) - { - var tempMsg = new ScheduledMeterReadingIssuedEventMessage() - { - MessageHexString = ammerterItem.Value.IssuedMessageHexString, - MessageId = ammerterItem.Value.IssuedMessageId, - FocusAddress = ammerterItem.Value.FocusAddress, - TimeDensity = timeDensity.ToString(), - }; - - //await _producerBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempMsg); - - //_ = _producerBus.Publish(tempMsg); - - - meterTaskInfosList.Add(ammerterItem.Value); - } - } - if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) - { - // await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList); - } - - ////删除任务数据 - //await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList); - - ////缓存下一个时间的任务 - //await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter); - + TableNameOrTreePath = DevicePathBuilder.GetTableName(), + PageIndex = 1, + PageSize = 3000, + Conditions = conditions, + }); _logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成"); } - ///// - ///// 创建水表待发送的任务数据 - ///// - ///// 采集频率 - ///// 水表信息 - ///// 集中器所在分组 - ///// 时间格式的任务批次名称 - ///// - //private void WatermeterCreatePublishTaskAction(int timeDensity - // , WatermeterInfo meterInfo, int groupIndex, string taskBatch) - //{ - // var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; + /// + /// 创建水表待发送的任务数据 + /// + /// 采集频率 + /// 水表信息 + /// 集中器所在分组 + /// 时间格式的任务批次名称 + /// + private List WatermeterCreatePublishTaskAction(int timeDensity + , WatermeterInfo watermeter, int groupIndex, DateTime timestamps) + { + var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; + var currentTime = DateTime.Now; + + string typeName; + if (watermeter.MeterType == MeterTypeEnum.WaterMeter) + { + timeDensity = watermeter.TimeDensity;//水表默认为60分钟 + typeName = watermeter.LinkType; + if (watermeter.MeterBrand.Contains("泉高阀门") || watermeter.MeterBrand.Equals("LXSY-山水翔")) + { + typeName = watermeter.MeterBrand; + } + } + else if (watermeter.MeterType == MeterTypeEnum.WaterMeterFlowmeter) + { + typeName = watermeter.MeterBrand; + } + else + { + _logger.LogError($"{nameof(WatermeterCreatePublishTaskAction)} 水表类型错误:{watermeter.Serialize()}"); + return null; + } + + List taskList = new List(); + + + List tempCodes = new List() { "10_1" }; + + //todo 后续从协议池获取 + if (watermeter.MeterTypeName.Equals("水表") && (watermeter.Protocol.Equals((int)MeterLinkProtocol.CJT_188_2018) || watermeter.Protocol.Equals((int)MeterLinkProtocol.DLT_645_1997) || watermeter.Protocol.Equals((int)MeterLinkProtocol.DLT_645_2007)))//水表且(CJT_188_2018或DLT_645_1997)都采用0C_129 + { + if (watermeter.MeterBrand.Contains("炬华有线")) + { + tempCodes = new List() { "0C_188" }; + } + else + { + tempCodes = new List() { "0C_129" }; + } + } + + else if (typeName.Trim().Equals("西恩超声波流量计")) + { + tempCodes = new List() { "10_1" }; + } + else if (typeName.Trim().Equals("江苏华海涡街流量计积算仪")) + { + tempCodes = new List() { "10_1" }; + } + else if (typeName.Trim().Equals("V880BR涡街流量计")) + { + tempCodes = new List() { "10_1" }; + } + else if (typeName.Trim().Equals("拓思特涡街流量计H880BR")) + { + tempCodes = new List() { "10_1" }; + } - // var currentTime = DateTime.Now; - // var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); + foreach (var tempItem in tempCodes) + { + //排除已发送日冻结和月冻结采集项配置 + if (DayFreezeCodes.Contains(tempItem)) + { + continue; + } - // var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - // var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - // var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + if (MonthFreezeCodes.Contains(tempItem)) + { + continue; + } + + var itemCodeArr = tempItem.Split('_'); + var aFNStr = itemCodeArr[0]; + var aFN = (AFN)aFNStr.HexToDec(); + var fn = int.Parse(itemCodeArr[1]); + TelemetryPacketResponse builderResponse = null; + + string methonCode = $"AFN{aFNStr}_Fn_Send"; + //特殊表暂不处理 + if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode + , out var handler)) + { + builderResponse = handler(new TelemetryPacketRequest() + { + FocusAddress = watermeter.FocusAddress, + Fn = fn, + Pn = watermeter.MeteringCode, + DataUnit = Build188SendData.Build188WaterMeterReadingSendDataUnit(watermeter.Address), + }); + } + else + { + _logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的电表{watermeter.Name}采集项{tempItem}无效编码。"); + continue; + } + + if (builderResponse == null || builderResponse.Data.Length <= 0) + { + _logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的电表{watermeter.Name}采集项{tempItem}未能正确获取报文。"); + continue; + } - // var taskInfo = new MeterReadingTelemetryPacketInfo() - // { - // Seq= null, + string taskMark = CommonHelper.GetTaskMark((int)aFN, fn, watermeter.MeteringCode, builderResponse.MSA); + var meterReadingRecords = new MeterReadingTelemetryPacketInfo() + { + SystemName = SystemType, + ProjectId = $"{watermeter.ProjectID}", + DeviceType = $"{MeterTypeEnum.Ammeter}", + DeviceId = $"{watermeter.FocusAddress}", + Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), + DatabaseBusiID = watermeter.DatabaseBusiID, + PendingCopyReadTime = timestamps, + CreationTime = currentTime, + MeterAddress = watermeter.MeterAddress, + AFN = (int)aFN, + Fn = fn, + //Seq = builderResponse.Seq, + MSA = builderResponse.MSA, + ItemCode = tempItem, + TaskMark = taskMark, + IsSend = false, + ManualOrNot = false, + Pn = watermeter.MeteringCode, + IssuedMessageId = GuidGenerator.Create().ToString(), + IssuedMessageHexString = Convert.ToHexString(builderResponse.Data), + IsReceived = false, + ScoreValue = $"{watermeter.FocusAddress}.{taskMark}".Md5Fun(), + }; - // }; - // // + taskList.Add(meterReadingRecords); + } - // Build188SendData.Build188WaterMeterReadingSendDataUnit(meterInfo.Address); + return taskList; - // using (var pipe = FreeRedisProvider.Instance.StartPipe()) - // { - // // 主数据存储Hash - // pipe.HSet(redisCacheTelemetryPacketInfoHashKey, taskInfo.MemberId, taskInfo.Serialize()); - - // // Set索引缓存 - // pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, taskInfo.MemberId); - - // // ZSET索引缓存Key - // pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, taskInfo.ScoreValue, taskInfo.MemberId); - - // pipe.EndPipe(); - // } - - //} + } #endregion @@ -948,19 +1002,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading return false; } - - /// - /// 获取缓存表计下发指令缓存key前缀 - /// - /// - /// - /// - private string GetTelemetryPacketCacheKeyPrefix(int timeDensity, MeterTypeEnum meterType) - { - return $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}*"; - } - - + /// /// 创建表的待发送的任务数据 /// @@ -982,34 +1024,34 @@ namespace JiShe.CollectBus.ScheduledMeterReading decimal? cursor = null; string member = null; bool hasNext; - //do - //{ - // var page = await _redisDataCacheService.GetAllPagedData( - // redisCacheMeterInfoHashKeyTemp, - // redisCacheMeterInfoZSetScoresIndexKeyTemp, - // pageSize: 1000, - // lastScore: cursor, - // lastMember: member); + do + { + var page = await _redisDataCacheService.GetAllPagedData( + redisCacheMeterInfoHashKeyTemp, + redisCacheMeterInfoZSetScoresIndexKeyTemp, + pageSize: 1000, + lastScore: cursor, + lastMember: member); - // meterInfos.AddRange(page.Items); - // cursor = page.HasNext ? page.NextScore : null; - // member = page.HasNext ? page.NextMember : null; - // hasNext = page.HasNext; - //} while (hasNext); + meterInfos.AddRange(page.Items); + cursor = page.HasNext ? page.NextScore : null; + member = page.HasNext ? page.NextMember : null; + hasNext = page.HasNext; + } while (hasNext); - var page = await _redisDataCacheService.GetAllPagedData( - redisCacheMeterInfoHashKeyTemp, - redisCacheMeterInfoZSetScoresIndexKeyTemp, - pageSize: 10, - lastScore: cursor, - lastMember: member); - meterInfos.AddRange(page.Items); + //var page = await _redisDataCacheService.GetAllPagedData( + // redisCacheMeterInfoHashKeyTemp, + // redisCacheMeterInfoZSetScoresIndexKeyTemp, + // pageSize: 10, + // lastScore: cursor, + // lastMember: member); + //meterInfos.AddRange(page.Items); if (meterInfos == null || meterInfos.Count <= 0) { timer.Stop(); - _logger.LogError($"{nameof(CreateMeterPublishTask)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); + _logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); return; } @@ -1037,7 +1079,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading bool hasNext; var stopwatch = Stopwatch.StartNew(); - var ddd = _runtimeContext.UseTableSessionPool; + var ddd = _runtimeContext.UseTableSessionPool; do { @@ -1061,55 +1103,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading stopwatch.Stop(); _logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {options.TableNameOrTreePath} {timeDensity}分钟采集任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); } - - - ///// - ///// 创建Kafka消息 - ///// - ///// - ///// - ///// - //private async Task CreateMeterKafkaTaskMessage( - //string redisCacheTelemetryPacketInfoHashKey, - //string redisCacheTelemetryPacketInfoZSetScoresIndexKey) - //{ - // if (string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey)) - // { - // throw new Exception($"{nameof(CreateMeterKafkaTaskMessage)} 创建Kafka消息失败,参数异常,-101"); - // } - - // decimal? cursor = null; - // string member = null; - // bool hasNext; - // var stopwatch = Stopwatch.StartNew(); - // do - // { - // var page = await _redisDataCacheService.GetAllPagedData( - // redisCacheTelemetryPacketInfoHashKey, - // redisCacheTelemetryPacketInfoZSetScoresIndexKey, - // pageSize: 1000, - // lastScore: cursor, - // lastMember: member); - - // cursor = page.HasNext ? page.NextScore : null; - // member = page.HasNext ? page.NextMember : null; - // hasNext = page.HasNext; - - // await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - // items: page.Items, - // deviceIdSelector: data => data.FocusAddress, - // processor: (data, groupIndex) => - // { - // _ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex); - // } - // ); - - // } while (hasNext); - - // stopwatch.Stop(); - // _logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {redisCacheTelemetryPacketInfoHashKey}采集推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); - //} - + /// /// Kafka 推送消息 /// diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index fe0746f..90266ce 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -101,6 +101,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading // TypeName = 3, // DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679", // TimeDensity = 15, + // BrandType = "", //}); //ammeterInfos.Add(new AmmeterInfo() //{ @@ -115,6 +116,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading // TypeName = 1, // DataTypes = "581,589,592,597,601", // TimeDensity = 15, + // BrandType = "", //}); //return ammeterInfos; @@ -127,10 +129,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading WHERE 1=1 and C.Special = 0 "; //TODO 记得移除特殊表过滤 - //if (!string.IsNullOrWhiteSpace(gatherCode)) - //{ - // sql = $@"{sql} AND A.GatherCode = '{gatherCode}'"; - //} + if (!string.IsNullOrWhiteSpace(gatherCode)) + { + sql = $@"{sql} AND A.GatherCode = '{gatherCode}'"; + } return await SqlProvider.Instance.Change(DbEnum.EnergyDB) .Ado .QueryAsync(sql); @@ -186,30 +188,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading return await SqlProvider.Instance.Change(DbEnum.EnergyDB) .Ado .QueryAsync(sql); - } - - - /// - /// 测试设备分组均衡控制算法 - /// - /// - /// - [HttpGet] - public async Task TestDeviceGroupBalanceControl(int deviceCount = 200000) - { - var deviceList = new List(); - for (int i = 0; i < deviceCount; i++) - { - deviceList.Add($"Device_{Guid.NewGuid()}"); - } - - // 初始化缓存 - DeviceGroupBalanceControl.InitializeCache(deviceList); - - // 打印分布统计 - DeviceGroupBalanceControl.PrintDistributionStats(); - - await Task.CompletedTask; - } + } } } \ No newline at end of file diff --git a/services/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs index be97769..eac70a1 100644 --- a/services/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs +++ b/services/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs @@ -18,7 +18,7 @@ namespace JiShe.CollectBus.IotSystems.Watermeter /// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义 /// [Column(IsIgnore = true)] - public override string MemberId => $"{FocusId}:{MeterId}"; + public override string MemberId => $"{FocusAddress}:{MeteringCode}"; /// /// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳 @@ -90,6 +90,7 @@ namespace JiShe.CollectBus.IotSystems.Watermeter //// 电表= 1,水表= 2,燃气表= 3,热能表= 4,水表流量计=5,燃气表流量计=6,特殊电表=7 /// public MeterTypeEnum MeterType { get; set; } + /// /// 设备品牌; /// (当 MeterType = 水表, 如 威铭、捷先 等) @@ -138,12 +139,17 @@ namespace JiShe.CollectBus.IotSystems.Watermeter /// 采集器编号 /// public string GatherCode { get; set; } - + /// /// 项目ID /// public int ProjectID { get; set; } + /// + /// 数据库业务ID + /// + public int DatabaseBusiID { get; set; } + /// /// 是否异常集中器 0:正常,1异常 /// diff --git a/shared/JiShe.CollectBus.Common/Enums/BrandTypeEnum.cs b/shared/JiShe.CollectBus.Common/Enums/BrandTypeEnum.cs new file mode 100644 index 0000000..83429ae --- /dev/null +++ b/shared/JiShe.CollectBus.Common/Enums/BrandTypeEnum.cs @@ -0,0 +1,593 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Common.Enums +{ + /// + /// 水表\流量计\特殊电表品牌 + /// + public enum BrandTypeEnum + { + /// + /// 默认(OC_129) + /// + //None = 0, + /// + /// 冻结数据(0D_101) + /// + //Freeze = 1, + /// + /// 默认(OC_129)or 冻结数据(0D_101) + /// + NoneOrFreeze = 0, + /// + /// 188-良禾 + /// + LiangHe188 = 1, + /// + /// 188-威铭 + /// + WeiMing188 = 2, + /// + /// 188-宁波 + /// + NingBo188 = 3, + /// + /// 西恩电磁流量计 + /// + XEDC = 4, + /// + /// 西恩超声波流量计 + /// + XECSB = 5, + /// + /// 电磁流量计(LDG-DN200) + /// + DCLDGDN200 = 6, + /// + /// 燃气表抄读 + /// + Gasmeter = 7, + /// + /// 涡街流量计 + /// + WJFlowmeter = 8, + /// + /// 流量计 + /// + Flowmeter = 9, + /// + /// 涡街流量计(LUGBDN100) + /// + WJFlowmeterLUGBDN100 = 10, + /// + /// 涡街流量计(ZC-LUGB-232ZDNNY) + /// + WJFlowmeterZCLUGB232ZDNNY = 11, + /// + /// SB2100蒸汽表 + /// + ZQBSB2100Flowmeter = 12, + /// + /// (HD)热式流量计 + /// + RSHDFlowmeter = 13, + /// + /// (HDWMNDN300)热式流量计 + /// + RSHDWMNDN300 = 14, + /// + /// 热式流量计(FLRS110-C100) + /// + RSFLRS110C100 = 15, + /// + /// 通用188协议 + /// + Universal188 = 16, + #region 特殊电表 + DTZ719 = 17, + AKKJMC800PY = 18, + HRKJ001 = 19, + THYB9D1 = 20, + DTSD342 = 21, + /// + /// 谐波 + /// + AFN16_F109 = 22, + /// + /// DL/T 645—2007 规约时采用该类 + /// + CustomItemCode_93 = 23, + /// + /// 电表组合有功示值透抄CustomItemCode_95 + /// + AFN16_F95 = 24, + #endregion + /// + /// 非特殊表 + /// + None = 25, + /// + /// SDM630MCT 导轨电表 + /// + SDM630MCT = 26, + /// + /// 水表通过0C_129采集 + /// + Watermeter0C_129 = 27, + /// + /// YYD电磁流量计 + /// + YYDFlowmeter = 28, + /// + /// 透明转发 跳合闸(水表) + /// + AFN16_F99 = 29, + /// + /// 透明转发 跳合闸(气表) + /// + AFN16_F100 = 30, + /// + /// 温度压力补偿涡街流量计(TPC1001)涡街流量计 + /// + WJTPC1001 = 31, + /// + /// (LDG-SP25)树普电磁流量计 + /// + ShupuLDGSP25Flowmeter = 32, + /// + /// 西恩涡街流量计(LUGBC-100) + /// + XEWJLUGBC100 = 33, + /// + /// 智能涡街流量计(UG-1132A) + /// + WJUG1132A = 34, + /// + /// 水表通过0D_101采集 + /// + Watermeter0D_101 = 35, + /// + /// 读取SIM卡号 + /// + AFN16_F101 = 36, + /// + /// 恒瑞科技三相导轨式电能表 or 恒瑞科技嵌入式电表测试 + /// + DTS600 = 37, + /// + /// 恒瑞科技单相导轨式电能表(DDS600) + /// + DDS600 = 38, + /// + /// 旋进漩涡流量计(LUXB) 天津凯隆仪表科技有限公司 + /// + XJXWLUXB = 39, + /// + /// DDSD720-L科陆单相导轨表 + /// + DDSD720L = 40, + /// + /// 东久电磁流量计DJLD + /// + DJLD = 41, + /// + /// DTSD720-L科陆三相导轨表 + /// + DTSD720L = 42, + /// + /// 世成(社为表计)涡街流量计 + /// + SCLUGB = 43, + /// + /// CL7339MN-ZY科陆三相表 + /// + CL7339MNZY = 44, + /// + /// 江森智能SNY723MC数显表 + /// + SNY723MC = 45, + /// + /// 珠海派诺科技PMAC770三相数显表 + /// + PMAC770 = 46, + /// + /// 北京中科涡街流量计(ZKTD-LUCBY) + /// + ZKTD_LUGBY = 47, + /// + /// 夏仪股份蒸汽流量计(LUGB-DN) + /// + LUGB_DN = 48, + /// + /// LWQ-D2型气体涡轮流量计 + /// + LWQ_D2 = 49, + /// + /// 西恩涡街流量计分体式(流量积算仪32FC系列) + /// + XEJSY32FC = 50, + /// + /// 寺崎科技PD652E-9S4电表 + /// + PD652E9S4 = 51, + /// + /// 液体涡轮流量计(LWGY) + /// + LWGY = 52, + /// + /// 多功能积算仪(RW-A) + /// + DGNRWA = 53, + /// + /// 杭梅电气DTS804导轨表 + /// + DTS804 = 54, + /// + /// 杭梅电气HG194-D93数显表 + /// + HG194D93 = 55, + /// + /// 连水超声波水表188 + /// + Lianshui188 = 56, + /// + /// 湖北回盛生物科技有限公司EZT96Y数显表 + /// + EZT96Y, + /// + /// 上海肖月智能流量积算仪 + /// + ZNLLJ, + /// + /// 西安诚通电磁流量计 + /// + CTLDE250SC31GM8FB, + /// + /// 雅达YD2040 + /// + YD2040, + /// + /// EVC智能体积修正仪 + /// + EVC, + /// + /// 气体超声流量计IGSM-TS + /// + IGSMTS, + /// + /// YX-9SYE三相多功能表 + /// + YX9SYE, + /// + /// 世成液体涡轮流量计(SCLWGY-DN50) + /// + SCLWGYDN50, + /// + /// 杭州盘古积算仪(FX6000F) + /// + FX6000F, + /// + /// "盘古电磁流量计(PMF-GM4.0A1-50M11K1F1T0C3) + /// + PFMGM40A150M11K1F1T0C3, + /// + /// 西恩液体涡轮流量计(SEAN LWGY-50) + /// + SeanLWGY50, + /// + /// 雷泰电磁流量计LD-LDE-DN50 + /// + LDLDEDN50, + /// + /// 雷泰涡街流量计(LT-LUGB-DN50) + /// + LTLUGBDN50, + /// + /// 珠海派诺科技股份有限公司SPM33电力仪表 + /// + SPM33, + /// + /// 株洲斯瑞克电气有限公司三相数显多功能电力仪表PD369E-AS4 + /// + PD369EAS4, + /// + /// 湖北回盛生物科技有限公司-涡街流量计(10VTEAD03A200C1A2HOAG) + /// + WJ10VTEAD03A200C1A2HOAG, + /// + /// 世成旋进旋涡流量计SCLUX-DN25 + /// + SCLUXDN25, + /// + /// 世成气体涡轮流量计(SCLWGQ-DN50) + /// + SCLWGQDN50, + /// + /// 智能电磁流量计(MDL210) + /// + MDL210, + /// + /// 江苏华海涡街流量计Focvor4202 + /// + Focvor4202, + /// + /// 华凯电力HK194E-9S4 + /// + HK194E9S4, + /// + /// 威胜测试-DTSD342_9N + /// + DTSD342Test, + /// + ///科迈捷涡街流量计VFM-60 + /// + VFM60, + /// + ///江苏华海涡街流量计积算仪 + /// + HHJSY, + /// + ///宏江4G水表 + /// + HJDN15, + /// + ///世成4G涡街流量计 + /// + LPV2, + /// + ///浙江正泰DTSU666 + /// + DTSU666, + /// + /// 浙江启唯电气-数码三相多功能电表QV194E-9S4 + /// + QV194E9S4, + /// + /// 施耐德PM2100 + /// + PM2100, + + /// + /// 天康电磁流量计 + /// + TK1100FT, + + /// + /// 西恩气体涡轮流量计(SEANLWQ) + /// + SEANLWQ, + + /// + /// V880BR涡街流量计 + /// + V880BR, + + /// + /// 大导SDD194E-9 + /// + SDD194E_9, + + /// + ///泉高阀门科技有限公司-超声波水表 + /// + QGFMCSB, + + #region 传感器型号 + SensorMeter, + #endregion + + /// + /// 分体式超声波明渠流量计SC-6000F + /// + SC6000F, + + /// + /// 江苏京仪JLWQ型气体流量计JLWQ型气体流量计 + /// + JLWQ, + + /// + /// 广州智光SMC200 + /// + SMC200, + + /// + /// 左拓ZFM2-621 + /// + ZFM2621, + + /// + /// 江苏华尔威涡街旋进流量计 + /// + HRW520, + + /// + /// 施耐德PM5350P + /// + PM5350P, + + /// + /// 施耐德PM810MG + /// + PM810MG, + + /// + /// 浙江之高ZL96-3E + /// + ZL96_3E, + /// + /// 拓普电子PD284Z-9S4 + /// + PD284Z_9S4, + /// + /// 上海普川DTSU5886 + /// + DTSU5886, + /// + /// 安德利SC194E-9S4 + /// + SC194E9S4, + /// + /// 浙江天电电气TD700E-AS3 + /// + TD700EAS3, + /// + /// 世成分体式涡街流量计SW-SCLUGB-DN + /// + SWSCLUGBDN, + /// + /// 东久电磁冷热量计SW-DJLD + /// + SWDJLD, + + /// + /// 北京中科拓达ZKTD-LUGB + /// + ZKTD_LUGB, + + /// + /// 江苏英美迪自动化有限公司三相液晶多功能仪表YMD96A-E4 + /// + YMD96A_E4, + + /// + /// 金湖盛元LWQ气体涡轮流量计 + /// + JHSYLWQ, + + /// + /// 天康涡街流量计TK2000 + /// + TK2000, + + /// + /// 浙江迈拓三相导轨电表DTSF1709 + /// + DTSF1709, + + /// + /// 杭州逸控科技超声波流量计ECUL30B-L2C1NSVC + /// + ECUL30BL2C1NSVC, + /// + /// 数字电测表HTD288-DM44/R + /// + HTD288, + /// + /// 杭州逸控科技有限公司ECLUGB2305W3C2N + /// + ECLUGB2305W3C2N, + + /// + /// 江苏华海测控科技有限公司温压补偿流量积算仪 + /// + XMJA9000, + + /// + /// 湖南佳一机电设备有限公司精致型蒸汽热能积算仪 + /// + F3200H, + + /// + /// 合兴加能电梯能量回馈装置 + /// + IPCPFE04MNDC, + + /// + /// 宁波创盛旋涡流量计 + /// + CX25, + /// + /// 群力电气QDCZY1N + /// + QDCZY1N, + + /// + ///深圳中电PMCS963C + /// + PMCS963C, + /// + /// 迅尔燃气表D2SD2ET3D4S + /// + D2SD2ET3D4S, + /// + /// INTELLIGENT积算仪F2000X + /// + F2000X, + + /// + ///多盟-DM194Z-9SY + /// + DM194Z9SY, + /// + /// 纳宇PD760 + /// + PD760, + /// + /// 纳宇DTS90031LPD + /// + DTS90031LPD, + + /// + /// 上海施易克SEIK680数显表 + /// + SEIK680, + + /// + /// 中灵电气ZL125SC + /// + ZL125SC, + /// + /// 江苏京仪气体涡轮流量计JLWQE + /// + JLWQE, + /// + /// HART智能转换器 + /// + SM100, + + /// + /// 拓思特涡街流量计H880BR + /// + H880BR, + /// + /// DDSD720-L-单相电子式导轨表 + /// + DDSD720L2, + + /// + /// 浙江智电三相三线大功率有功电能表 + /// + ZJZDSXSX, + + /// + /// 山水翔水表LXSY + /// + LXSY, + + /// + /// 衡水多元仪表有限公司气体涡轮流量计DYWQ + /// + DYWQ, + + /// + /// 安徽聚积电子 + /// + DDS2052, + + /// + /// 湖南中麦 + /// + ZMDTSD3429N, + + /// + ///DTS2377三相导轨式多功能智能电表 + /// + DTS2377 + + } +} diff --git a/shared/JiShe.CollectBus.Common/Enums/MeterLinkProtocol.cs b/shared/JiShe.CollectBus.Common/Enums/MeterLinkProtocol.cs new file mode 100644 index 0000000..6c06c5c --- /dev/null +++ b/shared/JiShe.CollectBus.Common/Enums/MeterLinkProtocol.cs @@ -0,0 +1,54 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Common.Enums +{ + /// + /// 表计连接通讯协议--表计与集中器的通讯协议 + /// + public enum MeterLinkProtocol + { + /// + /// 无 + /// + None = 0, + + /// + /// DL/T 645—1997 + /// + DLT_645_1997 = 1, + + /// + /// 交流采样装置通信协议(电表) + /// + ACSamplingDevice = 2, + + /// + /// DL/T 645—2007 + /// + DLT_645_2007 = 30, + + /// + /// 载波通信 + /// + Carrierwave = 31, + + /// + /// CJ/T 188—2018协议(水表) + /// + CJT_188_2018 = 32, + + /// + /// CJ/T 188—2004协议 + /// + CJT_188_2004 = 33, + + /// + /// MODBUS-RTU + /// + MODBUS_RTU = 34, + } +} From 17bb8a569213df14a22769f2d5f6903123eaa9d4 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Wed, 23 Apr 2025 09:45:31 +0800 Subject: [PATCH 6/8] =?UTF-8?q?=E5=90=88=E5=B9=B6=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- web/JiShe.CollectBus.Host/Pages/Monitor.cshtml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml index b25f9f0..5c4cd21 100644 --- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -17,7 +17,7 @@ 后端服务 - +
[TAGColumn] - public string SystemName { get; set; } + public string SystemName { get; set; } /// /// 项目编码 /// - [ATTRIBUTEColumn] - public string ProjectId { get; set; } + [TAGColumn] + public string ProjectId { get; set; } /// /// 设备类型集中器、电表、水表、流量计、传感器等 /// - [ATTRIBUTEColumn] - public string DeviceType { get; set; } + [TAGColumn] + public string DeviceType { get; set; } /// - /// 设备ID,也就是通信设备的唯一标识符,例如集中器地址,或者其他传感器设备地址 + /// 设备ID,数据生成者,例如集中器ID,电表ID、水表ID、流量计ID、传感器ID等 /// [TAGColumn] - public string DeviceId { get; set; } + public string DeviceId { get; set; } /// /// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取 /// - public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); } } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs index 6922c62..6a1a596 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs @@ -15,7 +15,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public static string GetDevicePath(T entity) where T : IoTEntity { - return $"root.{entity.SystemName.ToLower()}.`{entity.DeviceId}`"; + return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectId}`.`{entity.DeviceType}`.`{entity.DeviceId}`"; } @@ -39,7 +39,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public static string GetDeviceTableName(T entity) where T : IoTEntity { - return $"{entity.SystemName.ToLower()}.`{entity.DeviceId}`"; + return $"{entity.SystemName.ToLower()}.`{entity.ProjectId}`.`{entity.DeviceType}`.`{entity.DeviceId}`"; } } diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 095f01b..49770ef 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -1,5 +1,6 @@ using Amazon.Runtime.Internal.Endpoints.StandardLibrary; using Confluent.Kafka; +using DeviceDetectorNET.Parser.Device; using DnsClient.Protocol; using FreeSql; using JiShe.CollectBus.Ammeters; @@ -172,7 +173,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _ = CreateMeterPublishTask( timeDensity: timeDensity, nextTaskTime: currentTaskTime, - meterType: MeterTypeEnum.Ammeter, + meterType: MeterTypeEnum.WaterMeter, taskCreateAction: (timeDensity, data, groupIndex, timestamps) => { var tempTask = WatermeterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps); @@ -280,7 +281,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } var timer = Stopwatch.StartNew(); - List focusAddressDataList = new List();//用于处理Kafka主题分区数据的分发和处理。 + List deviceIds = new List();//用于处理Kafka主题分区数据的分发和处理。 //根据采集频率分组,获得采集频率分组 var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); @@ -320,11 +321,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading { continue; } - - focusAddressDataList.Add(item.Key); - + foreach (var ammeter in item) { + deviceIds.Add(ammeter.MeterId.ToString()); + //处理ItemCode if (string.IsNullOrWhiteSpace(ammeter.ItemCodes) && !string.IsNullOrWhiteSpace(ammeter.DataTypes)) { @@ -378,14 +379,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //初始化设备组负载控制 - if (focusAddressDataList == null || focusAddressDataList.Count <= 0) + if (deviceIds == null || deviceIds.Count <= 0) { _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息"); } else { - DeviceGroupBalanceControl.InitializeCache(focusAddressDataList, _kafkaOptions.NumPartitions); + DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions); } timer.Stop(); @@ -661,7 +662,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading SystemName = SystemType, ProjectId = $"{ammeterInfo.ProjectID}", DeviceType = $"{MeterTypeEnum.Ammeter}", - DeviceId = $"{ammeterInfo.FocusAddress}", + DeviceId = $"{ammeterInfo.MeterId}", Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), DatabaseBusiID = ammeterInfo.DatabaseBusiID, PendingCopyReadTime = timestamps, @@ -723,7 +724,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据时,采集项类型数据为空"); } - List focusAddressDataList = new List();//用于处理Kafka主题分区数据的分发和处理。 + List deviceIds = new List();//用于处理Kafka主题分区数据的分发和处理。 //根据采集频率分组,获得采集频率分组 var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); @@ -761,7 +762,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading continue; } - focusAddressDataList.Add(item.Key); var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; @@ -769,6 +769,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading foreach (var subItem in item) { + deviceIds.Add(subItem.MeterId.ToString()); + watermeterInfo.Add(subItem); } @@ -780,14 +782,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //初始化设备组负载控制 - if (focusAddressDataList == null || focusAddressDataList.Count <= 0) + if (deviceIds == null || deviceIds.Count <= 0) { _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息"); } else { - DeviceGroupBalanceControl.InitializeCache(focusAddressDataList, _kafkaOptions.NumPartitions); + DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions); } @@ -954,7 +956,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading SystemName = SystemType, ProjectId = $"{watermeter.ProjectID}", DeviceType = $"{MeterTypeEnum.Ammeter}", - DeviceId = $"{watermeter.FocusAddress}", + DeviceId = $"{watermeter.MeterId}", Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), DatabaseBusiID = watermeter.DatabaseBusiID, PendingCopyReadTime = timestamps, From a340225cda3c7ec56edc1afd1372a86749f34e72 Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Wed, 23 Apr 2025 13:59:15 +0800 Subject: [PATCH 8/8] =?UTF-8?q?=E4=BC=98=E5=8C=96kafka=E6=B6=88=E8=B4=B9?= =?UTF-8?q?=E8=80=85=E8=A2=AB=E8=87=AA=E5=8A=A8=E8=B8=A2=E5=87=BA=E6=B6=88?= =?UTF-8?q?=E8=B4=B9=E7=BB=84=E5=A2=9E=E5=8A=A0=E8=87=AA=E5=8A=A8=E9=87=8D?= =?UTF-8?q?=E8=AF=95=E8=AE=A2=E9=98=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../JiShe.CollectBus.Kafka.Test/Program.cs | 1 + .../CollectBusKafkaModule.cs | 3 + .../Consumer/ConsumerService.cs | 516 ++++++++++-------- .../Internal/KafkaPollyPipeline.cs | 111 ++++ .../JiShe.CollectBus.Kafka.csproj | 2 + .../JiSheCollectBusProtocolModule.cs | 4 +- .../Pages/Monitor.cshtml | 1 + 7 files changed, 399 insertions(+), 239 deletions(-) create mode 100644 modules/JiShe.CollectBus.Kafka/Internal/KafkaPollyPipeline.cs diff --git a/modules/JiShe.CollectBus.Kafka.Test/Program.cs b/modules/JiShe.CollectBus.Kafka.Test/Program.cs index 016d0c4..3c99810 100644 --- a/modules/JiShe.CollectBus.Kafka.Test/Program.cs +++ b/modules/JiShe.CollectBus.Kafka.Test/Program.cs @@ -63,6 +63,7 @@ var host = Host.CreateDefaultBuilder(args) services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); services.AddTransient(); }) diff --git a/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs b/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs index 5621b3d..d31b9ed 100644 --- a/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs +++ b/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs @@ -41,6 +41,9 @@ namespace JiShe.CollectBus.Kafka // 注册Consumer context.Services.AddSingleton(); + // 注册Polly + context.Services.AddSingleton(); + //context.Services.AddHostedService(); } diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs index 32df748..d4a62db 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs @@ -2,12 +2,14 @@ using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Serialization; +using Microsoft.AspNetCore.DataProtection.KeyManagement; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System.Collections.Concurrent; using System.Text; using System.Text.RegularExpressions; +using System.Threading; namespace JiShe.CollectBus.Kafka.Consumer { @@ -20,17 +22,26 @@ namespace JiShe.CollectBus.Kafka.Consumer ///
private readonly ConcurrentDictionary _consumerStore = new(); + + /// + /// 消费完或者无数据时的延迟时间 + /// + private TimeSpan DelayTime => TimeSpan.FromMilliseconds(100); + private readonly KafkaOptionConfig _kafkaOptionConfig; + private readonly KafkaPollyPipeline _kafkaPollyPipeline; + /// /// ConsumerService /// /// /// - public ConsumerService(ILogger logger, IOptions kafkaOptionConfig) + public ConsumerService(ILogger logger, IOptions kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline) { _logger = logger; _kafkaOptionConfig = kafkaOptionConfig.Value; + _kafkaPollyPipeline = kafkaPollyPipeline; } #region private 私有方法 @@ -99,7 +110,7 @@ namespace JiShe.CollectBus.Kafka.Consumer /// public async Task SubscribeAsync(string topic, Func> messageHandler, string? groupId = null) where TValue : class { - await SubscribeAsync(new[] { topic }, messageHandler,groupId); + await SubscribeAsync(new[] { topic }, messageHandler, groupId); } /// @@ -112,58 +123,75 @@ namespace JiShe.CollectBus.Kafka.Consumer /// public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null) where TKey : notnull where TValue : class { - var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; - var cts = new CancellationTokenSource(); - - var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - ( - CreateConsumer(groupId), - cts - )).Consumer as IConsumer; - consumer!.Subscribe(topics); - - await Task.Run(async () => + await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => { - while (!cts.IsCancellationRequested) - { - try - { - //_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息...."); - var result = consumer.Consume(cts.Token); - if (result == null || result.Message==null || result.Message.Value == null) - continue; - - if (result.IsPartitionEOF) + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; + var cts = new CancellationTokenSource(); + + var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; + + consumer!.Subscribe(topics); + + _ = Task.Run(async () => + { + while (!cts.IsCancellationRequested) + { + try { - _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); - await Task.Delay(TimeSpan.FromSeconds(1),cts.Token); - continue; - } - if (_kafkaOptionConfig.EnableFilter) - { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; - // 检查 Header 是否符合条件 - if (!headersFilter.Match(result.Message.Headers)) + //_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息...."); + + var result = consumer.Consume(cts.Token); + if (result == null || result.Message == null || result.Message.Value == null) { - //consumer.Commit(result); // 提交偏移量 - // 跳过消息 + await Task.Delay(DelayTime, cts.Token); continue; } + if (result.IsPartitionEOF) + { +#if DEBUG + _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); +#endif + await Task.Delay(DelayTime, cts.Token); + continue; + } + if (_kafkaOptionConfig.EnableFilter) + { + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + //consumer.Commit(result); // 提交偏移量 + // 跳过消息 + continue; + } + } + bool sucess = await messageHandler(result.Message.Key, result.Message.Value); + if (sucess) + consumer.Commit(result); // 手动提交 } - bool sucess= await messageHandler(result.Message.Key, result.Message.Value); - if (sucess) + catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) { - consumer.Commit(result); // 手动提交 + _logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}"); + throw; // 抛出异常,以便重试 + } + catch (OperationCanceledException) + { + //ignore + } + catch (Exception ex) + { + _logger.LogError(ex, "处理消息时发生未知错误"); } } - catch (ConsumeException ex) - { - _logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}"); - } - } + }, cts.Token); + await Task.CompletedTask; }); - await Task.CompletedTask; + } @@ -178,7 +206,8 @@ namespace JiShe.CollectBus.Kafka.Consumer /// public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) where TValue : class { - try { + await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => + { var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; var cts = new CancellationTokenSource(); var consumer = _consumerStore.GetOrAdd(consumerKey, _ => @@ -201,14 +230,16 @@ namespace JiShe.CollectBus.Kafka.Consumer var result = consumer.Consume(cts.Token); if (result == null || result.Message == null || result.Message.Value == null) { - await Task.Delay(500, cts.Token); + await Task.Delay(DelayTime, cts.Token); continue; } if (result.IsPartitionEOF) { +#if DEBUG _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); - await Task.Delay(100, cts.Token); +#endif + await Task.Delay(DelayTime, cts.Token); continue; } if (_kafkaOptionConfig.EnableFilter) @@ -217,7 +248,6 @@ namespace JiShe.CollectBus.Kafka.Consumer // 检查 Header 是否符合条件 if (!headersFilter.Match(result.Message.Headers)) { - await Task.Delay(500, cts.Token); //consumer.Commit(result); // 提交偏移量 // 跳过消息 continue; @@ -226,22 +256,26 @@ namespace JiShe.CollectBus.Kafka.Consumer bool sucess = await messageHandler(result.Message.Value); if (sucess) consumer.Commit(result); // 手动提交 - else - consumer.StoreOffset(result); + //else + // consumer.StoreOffset(result); } - catch (ConsumeException ex) + catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) { _logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}"); + throw; // 抛出异常,以便重试 + } + catch (OperationCanceledException) + { + //ignore + } + catch (Exception ex) + { + _logger.LogError(ex, "处理消息时发生未知错误"); } } - }); - } catch (Exception ex) - { - _logger.LogWarning($"Kafka消费异常: {ex.Message}"); - - } - - await Task.CompletedTask; + }, cts.Token); + await Task.CompletedTask; + }); } @@ -270,109 +304,114 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 消费组ID /// 批次大小 /// 批次超时时间 - public async Task SubscribeBatchAsync(string[] topics,Func, Task> messageBatchHandler, string? groupId = null,int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class + public async Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class { - var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; - var cts = new CancellationTokenSource(); - - var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - ( - CreateConsumer(groupId), - cts - )).Consumer as IConsumer; - consumer!.Subscribe(topics); - - var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 - - _ = Task.Run(async () => + await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => { - var messages = new List<(TValue Value, TopicPartitionOffset Offset)>(); - var startTime = DateTime.UtcNow; - while (!cts.IsCancellationRequested) + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; + var cts = new CancellationTokenSource(); + + var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; + consumer!.Subscribe(topics); + + var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 + + _ = Task.Run(async () => { - try - { - // 非阻塞快速累积消息 - while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout) - { - var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用 + var messages = new List<(TValue Value, TopicPartitionOffset Offset)>(); + var startTime = DateTime.UtcNow; - if (result != null) + while (!cts.IsCancellationRequested) + { + try + { + // 非阻塞快速累积消息 + while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout) { - if (result.IsPartitionEOF) + var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用 + + if (result != null) { - //_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); - await Task.Delay(10, cts.Token); - } - else if (result.Message.Value != null) - { - if (_kafkaOptionConfig.EnableFilter) + if (result.IsPartitionEOF) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; - // 检查 Header 是否符合条件 - if (!headersFilter.Match(result.Message.Headers)) +#if DEBUG + _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); +#endif + await Task.Delay(DelayTime, cts.Token); + } + else if (result.Message.Value != null) + { + if (_kafkaOptionConfig.EnableFilter) { - //consumer.Commit(result); // 提交偏移量 - // 跳过消息 - continue; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + //consumer.Commit(result); // 提交偏移量 + // 跳过消息 + continue; + } + } + messages.Add((result.Message.Value, result.TopicPartitionOffset)); + } + } + else + { + // 无消息时短暂等待 + await Task.Delay(DelayTime, cts.Token); + } + } + + // 处理批次 + if (messages.Count > 0) + { + bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList()); + if (success) + { + var offsetsByPartition = new Dictionary(); + foreach (var msg in messages) + { + var tp = msg.Offset.TopicPartition; + var offset = msg.Offset.Offset; + if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax) + { + offsetsByPartition[tp] = offset; } } - messages.Add((result.Message.Value, result.TopicPartitionOffset)); - //messages.Add(result.Message.Value); - } - } - else - { - // 无消息时短暂等待 - await Task.Delay(10, cts.Token); - } - } - // 处理批次 - if (messages.Count > 0) + var offsetsToCommit = offsetsByPartition + .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1))) + .ToList(); + consumer.Commit(offsetsToCommit); + } + messages.Clear(); + } + + startTime = DateTime.UtcNow; + } + catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) { - bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList()); - if (success) - { - var offsetsByPartition = new Dictionary(); - foreach (var msg in messages) - { - var tp = msg.Offset.TopicPartition; - var offset = msg.Offset.Offset; - if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax) - { - offsetsByPartition[tp] = offset; - } - } - - var offsetsToCommit = offsetsByPartition - .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1))) - .ToList(); - consumer.Commit(offsetsToCommit); - } - messages.Clear(); + _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}"); + throw; // 抛出异常,以便重试 } + catch (OperationCanceledException) + { + //ignore + } + catch (Exception ex) + { + _logger.LogError(ex, "处理批量消息时发生未知错误"); + } + } + }, cts.Token); - startTime = DateTime.UtcNow; - } - catch (ConsumeException ex) - { - _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}"); - } - catch (OperationCanceledException) - { - // 任务取消,正常退出 - - } - catch (Exception ex) - { - _logger.LogError(ex, "处理批量消息时发生未知错误"); - } - } - }, cts.Token); - - await Task.CompletedTask; + await Task.CompletedTask; + }); } @@ -403,110 +442,113 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 批次大小 /// 批次超时时间 /// 消费等待时间 - public async Task SubscribeBatchAsync(string[] topics,Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class + public async Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class { - var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; - var cts = new CancellationTokenSource(); - - var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - ( - CreateConsumer(groupId), - cts - )).Consumer as IConsumer; - - consumer!.Subscribe(topics); - - var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 - - _ = Task.Run(async () => + await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => { - var messages = new List<(TValue Value, TopicPartitionOffset Offset)>(); - //var messages = new List>(); - var startTime = DateTime.UtcNow; - while (!cts.IsCancellationRequested) + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; + var cts = new CancellationTokenSource(); + + var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; + + consumer!.Subscribe(topics); + + var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 + + _ = Task.Run(async () => { - try - { - // 非阻塞快速累积消息 - while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout) - { - var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用 + var messages = new List<(TValue Value, TopicPartitionOffset Offset)>(); + var startTime = DateTime.UtcNow; - if (result != null) + while (!cts.IsCancellationRequested) + { + try + { + // 非阻塞快速累积消息 + while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout) { - if (result.IsPartitionEOF) + var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用 + + if (result != null) { - //_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); - await Task.Delay(10, cts.Token); - } - else if (result.Message.Value != null) - { - if (_kafkaOptionConfig.EnableFilter) + if (result.IsPartitionEOF) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; - // 检查 Header 是否符合条件 - if (!headersFilter.Match(result.Message.Headers)) + //_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); + await Task.Delay(DelayTime, cts.Token); + } + else if (result.Message.Value != null) + { + if (_kafkaOptionConfig.EnableFilter) { - //consumer.Commit(result); // 提交偏移量 - // 跳过消息 - continue; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + //consumer.Commit(result); // 提交偏移量 + // 跳过消息 + continue; + } + } + messages.Add((result.Message.Value, result.TopicPartitionOffset)); + } + } + else + { + // 无消息时短暂等待 + await Task.Delay(DelayTime, cts.Token); + } + } + + // 处理批次 + if (messages.Count > 0) + { + bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList()); + if (success) + { + var offsetsByPartition = new Dictionary(); + foreach (var msg in messages) + { + var tp = msg.Offset.TopicPartition; + var offset = msg.Offset.Offset; + if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax) + { + offsetsByPartition[tp] = offset; } } - messages.Add((result.Message.Value, result.TopicPartitionOffset)); - //messages.Add(result.Message.Value); - } - } - else - { - // 无消息时短暂等待 - await Task.Delay(10, cts.Token); - } - } - // 处理批次 - if (messages.Count > 0) + var offsetsToCommit = offsetsByPartition + .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1))) + .ToList(); + consumer.Commit(offsetsToCommit); + } + messages.Clear(); + } + + startTime = DateTime.UtcNow; + } + catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) { - bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList()); - if (success) - { - var offsetsByPartition = new Dictionary(); - foreach (var msg in messages) - { - var tp = msg.Offset.TopicPartition; - var offset = msg.Offset.Offset; - if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax) - { - offsetsByPartition[tp] = offset; - } - } - - var offsetsToCommit = offsetsByPartition - .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1))) - .ToList(); - consumer.Commit(offsetsToCommit); - } - messages.Clear(); + _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); + throw; // 抛出异常,以便重试 } + catch (OperationCanceledException) + { + //ignore + } + catch (Exception ex) + { + _logger.LogError(ex, "处理批量消息时发生未知错误"); + } + } + }, cts.Token); - startTime = DateTime.UtcNow; - } - catch (ConsumeException ex) - { - _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); - } - catch (OperationCanceledException) - { - // 任务取消,正常退出 - } - catch (Exception ex) - { - _logger.LogError(ex, "处理批量消息时发生未知错误"); - } - } - }, cts.Token); - - await Task.CompletedTask; + await Task.CompletedTask; + }); } @@ -515,9 +557,9 @@ namespace JiShe.CollectBus.Kafka.Consumer /// /// /// - public void Unsubscribe(string[] topics, string groupId) where TKey : notnull where TValue : class + public void Unsubscribe(string[] topics, string? groupId) where TKey : notnull where TValue : class { - var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; if (_consumerStore.TryRemove(consumerKey, out var entry)) { entry.CTS.Cancel(); diff --git a/modules/JiShe.CollectBus.Kafka/Internal/KafkaPollyPipeline.cs b/modules/JiShe.CollectBus.Kafka/Internal/KafkaPollyPipeline.cs new file mode 100644 index 0000000..c467921 --- /dev/null +++ b/modules/JiShe.CollectBus.Kafka/Internal/KafkaPollyPipeline.cs @@ -0,0 +1,111 @@ +using Confluent.Kafka; +using Polly.CircuitBreaker; +using Polly.Retry; +using Polly; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Polly.Contrib.WaitAndRetry; +using Volo.Abp.DependencyInjection; +using Microsoft.Extensions.Logging; +using JiShe.CollectBus.Kafka.Producer; + +namespace JiShe.CollectBus.Kafka.Internal +{ + public class KafkaPollyPipeline + { + + private readonly ILogger _logger; + public KafkaPollyPipeline(ILogger logger) + { + _logger= logger; + } + + /// + /// 判断是否可恢复的异常 + /// + /// + /// + public static bool IsRecoverableError(Exception ex) + { + var errorList= new List + { + ErrorCode.GroupLoadInProgress, + ErrorCode.Local_Retry, + ErrorCode.Local_MaxPollExceeded, + ErrorCode.RequestTimedOut, + ErrorCode.LeaderNotAvailable, + ErrorCode.NotLeaderForPartition, + ErrorCode.RebalanceInProgress, + ErrorCode.NotCoordinatorForGroup, + ErrorCode.NetworkException, + ErrorCode.GroupCoordinatorNotAvailable + }; + return ex switch + { + ConsumeException kafkaEx => errorList.Contains(kafkaEx.Error.Code), + KafkaException kafkaEx =>kafkaEx.Error.IsFatal && errorList.Contains(kafkaEx.Error.Code), + _ => false + }; + + } + + /// + /// 创建重试 + 断路器 + /// + /// + public ResiliencePipeline KafkaPipeline + { + get + { + // 组合重试 + 断路器 + ResiliencePipeline pipeline = new ResiliencePipelineBuilder() + .AddRetry(new RetryStrategyOptions + { + ShouldHandle = args => args.Outcome.Exception switch + { + not null when IsRecoverableError(args.Outcome.Exception) => + PredicateResult.True(), + _ => PredicateResult.False() + }, + Delay = TimeSpan.FromSeconds(2), + OnRetry = args => + { + _logger.LogWarning($"重试中... 第 {args.AttemptNumber} 次,原因: {args.Outcome.Exception?.Message}"); + return default; + } + }) + .AddCircuitBreaker(new CircuitBreakerStrategyOptions + { + ShouldHandle = args => args.Outcome.Exception switch + { + not null when IsRecoverableError(args.Outcome.Exception) => + PredicateResult.True(), + _ => PredicateResult.False() + }, + FailureRatio = 0.8, // 80% 失败触发熔断 + SamplingDuration = TimeSpan.FromSeconds(10), + MinimumThroughput = 4, // 至少4次调用才计算失败率 + BreakDuration = TimeSpan.FromSeconds(10), + OnOpened = args => + { + _logger.LogWarning($"熔断器开启,等待 {args.BreakDuration} 后重试"); + return default; + }, + OnClosed = _ => + { + _logger.LogWarning("熔断器关闭,再次开始重试"); + return default; + } + }) + .Build(); + return pipeline; + } + + } + + + } +} diff --git a/modules/JiShe.CollectBus.Kafka/JiShe.CollectBus.Kafka.csproj b/modules/JiShe.CollectBus.Kafka/JiShe.CollectBus.Kafka.csproj index ce31120..a0fd2f7 100644 --- a/modules/JiShe.CollectBus.Kafka/JiShe.CollectBus.Kafka.csproj +++ b/modules/JiShe.CollectBus.Kafka/JiShe.CollectBus.Kafka.csproj @@ -8,6 +8,8 @@ + + diff --git a/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs b/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs index 8019d34..842ad39 100644 --- a/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs +++ b/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs @@ -50,13 +50,13 @@ namespace JiShe.CollectBus.Protocol var assembly = existingAssembly ?? Assembly.LoadFrom(file); // 实现IAnalysisStrategy接口 var analysisStrategyTypes = assembly.GetTypes().Where(t => !t.IsAbstract && !t.IsInterface && t.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IAnalysisStrategy<,>))); - if (analysisStrategyTypes.Count() == 0) + if (!analysisStrategyTypes.Any()) continue; foreach (var analysisStrategyType in analysisStrategyTypes) { // 通过反射获取静态元数据 var strategyType = analysisStrategyType.Name; - var genericArgs = analysisStrategyType.GetInterface("IAnalysisStrategy`2")!.GetGenericArguments(); + var genericArgs = analysisStrategyType.GetInterface($"IAnalysisStrategy`2")!.GetGenericArguments(); var inputType = genericArgs[0]; var resultType = genericArgs[1]; // 注册策略实现 diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml index 88209c2..0193df3 100644 --- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -16,6 +16,7 @@ 后端服务 +