优化IoTDB数据驱动

This commit is contained in:
ChenYi 2025-05-18 16:04:23 +08:00
parent 17df6af4f0
commit 921973e5d4
10 changed files with 488 additions and 501 deletions

View File

@ -37,17 +37,18 @@ namespace JiShe.CollectBus.IoTDB.Provider
private static readonly ConcurrentDictionary<Type, DeviceMetadata> MetadataCache = new(); private static readonly ConcurrentDictionary<Type, DeviceMetadata> MetadataCache = new();
private readonly ILogger<IoTDbProvider> _logger; private readonly ILogger<IoTDbProvider> _logger;
private readonly IIoTDbSessionFactory _sessionFactory; private readonly IIoTDbSessionFactory _sessionFactory;
private readonly IoTDBRuntimeContext _runtimeContext;
private IIoTDbSessionPool CurrentSession => /// <summary>
_sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool); /// 存储模型切换标识是否使用table模型存储, 默认为false标识tree模型存储
/// </summary>
public bool UseTableSessionPool { get; set; }
private IIoTDbSessionPool CurrentSession { get; set; }
//private IIoTDbSessionPool CurrentSession { get; set; } public IIoTDbProvider GetSessionPool(bool useTableSessionPool)
public IIoTDbProvider GetSessionPool(bool sessionpolType)
{ {
//CurrentSession = _sessionFactory.GetSessionPool(sessionpolType); CurrentSession = _sessionFactory.GetSessionPool(useTableSessionPool);
UseTableSessionPool = useTableSessionPool;
return this; return this;
} }
@ -60,12 +61,10 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <param name="runtimeContext"></param> /// <param name="runtimeContext"></param>
public IoTDbProvider( public IoTDbProvider(
ILogger<IoTDbProvider> logger, ILogger<IoTDbProvider> logger,
IIoTDbSessionFactory sessionFactory, IIoTDbSessionFactory sessionFactory)
IoTDBRuntimeContext runtimeContext)
{ {
_logger = logger; _logger = logger;
_sessionFactory = sessionFactory; _sessionFactory = sessionFactory;
_runtimeContext = runtimeContext;
} }
@ -87,6 +86,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
_logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null"); _logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null");
return; return;
} }
_logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 路径为 {tablet.First().InsertTargetName}");
await CurrentSession.InsertAsync(tablet.First()); await CurrentSession.InsertAsync(tablet.First());
} }
@ -124,8 +124,11 @@ namespace JiShe.CollectBus.IoTDB.Provider
_logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null"); _logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null");
return; return;
} }
foreach (var item in tablet) foreach (var item in tablet)
{ {
_logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 路径为 {item.InsertTargetName}");
await CurrentSession.InsertAsync(item); await CurrentSession.InsertAsync(item);
} }
} }
@ -347,16 +350,16 @@ namespace JiShe.CollectBus.IoTDB.Provider
throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 不属于IoTDB数据模型实体属于异常情况-102"); throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 不属于IoTDB数据模型实体属于异常情况-102");
} }
if (metadata.EntityType == EntityTypeEnum.TreeModel && _runtimeContext.UseTableSessionPool == true) if (metadata.EntityType == EntityTypeEnum.TreeModel && UseTableSessionPool == true)
{ {
throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 tree模型不能使用table模型Session连接属于异常情况-103"); throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 tree模型不能使用table模型Session连接属于异常情况-103");
} }
else if (metadata.EntityType == EntityTypeEnum.TableModel && _runtimeContext.UseTableSessionPool == false) else if (metadata.EntityType == EntityTypeEnum.TableModel && UseTableSessionPool == false)
{ {
throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接属于异常情况-104"); throw new Exception($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接属于异常情况-104");
} }
string tableNameOrTreePath = string.Empty; string tableNameOrTreePath = string.Empty;
if (_runtimeContext.UseTableSessionPool)//表模型 if ( UseTableSessionPool)//表模型
{ {
//如果指定了路径 //如果指定了路径
if (!string.IsNullOrWhiteSpace(metadata.TableNameOrTreePath)) if (!string.IsNullOrWhiteSpace(metadata.TableNameOrTreePath))
@ -417,7 +420,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
} }
} }
return _runtimeContext.UseTableSessionPool return UseTableSessionPool
? BuildTableSessionTablet(metadata, tableNameOrTreePath, tempColumnNames, values.Select(d => d.ToList()).ToList(), timestamps.ToList()) ? BuildTableSessionTablet(metadata, tableNameOrTreePath, tempColumnNames, values.Select(d => d.ToList()).ToList(), timestamps.ToList())
: BuildSessionTablet(metadata, tableNameOrTreePath, tempColumnNames, values.Select(d => d.ToList()).ToList(), timestamps.ToList()); : BuildSessionTablet(metadata, tableNameOrTreePath, tempColumnNames, values.Select(d => d.ToList()).ToList(), timestamps.ToList());
} }
@ -502,7 +505,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
var metadata = await GetMetadata<T>(); var metadata = await GetMetadata<T>();
var sb = new StringBuilder(); var sb = new StringBuilder();
if (!_runtimeContext.UseTableSessionPool) if (!UseTableSessionPool)
{ {
sb.Append("DELETE "); sb.Append("DELETE ");
} }

View File

@ -38,18 +38,16 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
private readonly IGuidGenerator _guidGenerator; private readonly IGuidGenerator _guidGenerator;
private readonly IIoTDbProvider _dbProvider; private readonly IIoTDbProvider _dbProvider;
private readonly ServerApplicationOptions _applicationOptions; private readonly ServerApplicationOptions _applicationOptions;
private readonly IoTDBRuntimeContext _runtimeContext;
private readonly ILogger<DataStorage> _logger; private readonly ILogger<DataStorage> _logger;
private readonly IMemoryCache _imemoryCache; private readonly IMemoryCache _imemoryCache;
private readonly IFreeRedisProvider _freeRedisProvider; private readonly IFreeRedisProvider _freeRedisProvider;
private RedisClient Instance { get; set; } private RedisClient Instance { get; set; }
public DataStorage(IIoTDbProvider dbProvider, IOptions<ServerApplicationOptions> applicationOptions, public DataStorage(IIoTDbProvider dbProvider, IOptions<ServerApplicationOptions> applicationOptions,
IGuidGenerator guidGenerator, IoTDBRuntimeContext runtimeContext, ILogger<DataStorage> logger, IMemoryCache memoryCache, IFreeRedisProvider freeRedisProvider) IGuidGenerator guidGenerator, ILogger<DataStorage> logger, IMemoryCache memoryCache, IFreeRedisProvider freeRedisProvider)
{ {
_dbProvider= dbProvider; _dbProvider= dbProvider;
_applicationOptions = applicationOptions.Value; _applicationOptions = applicationOptions.Value;
_guidGenerator= guidGenerator; _guidGenerator= guidGenerator;
_runtimeContext= runtimeContext;
_logger= logger; _logger= logger;
_imemoryCache = memoryCache; _imemoryCache = memoryCache;
_freeRedisProvider = freeRedisProvider; _freeRedisProvider = freeRedisProvider;
@ -159,8 +157,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
Timestamps = data.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(), Timestamps = data.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(),
SingleMeasuring = (data.FiledName ?? string.Empty, data.DataValue ?? default) SingleMeasuring = (data.FiledName ?? string.Empty, data.DataValue ?? default)
}; };
_runtimeContext.UseTableSessionPool = true; // 使用表模型池
var taskSendInfo = await _dbProvider.QueryAsync<MeterReadingTelemetryPacketInfo>(new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(), Conditions = conditions, PageIndex = 0, PageSize = 1 }); var taskSendInfo = await _dbProvider.GetSessionPool(true).QueryAsync<MeterReadingTelemetryPacketInfo>(new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(), Conditions = conditions, PageIndex = 0, PageSize = 1 });
var taskData = taskSendInfo?.Items.FirstOrDefault(); var taskData = taskSendInfo?.Items.FirstOrDefault();
if (taskData != null) if (taskData != null)
{ {
@ -205,13 +203,11 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
ReceivedTime = analysisBaseDto.ReceivedTime, ReceivedTime = analysisBaseDto.ReceivedTime,
}; };
} }
_runtimeContext.UseTableSessionPool = true; // 使树模型池 await _dbProvider.GetSessionPool(true).InsertAsync(taskData);
await _dbProvider.InsertAsync(taskData);
//如果无字段名,则不保存数据 //如果无字段名,则不保存数据
if (!string.IsNullOrWhiteSpace(data.FiledName)) if (!string.IsNullOrWhiteSpace(data.FiledName))
{ {
_runtimeContext.UseTableSessionPool = false; // 使树模型池 await _dbProvider.GetSessionPool(false).InsertAsync(meter);
await _dbProvider.InsertAsync(meter);
} }
return await Task.FromResult(true); return await Task.FromResult(true);
} }
@ -268,8 +264,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
Timestamps = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(), // TODO:这里暂时格式化15分钟数据需要进行调整 Timestamps = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(), // TODO:这里暂时格式化15分钟数据需要进行调整
SingleMeasuring =(item.FiledName ?? string.Empty, item.DataValue ?? default) SingleMeasuring =(item.FiledName ?? string.Empty, item.DataValue ?? default)
}; };
_runtimeContext.UseTableSessionPool = true; // 使用表模型池 var taskSendInfo = await _dbProvider.GetSessionPool(true).QueryAsync<MeterReadingTelemetryPacketInfo>(new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(), Conditions = conditions, PageIndex = 0, PageSize = 1 });
var taskSendInfo = await _dbProvider.QueryAsync<MeterReadingTelemetryPacketInfo>(new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(), Conditions = conditions, PageIndex = 0, PageSize = 1 });
var taskData = taskSendInfo?.Items.FirstOrDefault(); var taskData = taskSendInfo?.Items.FirstOrDefault();
if (taskData != null) if (taskData != null)
{ {
@ -322,12 +317,10 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
} }
} }
// 批量保存数据 // 批量保存数据
_runtimeContext.UseTableSessionPool = true; // 使树模型池 await _dbProvider.GetSessionPool(true).BatchInsertAsync(meterReadingTelemetryPacketInfos);
await _dbProvider.BatchInsertAsync(meterReadingTelemetryPacketInfos);
if (treeModelSingleMeasuringEntities.Count > 0) if (treeModelSingleMeasuringEntities.Count > 0)
{ {
_runtimeContext.UseTableSessionPool = false; // 使树模型池 await _dbProvider.GetSessionPool(false).BatchInsertAsync(treeModelSingleMeasuringEntities);
await _dbProvider.BatchInsertAsync(treeModelSingleMeasuringEntities);
} }
return await Task.FromResult(true); return await Task.FromResult(true);
} }
@ -360,8 +353,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
Timestamps = timestamps, Timestamps = timestamps,
SingleMeasuring = (data.FiledName!, data.DataValue!) SingleMeasuring = (data.FiledName!, data.DataValue!)
}; };
_runtimeContext.UseTableSessionPool = false; // 使树模型池 await _dbProvider.GetSessionPool(false).InsertAsync(treeData);
await _dbProvider.InsertAsync(treeData);
// 数据帧 // 数据帧
var treeFrameData = new TreeModelSingleMeasuringEntity<string>() var treeFrameData = new TreeModelSingleMeasuringEntity<string>()
{ {
@ -374,8 +366,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty) SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
}; };
_runtimeContext.UseTableSessionPool = false; // 使树模型池 await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData);
await _dbProvider.InsertAsync(treeFrameData);
// 时间 // 时间
var treeRecordingTimeData = new TreeModelSingleMeasuringEntity<DateTime>() var treeRecordingTimeData = new TreeModelSingleMeasuringEntity<DateTime>()
@ -388,8 +379,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
DataType = IOTDBDataTypeConst.Status, DataType = IOTDBDataTypeConst.Status,
SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now) SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now)
}; };
_runtimeContext.UseTableSessionPool = false; // 使树模型池 await _dbProvider.GetSessionPool(true).InsertAsync(treeRecordingTimeData);
await _dbProvider.InsertAsync(treeRecordingTimeData);
// 新建 // 新建
string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ); string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
@ -424,8 +414,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
ScoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun(), ScoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun(),
ReceivedTime=analysisBaseDto.ReceivedTime, ReceivedTime=analysisBaseDto.ReceivedTime,
}; };
_runtimeContext.UseTableSessionPool = true; // 使表模型池 await _dbProvider.GetSessionPool(true).InsertAsync(taskData);
await _dbProvider.InsertAsync(taskData);
return await Task.FromResult(true); return await Task.FromResult(true);
} }
@ -461,8 +450,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
Timestamps = timestamps, Timestamps = timestamps,
SingleMeasuring = (item.FiledName!, item.DataValue!) SingleMeasuring = (item.FiledName!, item.DataValue!)
}; };
_runtimeContext.UseTableSessionPool = false; // 使树模型池 await _dbProvider.GetSessionPool(false).InsertAsync(treeData);
await _dbProvider.InsertAsync(treeData);
// 数据帧 // 数据帧
var treeFrameData = new TreeModelSingleMeasuringEntity<string>() var treeFrameData = new TreeModelSingleMeasuringEntity<string>()
{ {
@ -474,8 +462,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty) SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
}; };
_runtimeContext.UseTableSessionPool = false; // 使树模型池 await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData);
await _dbProvider.InsertAsync(treeFrameData);
// 时间 // 时间
var treeRecordingTimeData = new TreeModelSingleMeasuringEntity<DateTime>() var treeRecordingTimeData = new TreeModelSingleMeasuringEntity<DateTime>()
@ -487,8 +474,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
Timestamps = timestamps, Timestamps = timestamps,
SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, item.TimeSpan.HasValue ? item.TimeSpan.Value : DateTime.Now) SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, item.TimeSpan.HasValue ? item.TimeSpan.Value : DateTime.Now)
}; };
_runtimeContext.UseTableSessionPool = false; // 使树模型池 await _dbProvider.GetSessionPool(false).InsertAsync(treeRecordingTimeData);
await _dbProvider.InsertAsync(treeRecordingTimeData);
// 新建 // 新建
string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ); string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
@ -528,8 +514,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
} }
if (meterReadingTelemetryPacketInfos.Count > 0) if (meterReadingTelemetryPacketInfos.Count > 0)
{ {
_runtimeContext.UseTableSessionPool = true; // 使表模型池 await _dbProvider.GetSessionPool(true).BatchInsertAsync(meterReadingTelemetryPacketInfos);
await _dbProvider.BatchInsertAsync(meterReadingTelemetryPacketInfos);
} }
return await Task.FromResult(true); return await Task.FromResult(true);
} }

View File

@ -36,13 +36,11 @@ namespace JiShe.CollectBus.DataChannels
private readonly IProducerService _producerService; private readonly IProducerService _producerService;
private readonly KafkaOptionConfig _kafkaOptions; private readonly KafkaOptionConfig _kafkaOptions;
private readonly ServerApplicationOptions _applicationOptions; private readonly ServerApplicationOptions _applicationOptions;
private readonly IoTDBRuntimeContext _runtimeContext;
private readonly ILogRecordRepository _logRecordRepository; private readonly ILogRecordRepository _logRecordRepository;
public DataChannelManageService( public DataChannelManageService(
ILogger<DataChannelManageService> logger, ILogger<DataChannelManageService> logger,
IIoTDbProvider dbProvider, IIoTDbProvider dbProvider,
IoTDBRuntimeContext runtimeContext,
IProducerService producerService, IProducerService producerService,
IOptions<KafkaOptionConfig> kafkaOptions, IOptions<KafkaOptionConfig> kafkaOptions,
IOptions<ServerApplicationOptions> applicationOptions, IOptions<ServerApplicationOptions> applicationOptions,
@ -50,11 +48,9 @@ namespace JiShe.CollectBus.DataChannels
{ {
_logger = logger; _logger = logger;
_dbProvider = dbProvider; _dbProvider = dbProvider;
_runtimeContext = runtimeContext;
_producerService = producerService; _producerService = producerService;
_kafkaOptions = kafkaOptions.Value; _kafkaOptions = kafkaOptions.Value;
_applicationOptions = applicationOptions.Value; _applicationOptions = applicationOptions.Value;
_runtimeContext.UseTableSessionPool = true;
_logRecordRepository= logRecordRepository; _logRecordRepository= logRecordRepository;
} }
@ -74,10 +70,11 @@ namespace JiShe.CollectBus.DataChannels
{ {
const int BatchSize = 50000; const int BatchSize = 50000;
const int EmptyWaitMilliseconds = 50; const int EmptyWaitMilliseconds = 50;
var timeout = TimeSpan.FromSeconds(5); var timeout = TimeSpan.FromMilliseconds(50);
var timer = Stopwatch.StartNew(); var timer = Stopwatch.StartNew();
long timeoutMilliseconds = 0; long timeoutMilliseconds = 0;
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>(); var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
var timeoutStopwatch = Stopwatch.StartNew();
try try
{ {
@ -98,12 +95,12 @@ namespace JiShe.CollectBus.DataChannels
} }
timer.Restart(); timer.Restart();
var startTime = DateTime.Now; timeoutStopwatch.Restart();
try try
{ {
// 异步批量读取数据 // 异步批量读取数据
while (batch != null && batch.Count < BatchSize && (DateTime.Now - startTime) < timeout) while (batch != null && batch.Count < BatchSize && timeoutStopwatch.Elapsed <= timeout)
{ {
try try
{ {
@ -123,6 +120,7 @@ namespace JiShe.CollectBus.DataChannels
throw; throw;
} }
if (batch.Count == 0) if (batch.Count == 0)
{ {
await Task.Delay(EmptyWaitMilliseconds); await Task.Delay(EmptyWaitMilliseconds);
@ -147,7 +145,7 @@ namespace JiShe.CollectBus.DataChannels
try try
{ {
// 批量写入数据库 // 批量写入数据库
await _dbProvider.BatchInsertAsync(metadata, records); await _dbProvider.GetSessionPool(true).BatchInsertAsync(metadata, records);
// 限流推送Kafka // 限流推送Kafka
await DeviceGroupBalanceControl.ProcessWithThrottleAsync( await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
@ -166,9 +164,8 @@ namespace JiShe.CollectBus.DataChannels
batch.Clear(); batch.Clear();
timer.Stop(); timer.Stop();
timeoutStopwatch.Stop();
timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds; timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds;
startTime = DateTime.Now;
} }
} }
catch (Exception ex) catch (Exception ex)

View File

@ -91,7 +91,6 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
//TableModelSingleMeasuringEntityExtension //TableModelSingleMeasuringEntityExtension
//TableModelSingleMeasuringEntityAccessor.GetSystemName(meter); //TableModelSingleMeasuringEntityAccessor.GetSystemName(meter);
//ElectricityMeterAccessor //ElectricityMeterAccessor
await _iotDBProvider.GetSessionPool(true).InsertAsync(meter);
await _iotDBProvider.InsertAsync(meter); await _iotDBProvider.InsertAsync(meter);
} }
@ -115,9 +114,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
}; };
await _iotDBProvider.InsertAsync(meter2); await _iotDBProvider.GetSessionPool(false).InsertAsync(meter2);
_dbContext.UseTableSessionPool = true;
ElectricityMeter meter = new ElectricityMeter() ElectricityMeter meter = new ElectricityMeter()
{ {
@ -131,7 +128,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
CurrentdDateTime = DateTime.Now, CurrentdDateTime = DateTime.Now,
Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
}; };
await _iotDBProvider.InsertAsync(meter); await _iotDBProvider.GetSessionPool(true).InsertAsync(meter);
QueryCondition conditions = new QueryCondition() QueryCondition conditions = new QueryCondition()
{ {
@ -149,7 +146,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
Conditions = new List<QueryCondition>() { conditions }, Conditions = new List<QueryCondition>() { conditions },
}; };
var pageResult = await _iotDBProvider.QueryAsync<ElectricityMeter>(query); var pageResult = await _iotDBProvider.GetSessionPool(true).QueryAsync<ElectricityMeter>(query);
} }
@ -177,8 +174,6 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
await _iotDBProvider.InsertAsync(meter2); await _iotDBProvider.InsertAsync(meter2);
_dbContext.UseTableSessionPool = true;
ElectricityMeter meter3 = new ElectricityMeter() ElectricityMeter meter3 = new ElectricityMeter()
{ {

View File

@ -459,6 +459,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
} }
ammeter.ItemCodes = "10_97";
if (!keyValuePairs.ContainsKey(ammeter.FocusAddress)) if (!keyValuePairs.ContainsKey(ammeter.FocusAddress))
{ {
keyValuePairs[ammeter.FocusAddress] = new List<DeviceInfo>() { ammeter.Adapt<DeviceInfo>() }; keyValuePairs[ammeter.FocusAddress] = new List<DeviceInfo>() { ammeter.Adapt<DeviceInfo>() };

View File

@ -399,7 +399,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
TripTime = $"{DateTime.Now:HH:mm}", TripTime = $"{DateTime.Now:HH:mm}",
MeterId = 78973, MeterId = 78973,
LoopType = "EachDay", LoopType = "EachDay",
EachDayWithout = "周六,周日", EachDayWithout = "周六",
TimeDensity = 15, TimeDensity = 15,
}); });

View File

@ -42,458 +42,458 @@ namespace JiShe.CollectBus.Subscribers
_protocolService = protocolService; _protocolService = protocolService;
} }
/// <summary> ///// <summary>
/// 解析AFN00H ///// 解析AFN00H
/// </summary> ///// </summary>
/// <param name="receivedMessage"></param> ///// <param name="receivedMessage"></param>
/// <returns></returns> ///// <returns></returns>
[KafkaSubscribe(ProtocolConst.SubscriberAFN00HReceivedEventNameTemp)] //[KafkaSubscribe(ProtocolConst.SubscriberAFN00HReceivedEventNameTemp)]
public async Task<ISubscribeAck> ReceivedAFN00Event(MessageProtocolAnalysis<TB3761> receivedMessage) //public async Task<ISubscribeAck> ReceivedAFN00Event(MessageProtocolAnalysis<TB3761> receivedMessage)
{ //{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null) // if (protocolPlugin == null)
{ // {
_logger.LogError("协议不存在!"); // _logger.LogError("协议不存在!");
} // }
else // else
{ // {
if (receivedMessage.Data==null) // if (receivedMessage.Data==null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis"); // //var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761); // //var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>(); // var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data); // await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
} // }
return SubscribeAck.Fail(); // return SubscribeAck.Fail();
} //}
/// <summary> ///// <summary>
/// 解析AFN01H ///// 解析AFN01H
/// </summary> ///// </summary>
/// <param name="receivedMessage"></param> ///// <param name="receivedMessage"></param>
/// <returns></returns> ///// <returns></returns>
[KafkaSubscribe(ProtocolConst.SubscriberAFN01HReceivedEventNameTemp)] //[KafkaSubscribe(ProtocolConst.SubscriberAFN01HReceivedEventNameTemp)]
public async Task<ISubscribeAck> ReceivedAFN01Event(MessageProtocolAnalysis<TB3761> receivedMessage) //public async Task<ISubscribeAck> ReceivedAFN01Event(MessageProtocolAnalysis<TB3761> receivedMessage)
{ //{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null) // if (protocolPlugin == null)
{ // {
_logger.LogError("协议不存在!"); // _logger.LogError("协议不存在!");
} // }
else // else
{ // {
if (receivedMessage.Data == null) // if (receivedMessage.Data == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis"); // //var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761); // //var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>(); // var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data); // await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
} // }
return SubscribeAck.Fail(); // return SubscribeAck.Fail();
} //}
/// <summary> ///// <summary>
/// 解析AFN02H ///// 解析AFN02H
/// </summary> ///// </summary>
/// <param name="receivedMessage"></param> ///// <param name="receivedMessage"></param>
/// <returns></returns> ///// <returns></returns>
[KafkaSubscribe(ProtocolConst.SubscriberAFN02HReceivedEventNameTemp)] //[KafkaSubscribe(ProtocolConst.SubscriberAFN02HReceivedEventNameTemp)]
public async Task<ISubscribeAck> ReceivedAFN02Event(MessageProtocolAnalysis<TB3761> receivedMessage) //public async Task<ISubscribeAck> ReceivedAFN02Event(MessageProtocolAnalysis<TB3761> receivedMessage)
{ //{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null) // if (protocolPlugin == null)
{ // {
_logger.LogError("协议不存在!"); // _logger.LogError("协议不存在!");
} // }
else // else
{ // {
if (receivedMessage.Data == null) // if (receivedMessage.Data == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis"); // //var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761); // //var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>(); // var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data, (result) => // await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data, (result) =>
{ // {
var ssss = (UnitDataAnalysis<AnalysisBaseDto<string>>)result; // var ssss = (UnitDataAnalysis<AnalysisBaseDto<string>>)result;
_logger.LogInformation($"解析AFN02H数据{ssss.Serialize()}"); // _logger.LogInformation($"解析AFN02H数据{ssss.Serialize()}");
}); // });
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
return SubscribeAck.Fail(); // return SubscribeAck.Fail();
} //}
/// <summary> ///// <summary>
/// 解析AFN03H ///// 解析AFN03H
/// </summary> ///// </summary>
/// <param name="receivedMessage"></param> ///// <param name="receivedMessage"></param>
/// <returns></returns> ///// <returns></returns>
[KafkaSubscribe(ProtocolConst.SubscriberAFN03HReceivedEventNameTemp)] //[KafkaSubscribe(ProtocolConst.SubscriberAFN03HReceivedEventNameTemp)]
public async Task<ISubscribeAck> ReceivedAFN03Event(MessageProtocolAnalysis<TB3761> receivedMessage) //public async Task<ISubscribeAck> ReceivedAFN03Event(MessageProtocolAnalysis<TB3761> receivedMessage)
{ //{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null) // if (protocolPlugin == null)
{ // {
_logger.LogError("协议不存在!"); // _logger.LogError("协议不存在!");
} // }
else // else
{ // {
if (receivedMessage.Data == null) // if (receivedMessage.Data == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis"); // //var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761); // //var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>(); // var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data); // await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
return SubscribeAck.Fail(); // return SubscribeAck.Fail();
} //}
/// <summary> ///// <summary>
/// 解析AFN04H ///// 解析AFN04H
/// </summary> ///// </summary>
/// <param name="receivedMessage"></param> ///// <param name="receivedMessage"></param>
/// <returns></returns> ///// <returns></returns>
[KafkaSubscribe(ProtocolConst.SubscriberAFN04HReceivedEventNameTemp)] //[KafkaSubscribe(ProtocolConst.SubscriberAFN04HReceivedEventNameTemp)]
public async Task<ISubscribeAck> ReceivedAFN04Event(MessageProtocolAnalysis<TB3761> receivedMessage) //public async Task<ISubscribeAck> ReceivedAFN04Event(MessageProtocolAnalysis<TB3761> receivedMessage)
{ //{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null) // if (protocolPlugin == null)
{ // {
_logger.LogError("协议不存在!"); // _logger.LogError("协议不存在!");
} // }
else // else
{ // {
if (receivedMessage.Data == null) // if (receivedMessage.Data == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis"); // //var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761); // //var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>(); // var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data); // await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
return SubscribeAck.Fail(); // return SubscribeAck.Fail();
} //}
/// <summary> ///// <summary>
/// 解析AFN05H ///// 解析AFN05H
/// </summary> ///// </summary>
/// <param name="receivedMessage"></param> ///// <param name="receivedMessage"></param>
/// <returns></returns> ///// <returns></returns>
[KafkaSubscribe(ProtocolConst.SubscriberAFN05HReceivedEventNameTemp)] //[KafkaSubscribe(ProtocolConst.SubscriberAFN05HReceivedEventNameTemp)]
public async Task<ISubscribeAck> ReceivedAFN05Event(MessageProtocolAnalysis<TB3761> receivedMessage) //public async Task<ISubscribeAck> ReceivedAFN05Event(MessageProtocolAnalysis<TB3761> receivedMessage)
{ //{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null) // if (protocolPlugin == null)
{ // {
_logger.LogError("协议不存在!"); // _logger.LogError("协议不存在!");
} // }
else // else
{ // {
if (receivedMessage.Data == null) // if (receivedMessage.Data == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis"); // //var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761); // //var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>(); // var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data); // await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
return SubscribeAck.Fail(); // return SubscribeAck.Fail();
} //}
/// <summary> ///// <summary>
/// 解析AFN09H ///// 解析AFN09H
/// </summary> ///// </summary>
/// <param name="receivedMessage"></param> ///// <param name="receivedMessage"></param>
/// <returns></returns> ///// <returns></returns>
[KafkaSubscribe(ProtocolConst.SubscriberAFN09HReceivedEventNameTemp)] //[KafkaSubscribe(ProtocolConst.SubscriberAFN09HReceivedEventNameTemp)]
public async Task<ISubscribeAck> ReceivedAFN09Event(MessageProtocolAnalysis<TB3761> receivedMessage) //public async Task<ISubscribeAck> ReceivedAFN09Event(MessageProtocolAnalysis<TB3761> receivedMessage)
{ //{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null) // if (protocolPlugin == null)
{ // {
_logger.LogError("协议不存在!"); // _logger.LogError("协议不存在!");
} // }
else // else
{ // {
if (receivedMessage.Data == null) // if (receivedMessage.Data == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis"); // //var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761); // //var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>(); // var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data); // await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
return SubscribeAck.Fail(); // return SubscribeAck.Fail();
} //}
/// <summary> ///// <summary>
/// 解析AFN0AH ///// 解析AFN0AH
/// </summary> ///// </summary>
/// <param name="receivedMessage"></param> ///// <param name="receivedMessage"></param>
/// <returns></returns> ///// <returns></returns>
[KafkaSubscribe(ProtocolConst.SubscriberAFN0AHReceivedEventNameTemp)] //[KafkaSubscribe(ProtocolConst.SubscriberAFN0AHReceivedEventNameTemp)]
public async Task<ISubscribeAck> ReceivedAFN0AEvent(MessageProtocolAnalysis<TB3761> receivedMessage) //public async Task<ISubscribeAck> ReceivedAFN0AEvent(MessageProtocolAnalysis<TB3761> receivedMessage)
{ //{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null) // if (protocolPlugin == null)
{ // {
_logger.LogError("协议不存在!"); // _logger.LogError("协议不存在!");
} // }
else // else
{ // {
if (receivedMessage.Data == null) // if (receivedMessage.Data == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis"); // //var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761); // //var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>(); // var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data); // await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
return SubscribeAck.Fail(); // return SubscribeAck.Fail();
} //}
/// <summary> ///// <summary>
/// 解析AFN0BH ///// 解析AFN0BH
/// </summary> ///// </summary>
/// <param name="receivedMessage"></param> ///// <param name="receivedMessage"></param>
/// <returns></returns> ///// <returns></returns>
[KafkaSubscribe(ProtocolConst.SubscriberAFN0BHReceivedEventNameTemp)] //[KafkaSubscribe(ProtocolConst.SubscriberAFN0BHReceivedEventNameTemp)]
public async Task<ISubscribeAck> ReceivedAFN0BEvent(MessageProtocolAnalysis<TB3761> receivedMessage) //public async Task<ISubscribeAck> ReceivedAFN0BEvent(MessageProtocolAnalysis<TB3761> receivedMessage)
{ //{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null) // if (protocolPlugin == null)
{ // {
_logger.LogError("协议不存在!"); // _logger.LogError("协议不存在!");
} // }
else // else
{ // {
if (receivedMessage.Data == null) // if (receivedMessage.Data == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis"); // //var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761); // //var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>(); // var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data); // await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
return SubscribeAck.Fail(); // return SubscribeAck.Fail();
} //}
/// <summary> ///// <summary>
/// 解析AFN0CH ///// 解析AFN0CH
/// </summary> ///// </summary>
/// <param name="receivedMessage"></param> ///// <param name="receivedMessage"></param>
/// <returns></returns> ///// <returns></returns>
[KafkaSubscribe(ProtocolConst.SubscriberAFN0CHReceivedEventNameTemp)] //[KafkaSubscribe(ProtocolConst.SubscriberAFN0CHReceivedEventNameTemp)]
public async Task<ISubscribeAck> ReceivedAFN0CEvent(MessageProtocolAnalysis<TB3761> receivedMessage) //public async Task<ISubscribeAck> ReceivedAFN0CEvent(MessageProtocolAnalysis<TB3761> receivedMessage)
{ //{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null) // if (protocolPlugin == null)
{ // {
_logger.LogError("协议不存在!"); // _logger.LogError("协议不存在!");
} // }
else // else
{ // {
if (receivedMessage.Data == null) // if (receivedMessage.Data == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis"); // //var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761); // //var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>(); // var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data); // await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
return SubscribeAck.Fail(); // return SubscribeAck.Fail();
} //}
/// <summary> ///// <summary>
/// 解析AFN0DH ///// 解析AFN0DH
/// </summary> ///// </summary>
/// <param name="receivedMessage"></param> ///// <param name="receivedMessage"></param>
/// <returns></returns> ///// <returns></returns>
[KafkaSubscribe(ProtocolConst.SubscriberAFN0DHReceivedEventNameTemp)] //[KafkaSubscribe(ProtocolConst.SubscriberAFN0DHReceivedEventNameTemp)]
public async Task<ISubscribeAck> ReceivedAFN0DEvent(MessageProtocolAnalysis<TB3761> receivedMessage) //public async Task<ISubscribeAck> ReceivedAFN0DEvent(MessageProtocolAnalysis<TB3761> receivedMessage)
{ //{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null) // if (protocolPlugin == null)
{ // {
_logger.LogError("协议不存在!"); // _logger.LogError("协议不存在!");
} // }
else // else
{ // {
if (receivedMessage.Data == null) // if (receivedMessage.Data == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis"); // //var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761); // //var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>(); // var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data); // await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
return SubscribeAck.Fail(); // return SubscribeAck.Fail();
} //}
/// <summary> ///// <summary>
/// 解析AFN0EH ///// 解析AFN0EH
/// </summary> ///// </summary>
/// <param name="receivedMessage"></param> ///// <param name="receivedMessage"></param>
/// <returns></returns> ///// <returns></returns>
[KafkaSubscribe(ProtocolConst.SubscriberAFN0EHReceivedEventNameTemp)] //[KafkaSubscribe(ProtocolConst.SubscriberAFN0EHReceivedEventNameTemp)]
public async Task<ISubscribeAck> ReceivedAFN0EEvent(MessageProtocolAnalysis<TB3761> receivedMessage) //public async Task<ISubscribeAck> ReceivedAFN0EEvent(MessageProtocolAnalysis<TB3761> receivedMessage)
{ //{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); // var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null) // if (protocolPlugin == null)
{ // {
_logger.LogError("协议不存在!"); // _logger.LogError("协议不存在!");
} // }
else // else
{ // {
if (receivedMessage.Data == null) // if (receivedMessage.Data == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) // if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{ // {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; // string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis"); // //var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761); // //var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>(); // var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data); // await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
return SubscribeAck.Success(); // return SubscribeAck.Success();
} // }
return SubscribeAck.Fail(); // return SubscribeAck.Fail();
} //}
/// <summary> /// <summary>

View File

@ -163,7 +163,7 @@ namespace JiShe.CollectBus.Subscribers
// 增加·发送次数和重试开始时间 // 增加·发送次数和重试开始时间
receivedMessage.SendNum += 1; receivedMessage.SendNum += 1;
receivedMessage.NextSendTime = DateTime.Now.AddHours(1); receivedMessage.NextSendTime = DateTime.Now.AddHours(1);
await _dbProvider.InsertAsync(receivedMessage); await _dbProvider.GetSessionPool(true).InsertAsync(receivedMessage);
// TODO 第4次的时候会推送到地方预警处理 // TODO 第4次的时候会推送到地方预警处理
} }
// 由于有3次重试机会故每次消息都会被确认 // 由于有3次重试机会故每次消息都会被确认
@ -254,7 +254,7 @@ namespace JiShe.CollectBus.Subscribers
// 增加·发送次数和重试开始时间 // 增加·发送次数和重试开始时间
receivedMessage.SendNum += 1; receivedMessage.SendNum += 1;
receivedMessage.NextSendTime = DateTime.Now.AddHours(1); receivedMessage.NextSendTime = DateTime.Now.AddHours(1);
await _dbProvider.InsertAsync(receivedMessage); await _dbProvider.GetSessionPool(true).InsertAsync(receivedMessage);
return SubscribeAck.Success(); return SubscribeAck.Success();
} }

View File

@ -835,6 +835,11 @@ namespace JiShe.CollectBus.Common.Helpers
/// <returns></returns> /// <returns></returns>
public static bool JudgeIsGenerate_Day(string eachDayWithout, DateTime curTime) public static bool JudgeIsGenerate_Day(string eachDayWithout, DateTime curTime)
{ {
if (string.IsNullOrWhiteSpace(eachDayWithout))
{
return false;
}
var weekName = strWeeks[(int)curTime.DayOfWeek]; var weekName = strWeeks[(int)curTime.DayOfWeek];
var arr = eachDayWithout.Split(','); var arr = eachDayWithout.Split(',');
return !arr.Contains(weekName); return !arr.Contains(weekName);

View File

@ -88,7 +88,7 @@
"ClusterList": [ "192.168.5.9:6667" ], "ClusterList": [ "192.168.5.9:6667" ],
"PoolSize": 32, "PoolSize": 32,
"DataBaseName": "energy", "DataBaseName": "energy",
"OpenDebugMode": false, "OpenDebugMode": true,
"UseTableSessionPoolByDefault": false "UseTableSessionPoolByDefault": false
}, },
"Cassandra": { "Cassandra": {