using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Encrypt;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Protocol.Contracts.Protocol.Dto;
using JiShe.CollectBus.Protocol.Dto;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Guids;

namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
{
    /// <summary>
    /// Stores analysed protocol data through the injected <c>IIoTDbProvider</c>:
    /// a telemetry packet record (queried/inserted while <c>UseTableSessionPool</c> is true)
    /// and, when a field name is present, a single-measurement entity
    /// (inserted while <c>UseTableSessionPool</c> is false).
    /// </summary>
    /// <remarks>
    /// Both save methods mutate the injected <c>IoTDBRuntimeContext.UseTableSessionPool</c> flag
    /// before each provider call; the flag is left at whatever value was set last.
    /// NOTE(review): the lifetime/sharing of that context is not visible here — confirm that
    /// concurrent callers cannot observe each other's flag value.
    /// </remarks>
    public class DataStorage:ITransientDependency
    {
        // NOTE(review): stored by the constructor but not read anywhere in this class.
        private readonly IGuidGenerator _guidGenerator;
        private readonly IIoTDbProvider _dbProvider;
        private readonly ServerApplicationOptions _applicationOptions;
        private readonly IoTDBRuntimeContext _runtimeContext;

        /// <summary>
        /// Captures the injected services; the options wrapper is unwrapped once via <c>.Value</c>.
        /// </summary>
        /// <param name="dbProvider">Provider used for the query and insert calls.</param>
        /// <param name="applicationOptions">Options wrapper whose <c>Value</c> supplies <c>SystemType</c> and <c>ServerTagName</c>.</param>
        /// <param name="guidGenerator">GUID generator (currently unused by this class).</param>
        /// <param name="runtimeContext">Context whose <c>UseTableSessionPool</c> flag is toggled before provider calls.</param>
        public DataStorage(IIoTDbProvider dbProvider, IOptions applicationOptions, IGuidGenerator guidGenerator, IoTDBRuntimeContext runtimeContext)
        {
            _dbProvider= dbProvider;
            _applicationOptions = applicationOptions.Value;
            _guidGenerator= guidGenerator;
            _runtimeContext= runtimeContext;
        }

        /// <summary>
        /// Gets cached ammeter information.
        /// </summary>
        /// <remarks>
        /// As written this is a stub: the key is formatted but never used, no cache is read,
        /// and a newly constructed <c>AmmeterInfo</c> is always returned.
        /// </remarks>
        /// <param name="meterType">Meter type; only used as a format argument for the key.</param>
        /// <param name="timeDensity">Time density; only used as a format argument for the key. Defaults to "15".</param>
        /// <returns>A new, default-constructed <c>AmmeterInfo</c>.</returns>
        public async Task GetMeterInfoAsync(string meterType,string timeDensity="15")
        {
            // Formats RedisConst.CacheMeterInfoHashKey with system type, server tag, meter type and
            // time density. NOTE(review): the result is never used below.
            var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, _applicationOptions.SystemType, _applicationOptions.ServerTagName, meterType, timeDensity)}";
            return await Task.FromResult(new AmmeterInfo());
        }

        /// <summary>
        /// Saves a single analysed data item to IoTDB.
        /// </summary>
        /// <remarks>
        /// Steps, in order:
        /// 1. Defaults <c>Data.TimeSpan</c> to <c>ReceivedTime</c> when it has no value (mutates the input).
        /// 2. Queries for at most one packet record with matching <c>ScoreValue</c> and <c>TaskMark</c>
        ///    and <c>IsReceived</c> = false.
        /// 3. If found, marks it received and attaches the received message; otherwise builds a new record.
        /// 4. Inserts the packet record, then inserts the measurement entity only when
        ///    <c>FiledName</c> is not null/whitespace.
        /// </remarks>
        /// <param name="analysisBaseDto">
        /// Analysed frame; <c>Data</c> is dereferenced with the null-forgiving operator, so a null
        /// <c>Data</c> will fail at the first member access.
        /// </param>
        /// <returns>Always <c>true</c> when no exception is thrown; there is no other return path.</returns>
        public async Task SaveDataToIotDbAsync(UnitDataAnalysis> analysisBaseDto)
        {
            var data = analysisBaseDto.Data!;
            // Fall back to the receive time when the item carries no timestamp of its own.
            if(!data.TimeSpan.HasValue)
                data.TimeSpan= analysisBaseDto.ReceivedTime;
            string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
            // Lookup key: "<Code>.<taskMark>" passed through Md5Fun().
            string scoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun();

            // Match the not-yet-received packet record for this ScoreValue/TaskMark.
            var conditions = new List();
            conditions.Add(new QueryCondition()
            {
                Field = "ScoreValue",
                Operator = "=",
                IsNumber = false,
                Value = scoreValue
            });
            conditions.Add(new QueryCondition()
            {
                Field = "TaskMark",
                Operator = "=",
                IsNumber = false,
                Value = taskMark
            });
            conditions.Add(new QueryCondition()
            {
                Field = "IsReceived",
                Operator = "=",
                IsNumber = false,
                Value = false
            });

            // Measurement entity; only inserted at the end when a field name is present.
            var meter = new TreeModelSingleMeasuringEntity()
            {
                SystemName = _applicationOptions.SystemType,
                DeviceId = $"{data.MeterId}",
                DeviceType = $"{data.MeterType}",
                ProjectId = $"{data.ProjectId}",
                Timestamps = data.TimeSpan!.Value.GetDateTimeOffset().ToUnixTimeMilliseconds(), // TODO: temporarily formatted as 15-minute data; needs adjusting. NOTE(review): unlike SaveMultipleDataToIotDbAsync, no CheckTimePoint() is applied here — confirm intended.
                SingleMeasuring = new Tuple(data.FiledName ?? string.Empty, data.DataValue ?? default)
            };

            _runtimeContext.UseTableSessionPool = true; // use the table-model session pool
            var taskSendInfo = await _dbProvider.QueryAsync(new IoTDBQueryOptions()
            {
                TableNameOrTreePath = DevicePathBuilder.GetTableName(),
                Conditions = conditions,
                PageIndex = 0,
                PageSize = 1
            });
            // NOTE(review): only taskSendInfo is null-guarded; a null Items would throw here.
            var taskData = taskSendInfo?.Items.FirstOrDefault();
            if (taskData != null)
            {
                // Update: reuse the existing record's timestamp for the measurement as well.
                // NOTE(review): meter.Timestamps was computed in milliseconds above, while records
                // created by this class use ToUnixTimeNanoseconds() — confirm the units agree.
                meter.Timestamps = taskData.Timestamps;
                taskData.IsReceived=true;
                taskData.ReceivedMessageHexString= analysisBaseDto.HexMessage;
                taskData.ReceivedMessageId= analysisBaseDto.MessageId ?? string.Empty;
            }
            else
            {
                // Create: no pending record matched, so build one already marked as received.
                var currentTime = DateTime.Now;
                taskData = new MeterReadingTelemetryPacketInfo()
                {
                    SystemName = _applicationOptions.SystemType,
                    ProjectId = $"{data.ProjectId}",
                    DeviceType = $"{data.MeterType}",
                    DeviceId = $"{data.MeterId}",
                    // NOTE(review): reads DateTime.Now again instead of reusing currentTime.
                    Timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds(),
                    DatabaseBusiID = data.DatabaseBusiID,
                    PendingCopyReadTime = data.TimeSpan.Value.CheckTimePoint(), // TODO: temporarily formatted as 15-minute data; needs adjusting
                    CreationTime = currentTime,
                    MeterAddress = data.MeterAddress,
                    AFN = analysisBaseDto.AFN,
                    Fn = analysisBaseDto.Fn,
                    Seq = analysisBaseDto.PSEQ,
                    MSA = analysisBaseDto.MSA,
                    ItemCode = data.DataType,
                    TaskMark = taskMark,
                    IsSend = false,
                    ManualOrNot = false,
                    Pn = analysisBaseDto.Pn,
                    ReceivedMessageId = analysisBaseDto.MessageId?? string.Empty,
                    ReceivedMessageHexString = analysisBaseDto.HexMessage,
                    IsReceived = true,
                    ReceivedRemark = data.ErrorCodeMsg ?? string.Empty,
                    // Same expression as scoreValue above.
                    ScoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun(),
                };
            }

            // UseTableSessionPool is still true here (set before the query).
            await _dbProvider.InsertAsync(taskData);
            // If there is no field name, do not save the measurement data.
            if (!string.IsNullOrWhiteSpace(data.FiledName))
            {
                _runtimeContext.UseTableSessionPool = false; // use the tree-model session pool
                await _dbProvider.InsertAsync(meter);
            }
            return await Task.FromResult(true);
        }

        /// <summary>
        /// Saves multiple analysed data items to IoTDB, processing them one by one in order.
        /// </summary>
        /// <remarks>
        /// Each item goes through the same query / update-or-create / insert sequence as
        /// <c>SaveDataToIotDbAsync</c>, with these visible differences: the measurement timestamp
        /// and the new record's timestamp are both passed through <c>CheckTimePoint()</c>.
        /// NOTE(review): the task mark, score value and query conditions depend only on
        /// <paramref name="analysisBaseDto"/>, not on the item, so every iteration issues the same query.
        /// </remarks>
        /// <param name="analysisBaseDto">
        /// Analysed frame; <c>Data</c> is dereferenced with the null-forgiving operator and enumerated.
        /// Items without a <c>TimeSpan</c> are mutated to use <c>ReceivedTime</c>.
        /// </param>
        /// <returns>Always <c>true</c> when no exception is thrown; there is no other return path.</returns>
        public async Task SaveMultipleDataToIotDbAsync(UnitDataAnalysis>> analysisBaseDto)
        {
            var data = analysisBaseDto.Data!;
            foreach (var item in data)
            {
                // Fall back to the receive time when the item carries no timestamp of its own.
                if(!item.TimeSpan.HasValue)
                    item.TimeSpan = analysisBaseDto.ReceivedTime;
                string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
                // Lookup key: "<Code>.<taskMark>" passed through Md5Fun().
                string scoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun();

                // Match the not-yet-received packet record for this ScoreValue/TaskMark.
                var conditions = new List();
                conditions.Add(new QueryCondition()
                {
                    Field = "ScoreValue",
                    Operator = "=",
                    IsNumber = false,
                    Value = scoreValue
                });
                conditions.Add(new QueryCondition()
                {
                    Field = "TaskMark",
                    Operator = "=",
                    IsNumber = false,
                    Value = taskMark
                });
                conditions.Add(new QueryCondition()
                {
                    Field = "IsReceived",
                    Operator = "=",
                    IsNumber = false,
                    Value = false
                });

                // Measurement entity; only inserted at the end when a field name is present.
                var meter = new TreeModelSingleMeasuringEntity()
                {
                    SystemName = _applicationOptions.SystemType,
                    DeviceId = $"{item.MeterId}",
                    DeviceType = $"{item.MeterType}",
                    ProjectId = $"{item.ProjectId}",
                    Timestamps = item.TimeSpan!.Value.CheckTimePoint().GetDateTimeOffset().ToUnixTimeMilliseconds(), // TODO: temporarily formatted as 15-minute data; needs adjusting
                    SingleMeasuring = new Tuple(item.FiledName ?? string.Empty, item.DataValue ?? default)
                };

                _runtimeContext.UseTableSessionPool = true; // use the table-model session pool
                var taskSendInfo = await _dbProvider.QueryAsync(new IoTDBQueryOptions()
                {
                    TableNameOrTreePath = DevicePathBuilder.GetTableName(),
                    Conditions = conditions,
                    PageIndex = 0,
                    PageSize = 1
                });
                // NOTE(review): only taskSendInfo is null-guarded; a null Items would throw here.
                var taskData = taskSendInfo?.Items.FirstOrDefault();
                if (taskData != null)
                {
                    // Update: reuse the existing record's timestamp for the measurement as well.
                    // NOTE(review): meter.Timestamps was computed in milliseconds above, while records
                    // created by this class use ToUnixTimeNanoseconds() — confirm the units agree.
                    meter.Timestamps = taskData.Timestamps;
                    taskData.IsReceived = true;
                    taskData.ReceivedMessageHexString = analysisBaseDto.HexMessage;
                    taskData.ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty;
                }
                else
                {
                    // Create: no pending record matched, so build one already marked as received.
                    var currentTime = DateTime.Now;
                    taskData = new MeterReadingTelemetryPacketInfo()
                    {
                        SystemName = _applicationOptions.SystemType,
                        ProjectId = $"{item.ProjectId}",
                        DeviceType = $"{item.MeterType}",
                        DeviceId = $"{item.MeterId}",
                        // NOTE(review): reads DateTime.Now again instead of reusing currentTime.
                        Timestamps = DateTime.Now.CheckTimePoint().GetDateTimeOffset().ToUnixTimeNanoseconds(),
                        DatabaseBusiID = item.DatabaseBusiID,
                        PendingCopyReadTime = item.TimeSpan.Value.CheckTimePoint(), // TODO: temporarily formatted as 15-minute data; needs adjusting
                        CreationTime = currentTime,
                        MeterAddress = item.MeterAddress,
                        AFN = analysisBaseDto.AFN,
                        Fn = analysisBaseDto.Fn,
                        Seq = analysisBaseDto.PSEQ,
                        MSA = analysisBaseDto.MSA,
                        ItemCode = item.DataType,
                        TaskMark = taskMark,
                        IsSend = false,
                        ManualOrNot = false,
                        Pn = analysisBaseDto.Pn,
                        ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty,
                        ReceivedMessageHexString = analysisBaseDto.HexMessage,
                        IsReceived = true,
                        ReceivedRemark = item.ErrorCodeMsg ?? string.Empty,
                        // Same expression as scoreValue above.
                        ScoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun(),
                    };
                }

                // UseTableSessionPool is still true here (set before the query in this iteration).
                await _dbProvider.InsertAsync(taskData);
                // If there is no field name, do not save the measurement data.
                if (!string.IsNullOrWhiteSpace(item.FiledName))
                {
                    _runtimeContext.UseTableSessionPool = false; // use the tree-model session pool
                    await _dbProvider.InsertAsync(meter);
                }
            }
            return await Task.FromResult(true);
        }
    }
}