From 921973e5d4da0d77decad68703c5855667a103ea Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Sun, 18 May 2025 16:04:23 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96IoTDB=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E9=A9=B1=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Provider/IoTDBProvider.cs | 39 +- .../AnalysisData/DataStorage.cs | 77 +- .../DataChannels/DataChannelManageService.cs | 25 +- .../Samples/SampleAppService.cs | 13 +- .../BasicScheduledMeterReadingService.cs | 2 + ...nergySystemScheduledMeterReadingService.cs | 2 +- .../SubscriberAnalysisAppService.cs | 820 +++++++++--------- .../Subscribers/WorkerSubscriberAppService.cs | 4 +- .../Helpers/CommonHelper.cs | 5 + web/JiShe.CollectBus.Host/appsettings.json | 2 +- 10 files changed, 488 insertions(+), 501 deletions(-) diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index 7d7e366..cd57ea9 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -36,18 +36,19 @@ namespace JiShe.CollectBus.IoTDB.Provider { private static readonly ConcurrentDictionary MetadataCache = new(); private readonly ILogger _logger; - private readonly IIoTDbSessionFactory _sessionFactory; - private readonly IoTDBRuntimeContext _runtimeContext; + private readonly IIoTDbSessionFactory _sessionFactory; - private IIoTDbSessionPool CurrentSession => - _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool); + /// + /// 存储模型切换标识,是否使用table模型存储, 默认为false,标识tree模型存储 + /// + public bool UseTableSessionPool { get; set; } + + private IIoTDbSessionPool CurrentSession { get; set; } - - //private IIoTDbSessionPool CurrentSession { get; set; } - - public IIoTDbProvider GetSessionPool(bool sessionpolType) + public IIoTDbProvider GetSessionPool(bool useTableSessionPool) { - //CurrentSession = _sessionFactory.GetSessionPool(sessionpolType); + CurrentSession = _sessionFactory.GetSessionPool(useTableSessionPool); + UseTableSessionPool = useTableSessionPool; return this; } @@ -60,12 +61,10 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public IoTDbProvider( ILogger logger, - IIoTDbSessionFactory sessionFactory, - IoTDBRuntimeContext runtimeContext) + IIoTDbSessionFactory sessionFactory) { _logger = logger; _sessionFactory = sessionFactory; - _runtimeContext = runtimeContext; } @@ -87,6 +86,7 @@ namespace JiShe.CollectBus.IoTDB.Provider _logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null"); return; } + _logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 路径为 {tablet.First().InsertTargetName}"); await CurrentSession.InsertAsync(tablet.First()); } @@ -124,8 +124,11 @@ namespace JiShe.CollectBus.IoTDB.Provider _logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null"); return; } + foreach (var item in tablet) { + _logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 路径为 {item.InsertTargetName}"); + await CurrentSession.InsertAsync(item); } } @@ -347,16 +350,16 @@ namespace JiShe.CollectBus.IoTDB.Provider throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 不属于IoTDB数据模型实体,属于异常情况,-102"); } - if (metadata.EntityType == EntityTypeEnum.TreeModel && _runtimeContext.UseTableSessionPool == true) + if (metadata.EntityType == EntityTypeEnum.TreeModel && UseTableSessionPool == true) { throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 tree模型不能使用table模型Session连接,属于异常情况,-103"); } - else if (metadata.EntityType == EntityTypeEnum.TableModel && _runtimeContext.UseTableSessionPool == false) + else if (metadata.EntityType == EntityTypeEnum.TableModel && UseTableSessionPool == false) { - throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接,属于异常情况,-104"); + throw new Exception($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接,属于异常情况,-104"); } string tableNameOrTreePath = string.Empty; - if (_runtimeContext.UseTableSessionPool)//表模型 + if ( UseTableSessionPool)//表模型 { //如果指定了路径 if (!string.IsNullOrWhiteSpace(metadata.TableNameOrTreePath)) @@ -417,7 +420,7 @@ namespace JiShe.CollectBus.IoTDB.Provider } } - return _runtimeContext.UseTableSessionPool + return UseTableSessionPool ? BuildTableSessionTablet(metadata, tableNameOrTreePath, tempColumnNames, values.Select(d => d.ToList()).ToList(), timestamps.ToList()) : BuildSessionTablet(metadata, tableNameOrTreePath, tempColumnNames, values.Select(d => d.ToList()).ToList(), timestamps.ToList()); } @@ -502,7 +505,7 @@ namespace JiShe.CollectBus.IoTDB.Provider var metadata = await GetMetadata(); var sb = new StringBuilder(); - if (!_runtimeContext.UseTableSessionPool) + if (!UseTableSessionPool) { sb.Append("DELETE "); } diff --git a/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs b/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs index 68cfa62..7a1bc75 100644 --- a/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs +++ b/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs @@ -37,19 +37,17 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData { private readonly IGuidGenerator _guidGenerator; private readonly IIoTDbProvider _dbProvider; - private readonly ServerApplicationOptions _applicationOptions; - private readonly IoTDBRuntimeContext _runtimeContext; + private readonly ServerApplicationOptions _applicationOptions; private readonly ILogger _logger; private readonly IMemoryCache _imemoryCache; private readonly IFreeRedisProvider _freeRedisProvider; private RedisClient Instance { get; set; } public DataStorage(IIoTDbProvider dbProvider, IOptions applicationOptions, - IGuidGenerator guidGenerator, IoTDBRuntimeContext runtimeContext, ILogger logger, IMemoryCache memoryCache, IFreeRedisProvider freeRedisProvider) + IGuidGenerator guidGenerator, ILogger logger, IMemoryCache memoryCache, IFreeRedisProvider freeRedisProvider) { _dbProvider= dbProvider; _applicationOptions = applicationOptions.Value; - _guidGenerator= guidGenerator; - _runtimeContext= runtimeContext; + _guidGenerator= guidGenerator; _logger= logger; _imemoryCache = memoryCache; _freeRedisProvider = freeRedisProvider; @@ -159,8 +157,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData Timestamps = data.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(), SingleMeasuring = (data.FiledName ?? string.Empty, data.DataValue ?? default) }; - _runtimeContext.UseTableSessionPool = true; // 使用表模型池 - var taskSendInfo = await _dbProvider.QueryAsync(new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName(), Conditions = conditions, PageIndex = 0, PageSize = 1 }); + + var taskSendInfo = await _dbProvider.GetSessionPool(true).QueryAsync(new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName(), Conditions = conditions, PageIndex = 0, PageSize = 1 }); var taskData = taskSendInfo?.Items.FirstOrDefault(); if (taskData != null) { @@ -204,14 +202,12 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData ScoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun(), ReceivedTime = analysisBaseDto.ReceivedTime, }; - } - _runtimeContext.UseTableSessionPool = true; // 使树模型池 - await _dbProvider.InsertAsync(taskData); + } + await _dbProvider.GetSessionPool(true).InsertAsync(taskData); //如果无字段名,则不保存数据 if (!string.IsNullOrWhiteSpace(data.FiledName)) - { - _runtimeContext.UseTableSessionPool = false; // 使树模型池 - await _dbProvider.InsertAsync(meter); + { + await _dbProvider.GetSessionPool(false).InsertAsync(meter); } return await Task.FromResult(true); } @@ -267,9 +263,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData DataType = analysisBaseDto.DataType, Timestamps = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(), // TODO:这里暂时格式化15分钟数据,需要进行调整 SingleMeasuring =(item.FiledName ?? string.Empty, item.DataValue ?? default) - }; - _runtimeContext.UseTableSessionPool = true; // 使用表模型池 - var taskSendInfo = await _dbProvider.QueryAsync(new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName(), Conditions = conditions, PageIndex = 0, PageSize = 1 }); + }; + var taskSendInfo = await _dbProvider.GetSessionPool(true).QueryAsync(new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName(), Conditions = conditions, PageIndex = 0, PageSize = 1 }); var taskData = taskSendInfo?.Items.FirstOrDefault(); if (taskData != null) { @@ -321,13 +316,11 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData treeModelSingleMeasuringEntities.Add(meter); } } - // 批量保存数据 - _runtimeContext.UseTableSessionPool = true; // 使树模型池 - await _dbProvider.BatchInsertAsync(meterReadingTelemetryPacketInfos); + // 批量保存数据 + await _dbProvider.GetSessionPool(true).BatchInsertAsync(meterReadingTelemetryPacketInfos); if (treeModelSingleMeasuringEntities.Count > 0) - { - _runtimeContext.UseTableSessionPool = false; // 使树模型池 - await _dbProvider.BatchInsertAsync(treeModelSingleMeasuringEntities); + { + await _dbProvider.GetSessionPool(false).BatchInsertAsync(treeModelSingleMeasuringEntities); } return await Task.FromResult(true); } @@ -359,9 +352,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData DataType = IOTDBDataTypeConst.Status, Timestamps = timestamps, SingleMeasuring = (data.FiledName!, data.DataValue!) - }; - _runtimeContext.UseTableSessionPool = false; // 使树模型池 - await _dbProvider.InsertAsync(treeData); + }; + await _dbProvider.GetSessionPool(false).InsertAsync(treeData); // 数据帧 var treeFrameData = new TreeModelSingleMeasuringEntity() { @@ -373,9 +365,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData Timestamps = timestamps, SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty) }; - - _runtimeContext.UseTableSessionPool = false; // 使树模型池 - await _dbProvider.InsertAsync(treeFrameData); + + await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData); // 时间 var treeRecordingTimeData = new TreeModelSingleMeasuringEntity() @@ -387,9 +378,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData Timestamps = timestamps, DataType = IOTDBDataTypeConst.Status, SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now) - }; - _runtimeContext.UseTableSessionPool = false; // 使树模型池 - await _dbProvider.InsertAsync(treeRecordingTimeData); + }; + await _dbProvider.GetSessionPool(true).InsertAsync(treeRecordingTimeData); // 新建 string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ); @@ -423,9 +413,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData ReceivedRemark = data.ErrorCodeMsg ?? string.Empty, ScoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun(), ReceivedTime=analysisBaseDto.ReceivedTime, - }; - _runtimeContext.UseTableSessionPool = true; // 使表模型池 - await _dbProvider.InsertAsync(taskData); + }; + await _dbProvider.GetSessionPool(true).InsertAsync(taskData); return await Task.FromResult(true); } @@ -460,9 +449,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData DataType = IOTDBDataTypeConst.Status, Timestamps = timestamps, SingleMeasuring = (item.FiledName!, item.DataValue!) - }; - _runtimeContext.UseTableSessionPool = false; // 使树模型池 - await _dbProvider.InsertAsync(treeData); + }; + await _dbProvider.GetSessionPool(false).InsertAsync(treeData); // 数据帧 var treeFrameData = new TreeModelSingleMeasuringEntity() { @@ -473,9 +461,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData Timestamps = timestamps, SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty) }; - - _runtimeContext.UseTableSessionPool = false; // 使树模型池 - await _dbProvider.InsertAsync(treeFrameData); + + await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData); // 时间 var treeRecordingTimeData = new TreeModelSingleMeasuringEntity() @@ -486,9 +473,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData DataType = IOTDBDataTypeConst.Status, Timestamps = timestamps, SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, item.TimeSpan.HasValue ? item.TimeSpan.Value : DateTime.Now) - }; - _runtimeContext.UseTableSessionPool = false; // 使树模型池 - await _dbProvider.InsertAsync(treeRecordingTimeData); + }; + await _dbProvider.GetSessionPool(false).InsertAsync(treeRecordingTimeData); // 新建 string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ); @@ -527,9 +513,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData meterReadingTelemetryPacketInfos.Add(taskData); } if (meterReadingTelemetryPacketInfos.Count > 0) - { - _runtimeContext.UseTableSessionPool = true; // 使表模型池 - await _dbProvider.BatchInsertAsync(meterReadingTelemetryPacketInfos); + { + await _dbProvider.GetSessionPool(true).BatchInsertAsync(meterReadingTelemetryPacketInfos); } return await Task.FromResult(true); } diff --git a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs index d73a174..58b5718 100644 --- a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs +++ b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs @@ -35,26 +35,22 @@ namespace JiShe.CollectBus.DataChannels private readonly IIoTDbProvider _dbProvider; private readonly IProducerService _producerService; private readonly KafkaOptionConfig _kafkaOptions; - private readonly ServerApplicationOptions _applicationOptions; - private readonly IoTDBRuntimeContext _runtimeContext; + private readonly ServerApplicationOptions _applicationOptions; private readonly ILogRecordRepository _logRecordRepository; public DataChannelManageService( ILogger logger, - IIoTDbProvider dbProvider, - IoTDBRuntimeContext runtimeContext, + IIoTDbProvider dbProvider, IProducerService producerService, IOptions kafkaOptions, IOptions applicationOptions, ILogRecordRepository logRecordRepository) { _logger = logger; - _dbProvider = dbProvider; - _runtimeContext = runtimeContext; + _dbProvider = dbProvider; _producerService = producerService; _kafkaOptions = kafkaOptions.Value; - _applicationOptions = applicationOptions.Value; - _runtimeContext.UseTableSessionPool = true; + _applicationOptions = applicationOptions.Value; _logRecordRepository= logRecordRepository; } @@ -74,10 +70,11 @@ namespace JiShe.CollectBus.DataChannels { const int BatchSize = 50000; const int EmptyWaitMilliseconds = 50; - var timeout = TimeSpan.FromSeconds(5); + var timeout = TimeSpan.FromMilliseconds(50); var timer = Stopwatch.StartNew(); long timeoutMilliseconds = 0; var metadata = await _dbProvider.GetMetadata(); + var timeoutStopwatch = Stopwatch.StartNew(); try { @@ -98,12 +95,12 @@ namespace JiShe.CollectBus.DataChannels } timer.Restart(); - var startTime = DateTime.Now; + timeoutStopwatch.Restart(); try { // 异步批量读取数据 - while (batch != null && batch.Count < BatchSize && (DateTime.Now - startTime) < timeout) + while (batch != null && batch.Count < BatchSize && timeoutStopwatch.Elapsed <= timeout) { try { @@ -122,6 +119,7 @@ namespace JiShe.CollectBus.DataChannels { throw; } + if (batch.Count == 0) { @@ -147,7 +145,7 @@ namespace JiShe.CollectBus.DataChannels try { // 批量写入数据库 - await _dbProvider.BatchInsertAsync(metadata, records); + await _dbProvider.GetSessionPool(true).BatchInsertAsync(metadata, records); // 限流推送Kafka await DeviceGroupBalanceControl.ProcessWithThrottleAsync( @@ -166,9 +164,8 @@ namespace JiShe.CollectBus.DataChannels batch.Clear(); timer.Stop(); + timeoutStopwatch.Stop(); timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds; - - startTime = DateTime.Now; } } catch (Exception ex) diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index ad1dc02..197ca1d 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -91,7 +91,6 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS //TableModelSingleMeasuringEntityExtension //TableModelSingleMeasuringEntityAccessor.GetSystemName(meter); //ElectricityMeterAccessor - await _iotDBProvider.GetSessionPool(true).InsertAsync(meter); await _iotDBProvider.InsertAsync(meter); } @@ -115,9 +114,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), }; - await _iotDBProvider.InsertAsync(meter2); - - _dbContext.UseTableSessionPool = true; + await _iotDBProvider.GetSessionPool(false).InsertAsync(meter2); ElectricityMeter meter = new ElectricityMeter() { @@ -131,7 +128,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS CurrentdDateTime = DateTime.Now, Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), }; - await _iotDBProvider.InsertAsync(meter); + await _iotDBProvider.GetSessionPool(true).InsertAsync(meter); QueryCondition conditions = new QueryCondition() { @@ -149,7 +146,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS Conditions = new List() { conditions }, }; - var pageResult = await _iotDBProvider.QueryAsync(query); + var pageResult = await _iotDBProvider.GetSessionPool(true).QueryAsync(query); } @@ -176,9 +173,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS }; await _iotDBProvider.InsertAsync(meter2); - - _dbContext.UseTableSessionPool = true; - + ElectricityMeter meter3 = new ElectricityMeter() { diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index f434cdd..5ee3486 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -459,6 +459,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading } } + ammeter.ItemCodes = "10_97"; + if (!keyValuePairs.ContainsKey(ammeter.FocusAddress)) { keyValuePairs[ammeter.FocusAddress] = new List() { ammeter.Adapt() }; diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index bc28d2b..c1fd4e4 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -399,7 +399,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading TripTime = $"{DateTime.Now:HH:mm}", MeterId = 78973, LoopType = "EachDay", - EachDayWithout = "周六,周日", + EachDayWithout = "周六", TimeDensity = 15, }); diff --git a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAnalysisAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAnalysisAppService.cs index 9d22468..d17b921 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAnalysisAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAnalysisAppService.cs @@ -42,458 +42,458 @@ namespace JiShe.CollectBus.Subscribers _protocolService = protocolService; } - /// - /// 解析AFN00H - /// - /// - /// - [KafkaSubscribe(ProtocolConst.SubscriberAFN00HReceivedEventNameTemp)] - public async Task ReceivedAFN00Event(MessageProtocolAnalysis receivedMessage) - { - var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); - if (protocolPlugin == null) - { - _logger.LogError("协议不存在!"); - } - else - { - if (receivedMessage.Data==null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; - //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); + ///// + ///// 解析AFN00H + ///// + ///// + ///// + //[KafkaSubscribe(ProtocolConst.SubscriberAFN00HReceivedEventNameTemp)] + //public async Task ReceivedAFN00Event(MessageProtocolAnalysis receivedMessage) + //{ + // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); + // if (protocolPlugin == null) + // { + // _logger.LogError("协议不存在!"); + // } + // else + // { + // if (receivedMessage.Data==null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; + // //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); - //var data = await analysisStrategy.ExecuteAsync>(tB3761); - var executor = _serviceProvider.GetRequiredService(); - await executor.ExecuteAsync(serverName, receivedMessage.Data); - } + // //var data = await analysisStrategy.ExecuteAsync>(tB3761); + // var executor = _serviceProvider.GetRequiredService(); + // await executor.ExecuteAsync(serverName, receivedMessage.Data); + // } - return SubscribeAck.Fail(); - } + // return SubscribeAck.Fail(); + //} - /// - /// 解析AFN01H - /// - /// - /// - [KafkaSubscribe(ProtocolConst.SubscriberAFN01HReceivedEventNameTemp)] - public async Task ReceivedAFN01Event(MessageProtocolAnalysis receivedMessage) - { - var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); - if (protocolPlugin == null) - { - _logger.LogError("协议不存在!"); - } - else - { - if (receivedMessage.Data == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; - //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); + ///// + ///// 解析AFN01H + ///// + ///// + ///// + //[KafkaSubscribe(ProtocolConst.SubscriberAFN01HReceivedEventNameTemp)] + //public async Task ReceivedAFN01Event(MessageProtocolAnalysis receivedMessage) + //{ + // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); + // if (protocolPlugin == null) + // { + // _logger.LogError("协议不存在!"); + // } + // else + // { + // if (receivedMessage.Data == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; + // //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); - //var data = await analysisStrategy.ExecuteAsync>(tB3761); - var executor = _serviceProvider.GetRequiredService(); - await executor.ExecuteAsync(serverName, receivedMessage.Data); - } + // //var data = await analysisStrategy.ExecuteAsync>(tB3761); + // var executor = _serviceProvider.GetRequiredService(); + // await executor.ExecuteAsync(serverName, receivedMessage.Data); + // } - return SubscribeAck.Fail(); - } + // return SubscribeAck.Fail(); + //} - /// - /// 解析AFN02H - /// - /// - /// - [KafkaSubscribe(ProtocolConst.SubscriberAFN02HReceivedEventNameTemp)] - public async Task ReceivedAFN02Event(MessageProtocolAnalysis receivedMessage) - { - var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); - if (protocolPlugin == null) - { - _logger.LogError("协议不存在!"); - } - else - { - if (receivedMessage.Data == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; - //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); + ///// + ///// 解析AFN02H + ///// + ///// + ///// + //[KafkaSubscribe(ProtocolConst.SubscriberAFN02HReceivedEventNameTemp)] + //public async Task ReceivedAFN02Event(MessageProtocolAnalysis receivedMessage) + //{ + // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); + // if (protocolPlugin == null) + // { + // _logger.LogError("协议不存在!"); + // } + // else + // { + // if (receivedMessage.Data == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; + // //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); - //var data = await analysisStrategy.ExecuteAsync>(tB3761); - var executor = _serviceProvider.GetRequiredService(); - await executor.ExecuteAsync(serverName, receivedMessage.Data, (result) => - { - var ssss = (UnitDataAnalysis>)result; - _logger.LogInformation($"解析AFN02H数据:{ssss.Serialize()}"); - }); - return SubscribeAck.Success(); - } - return SubscribeAck.Fail(); + // //var data = await analysisStrategy.ExecuteAsync>(tB3761); + // var executor = _serviceProvider.GetRequiredService(); + // await executor.ExecuteAsync(serverName, receivedMessage.Data, (result) => + // { + // var ssss = (UnitDataAnalysis>)result; + // _logger.LogInformation($"解析AFN02H数据:{ssss.Serialize()}"); + // }); + // return SubscribeAck.Success(); + // } + // return SubscribeAck.Fail(); - } + //} - /// - /// 解析AFN03H - /// - /// - /// - [KafkaSubscribe(ProtocolConst.SubscriberAFN03HReceivedEventNameTemp)] - public async Task ReceivedAFN03Event(MessageProtocolAnalysis receivedMessage) - { - var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); - if (protocolPlugin == null) - { - _logger.LogError("协议不存在!"); - } - else - { - if (receivedMessage.Data == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; - //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); + ///// + ///// 解析AFN03H + ///// + ///// + ///// + //[KafkaSubscribe(ProtocolConst.SubscriberAFN03HReceivedEventNameTemp)] + //public async Task ReceivedAFN03Event(MessageProtocolAnalysis receivedMessage) + //{ + // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); + // if (protocolPlugin == null) + // { + // _logger.LogError("协议不存在!"); + // } + // else + // { + // if (receivedMessage.Data == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; + // //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); - //var data = await analysisStrategy.ExecuteAsync>(tB3761); - var executor = _serviceProvider.GetRequiredService(); - await executor.ExecuteAsync(serverName, receivedMessage.Data); - return SubscribeAck.Success(); - } + // //var data = await analysisStrategy.ExecuteAsync>(tB3761); + // var executor = _serviceProvider.GetRequiredService(); + // await executor.ExecuteAsync(serverName, receivedMessage.Data); + // return SubscribeAck.Success(); + // } - return SubscribeAck.Fail(); - } + // return SubscribeAck.Fail(); + //} - /// - /// 解析AFN04H - /// - /// - /// - [KafkaSubscribe(ProtocolConst.SubscriberAFN04HReceivedEventNameTemp)] - public async Task ReceivedAFN04Event(MessageProtocolAnalysis receivedMessage) - { - var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); - if (protocolPlugin == null) - { - _logger.LogError("协议不存在!"); - } - else - { - if (receivedMessage.Data == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; - //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); + ///// + ///// 解析AFN04H + ///// + ///// + ///// + //[KafkaSubscribe(ProtocolConst.SubscriberAFN04HReceivedEventNameTemp)] + //public async Task ReceivedAFN04Event(MessageProtocolAnalysis receivedMessage) + //{ + // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); + // if (protocolPlugin == null) + // { + // _logger.LogError("协议不存在!"); + // } + // else + // { + // if (receivedMessage.Data == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; + // //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); - //var data = await analysisStrategy.ExecuteAsync>(tB3761); - var executor = _serviceProvider.GetRequiredService(); - await executor.ExecuteAsync(serverName, receivedMessage.Data); - return SubscribeAck.Success(); - } + // //var data = await analysisStrategy.ExecuteAsync>(tB3761); + // var executor = _serviceProvider.GetRequiredService(); + // await executor.ExecuteAsync(serverName, receivedMessage.Data); + // return SubscribeAck.Success(); + // } - return SubscribeAck.Fail(); - } + // return SubscribeAck.Fail(); + //} - /// - /// 解析AFN05H - /// - /// - /// - [KafkaSubscribe(ProtocolConst.SubscriberAFN05HReceivedEventNameTemp)] - public async Task ReceivedAFN05Event(MessageProtocolAnalysis receivedMessage) - { - var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); - if (protocolPlugin == null) - { - _logger.LogError("协议不存在!"); - } - else - { - if (receivedMessage.Data == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; - //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); + ///// + ///// 解析AFN05H + ///// + ///// + ///// + //[KafkaSubscribe(ProtocolConst.SubscriberAFN05HReceivedEventNameTemp)] + //public async Task ReceivedAFN05Event(MessageProtocolAnalysis receivedMessage) + //{ + // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); + // if (protocolPlugin == null) + // { + // _logger.LogError("协议不存在!"); + // } + // else + // { + // if (receivedMessage.Data == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; + // //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); - //var data = await analysisStrategy.ExecuteAsync>(tB3761); - var executor = _serviceProvider.GetRequiredService(); - await executor.ExecuteAsync(serverName, receivedMessage.Data); - return SubscribeAck.Success(); - } + // //var data = await analysisStrategy.ExecuteAsync>(tB3761); + // var executor = _serviceProvider.GetRequiredService(); + // await executor.ExecuteAsync(serverName, receivedMessage.Data); + // return SubscribeAck.Success(); + // } - return SubscribeAck.Fail(); - } + // return SubscribeAck.Fail(); + //} - /// - /// 解析AFN09H - /// - /// - /// - [KafkaSubscribe(ProtocolConst.SubscriberAFN09HReceivedEventNameTemp)] - public async Task ReceivedAFN09Event(MessageProtocolAnalysis receivedMessage) - { - var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); - if (protocolPlugin == null) - { - _logger.LogError("协议不存在!"); - } - else - { - if (receivedMessage.Data == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; - //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); + ///// + ///// 解析AFN09H + ///// + ///// + ///// + //[KafkaSubscribe(ProtocolConst.SubscriberAFN09HReceivedEventNameTemp)] + //public async Task ReceivedAFN09Event(MessageProtocolAnalysis receivedMessage) + //{ + // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); + // if (protocolPlugin == null) + // { + // _logger.LogError("协议不存在!"); + // } + // else + // { + // if (receivedMessage.Data == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; + // //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); - //var data = await analysisStrategy.ExecuteAsync>(tB3761); - var executor = _serviceProvider.GetRequiredService(); - await executor.ExecuteAsync(serverName, receivedMessage.Data); - return SubscribeAck.Success(); - } + // //var data = await analysisStrategy.ExecuteAsync>(tB3761); + // var executor = _serviceProvider.GetRequiredService(); + // await executor.ExecuteAsync(serverName, receivedMessage.Data); + // return SubscribeAck.Success(); + // } - return SubscribeAck.Fail(); - } + // return SubscribeAck.Fail(); + //} - /// - /// 解析AFN0AH - /// - /// - /// - [KafkaSubscribe(ProtocolConst.SubscriberAFN0AHReceivedEventNameTemp)] - public async Task ReceivedAFN0AEvent(MessageProtocolAnalysis receivedMessage) - { - var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); - if (protocolPlugin == null) - { - _logger.LogError("协议不存在!"); - } - else - { - if (receivedMessage.Data == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; - //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); + ///// + ///// 解析AFN0AH + ///// + ///// + ///// + //[KafkaSubscribe(ProtocolConst.SubscriberAFN0AHReceivedEventNameTemp)] + //public async Task ReceivedAFN0AEvent(MessageProtocolAnalysis receivedMessage) + //{ + // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); + // if (protocolPlugin == null) + // { + // _logger.LogError("协议不存在!"); + // } + // else + // { + // if (receivedMessage.Data == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; + // //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); - //var data = await analysisStrategy.ExecuteAsync>(tB3761); - var executor = _serviceProvider.GetRequiredService(); - await executor.ExecuteAsync(serverName, receivedMessage.Data); - return SubscribeAck.Success(); - } + // //var data = await analysisStrategy.ExecuteAsync>(tB3761); + // var executor = _serviceProvider.GetRequiredService(); + // await executor.ExecuteAsync(serverName, receivedMessage.Data); + // return SubscribeAck.Success(); + // } - return SubscribeAck.Fail(); - } + // return SubscribeAck.Fail(); + //} - /// - /// 解析AFN0BH - /// - /// - /// - [KafkaSubscribe(ProtocolConst.SubscriberAFN0BHReceivedEventNameTemp)] - public async Task ReceivedAFN0BEvent(MessageProtocolAnalysis receivedMessage) - { - var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); - if (protocolPlugin == null) - { - _logger.LogError("协议不存在!"); - } - else - { - if (receivedMessage.Data == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; - //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); + ///// + ///// 解析AFN0BH + ///// + ///// + ///// + //[KafkaSubscribe(ProtocolConst.SubscriberAFN0BHReceivedEventNameTemp)] + //public async Task ReceivedAFN0BEvent(MessageProtocolAnalysis receivedMessage) + //{ + // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); + // if (protocolPlugin == null) + // { + // _logger.LogError("协议不存在!"); + // } + // else + // { + // if (receivedMessage.Data == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; + // //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); - //var data = await analysisStrategy.ExecuteAsync>(tB3761); - var executor = _serviceProvider.GetRequiredService(); - await executor.ExecuteAsync(serverName, receivedMessage.Data); - return SubscribeAck.Success(); - } + // //var data = await analysisStrategy.ExecuteAsync>(tB3761); + // var executor = _serviceProvider.GetRequiredService(); + // await executor.ExecuteAsync(serverName, receivedMessage.Data); + // return SubscribeAck.Success(); + // } - return SubscribeAck.Fail(); - } + // return SubscribeAck.Fail(); + //} - /// - /// 解析AFN0CH - /// - /// - /// - [KafkaSubscribe(ProtocolConst.SubscriberAFN0CHReceivedEventNameTemp)] - public async Task ReceivedAFN0CEvent(MessageProtocolAnalysis receivedMessage) - { - var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); - if (protocolPlugin == null) - { - _logger.LogError("协议不存在!"); - } - else - { - if (receivedMessage.Data == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; - //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); + ///// + ///// 解析AFN0CH + ///// + ///// + ///// + //[KafkaSubscribe(ProtocolConst.SubscriberAFN0CHReceivedEventNameTemp)] + //public async Task ReceivedAFN0CEvent(MessageProtocolAnalysis receivedMessage) + //{ + // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); + // if (protocolPlugin == null) + // { + // _logger.LogError("协议不存在!"); + // } + // else + // { + // if (receivedMessage.Data == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; + // //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); - //var data = await analysisStrategy.ExecuteAsync>(tB3761); - var executor = _serviceProvider.GetRequiredService(); - await executor.ExecuteAsync(serverName, receivedMessage.Data); - return SubscribeAck.Success(); - } + // //var data = await analysisStrategy.ExecuteAsync>(tB3761); + // var executor = _serviceProvider.GetRequiredService(); + // await executor.ExecuteAsync(serverName, receivedMessage.Data); + // return SubscribeAck.Success(); + // } - return SubscribeAck.Fail(); - } + // return SubscribeAck.Fail(); + //} - /// - /// 解析AFN0DH - /// - /// - /// - [KafkaSubscribe(ProtocolConst.SubscriberAFN0DHReceivedEventNameTemp)] - public async Task ReceivedAFN0DEvent(MessageProtocolAnalysis receivedMessage) - { - var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); - if (protocolPlugin == null) - { - _logger.LogError("协议不存在!"); - } - else - { - if (receivedMessage.Data == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; - //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); + ///// + ///// 解析AFN0DH + ///// + ///// + ///// + //[KafkaSubscribe(ProtocolConst.SubscriberAFN0DHReceivedEventNameTemp)] + //public async Task ReceivedAFN0DEvent(MessageProtocolAnalysis receivedMessage) + //{ + // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); + // if (protocolPlugin == null) + // { + // _logger.LogError("协议不存在!"); + // } + // else + // { + // if (receivedMessage.Data == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; + // //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); - //var data = await analysisStrategy.ExecuteAsync>(tB3761); - var executor = _serviceProvider.GetRequiredService(); - await executor.ExecuteAsync(serverName, receivedMessage.Data); - return SubscribeAck.Success(); - } + // //var data = await analysisStrategy.ExecuteAsync>(tB3761); + // var executor = _serviceProvider.GetRequiredService(); + // await executor.ExecuteAsync(serverName, receivedMessage.Data); + // return SubscribeAck.Success(); + // } - return SubscribeAck.Fail(); - } + // return SubscribeAck.Fail(); + //} - /// - /// 解析AFN0EH - /// - /// - /// - [KafkaSubscribe(ProtocolConst.SubscriberAFN0EHReceivedEventNameTemp)] - public async Task ReceivedAFN0EEvent(MessageProtocolAnalysis receivedMessage) - { - var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); - if (protocolPlugin == null) - { - _logger.LogError("协议不存在!"); - } - else - { - if (receivedMessage.Data == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; - //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); + ///// + ///// 解析AFN0EH + ///// + ///// + ///// + //[KafkaSubscribe(ProtocolConst.SubscriberAFN0EHReceivedEventNameTemp)] + //public async Task ReceivedAFN0EEvent(MessageProtocolAnalysis receivedMessage) + //{ + // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); + // if (protocolPlugin == null) + // { + // _logger.LogError("协议不存在!"); + // } + // else + // { + // if (receivedMessage.Data == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) + // { + // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + // return SubscribeAck.Success(); + // } + // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; + // //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); - //var data = await analysisStrategy.ExecuteAsync>(tB3761); - var executor = _serviceProvider.GetRequiredService(); - await executor.ExecuteAsync(serverName, receivedMessage.Data); - return SubscribeAck.Success(); - } + // //var data = await analysisStrategy.ExecuteAsync>(tB3761); + // var executor = _serviceProvider.GetRequiredService(); + // await executor.ExecuteAsync(serverName, receivedMessage.Data); + // return SubscribeAck.Success(); + // } - return SubscribeAck.Fail(); - } + // return SubscribeAck.Fail(); + //} /// diff --git a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs index bd754bb..ee5f646 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs @@ -163,7 +163,7 @@ namespace JiShe.CollectBus.Subscribers // 增加·发送次数和重试开始时间 receivedMessage.SendNum += 1; receivedMessage.NextSendTime = DateTime.Now.AddHours(1); - await _dbProvider.InsertAsync(receivedMessage); + await _dbProvider.GetSessionPool(true).InsertAsync(receivedMessage); // TODO: 第4次的时候会推送到地方预警处理 } // 由于有3次重试机会,故每次消息都会被确认 @@ -254,7 +254,7 @@ namespace JiShe.CollectBus.Subscribers // 增加·发送次数和重试开始时间 receivedMessage.SendNum += 1; receivedMessage.NextSendTime = DateTime.Now.AddHours(1); - await _dbProvider.InsertAsync(receivedMessage); + await _dbProvider.GetSessionPool(true).InsertAsync(receivedMessage); return SubscribeAck.Success(); } diff --git a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs index 7d3fc4a..e6cbafd 100644 --- a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs +++ b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs @@ -835,6 +835,11 @@ namespace JiShe.CollectBus.Common.Helpers /// public static bool JudgeIsGenerate_Day(string eachDayWithout, DateTime curTime) { + if (string.IsNullOrWhiteSpace(eachDayWithout)) + { + return false; + } + var weekName = strWeeks[(int)curTime.DayOfWeek]; var arr = eachDayWithout.Split(','); return !arr.Contains(weekName); diff --git a/web/JiShe.CollectBus.Host/appsettings.json b/web/JiShe.CollectBus.Host/appsettings.json index d7ebf50..64893f2 100644 --- a/web/JiShe.CollectBus.Host/appsettings.json +++ b/web/JiShe.CollectBus.Host/appsettings.json @@ -88,7 +88,7 @@ "ClusterList": [ "192.168.5.9:6667" ], "PoolSize": 32, "DataBaseName": "energy", - "OpenDebugMode": false, + "OpenDebugMode": true, "UseTableSessionPoolByDefault": false }, "Cassandra": {