using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Encrypt;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Protocol.Contracts.Protocol.Dto;
using JiShe.CollectBus.Protocol.Dto;
using Microsoft.Extensions.Options;
using System.Diagnostics.Metrics;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Guids;
using static FreeSql.Internal.GlobalFilter;
using static JiShe.CollectBus.Common.Consts.T37612012PacketItemCodeConst;

namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
{
    /// <summary>
    /// Writes analysed protocol data and concentrator status records to IoTDB
    /// through <see cref="IIoTDbProvider"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type arguments in this class (for example
    /// <c>AnalysisBaseDto&lt;T&gt;</c> and <c>GetTableName&lt;MeterReadingTelemetryPacketInfo&gt;()</c>)
    /// were reconstructed from how the values are used; confirm them against the
    /// DTO and provider declarations.
    /// </remarks>
    public class DataStorage : ITransientDependency
    {
        private readonly IGuidGenerator _guidGenerator;
        private readonly IIoTDbProvider _dbProvider;
        private readonly ServerApplicationOptions _applicationOptions;
        private readonly IoTDBRuntimeContext _runtimeContext;

        /// <summary>
        /// Creates the storage service from its injected dependencies.
        /// </summary>
        /// <param name="dbProvider">Provider used for every IoTDB query and insert.</param>
        /// <param name="applicationOptions">Application options; only the resolved value is kept.</param>
        /// <param name="guidGenerator">GUID generator (stored, not used by the methods in this class).</param>
        /// <param name="runtimeContext">Runtime context whose <c>UseTableSessionPool</c> flag is toggled before IoTDB calls.</param>
        public DataStorage(IIoTDbProvider dbProvider, IOptions<ServerApplicationOptions> applicationOptions, IGuidGenerator guidGenerator, IoTDBRuntimeContext runtimeContext)
        {
            _dbProvider = dbProvider;
            _applicationOptions = applicationOptions.Value;
            _guidGenerator = guidGenerator;
            _runtimeContext = runtimeContext;
        }

        /// <summary>
        /// Gets the cached ammeter information.
        /// </summary>
        /// <param name="meterType">Meter type used when formatting the cache key.</param>
        /// <param name="timeDensity">Time density used when formatting the cache key; defaults to "15".</param>
        /// <returns>Currently a hard-coded <see cref="AmmeterInfo"/> placeholder.</returns>
        public async Task<AmmeterInfo> GetMeterInfoAsync(string meterType, string timeDensity = "15")
        {
            // The cache key is formatted but not read yet; see the TODO below.
            var redisCacheMeterInfoHashKeyTemp = string.Format(RedisConst.CacheMeterInfoHashKey, _applicationOptions.SystemType, _applicationOptions.ServerTagName, meterType, timeDensity);

            // TODO: temporarily hard-coded; adjust once it is confirmed how the value should be fetched.
            return await Task.FromResult(new AmmeterInfo()
            {
                ProjectID = 10000,
                MeterId = 11111,
                FocusId = 22222
            });
        }

        /// <summary>
        /// Saves a single analysed data item to IoTDB.
        /// </summary>
        /// <typeparam name="T">Type of the measured value.</typeparam>
        /// <param name="analysisBaseDto">Analysed frame whose <c>Data</c> holds the item to store.</param>
        /// <returns>Always <c>true</c> when the inserts complete without throwing.</returns>
        public async Task<bool> SaveDataToIotDbAsync<T>(UnitDataAnalysis<AnalysisBaseDto<T>> analysisBaseDto)
        {
            var data = analysisBaseDto.Data!;
            if (!data.TimeSpan.HasValue)
            {
                // Fall back to the time the frame was received.
                data.TimeSpan = analysisBaseDto.ReceivedTime;
            }

            string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
            string scoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun();

            var meter = new TreeModelSingleMeasuringEntity<T>()
            {
                SystemName = _applicationOptions.SystemType,
                DeviceId = $"{data.DeviceId}",
                DeviceType = $"{data.DeviceType.ToString()}.{IOTDBDataType.Data}",
                ProjectId = $"{data.ProjectId}",
                Timestamps = data.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(),
                SingleMeasuring = new Tuple<string, T>(data.FiledName ?? string.Empty, data.DataValue ?? default)
            };

            _runtimeContext.UseTableSessionPool = true; // use the table-model session pool
            var taskSendInfo = await _dbProvider.QueryAsync<MeterReadingTelemetryPacketInfo>(new IoTDBQueryOptions()
            {
                TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
                Conditions = BuildPendingTaskConditions(scoreValue, taskMark),
                PageIndex = 0,
                PageSize = 1
            });

            var taskData = taskSendInfo?.Items.FirstOrDefault();
            if (taskData != null)
            {
                // Update: a matching un-received task record exists, so reuse its
                // planned read time as the measurement timestamp and mark it received.
                meter.Timestamps = taskData.PendingCopyReadTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
                taskData.IsReceived = true;
                taskData.ReceivedMessageHexString = analysisBaseDto.HexMessage;
                taskData.ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty;
            }
            else
            {
                // Create: no matching task record was found.
                var currentTime = DateTime.Now;
                taskData = new MeterReadingTelemetryPacketInfo()
                {
                    SystemName = _applicationOptions.SystemType,
                    ProjectId = $"{data.ProjectId}",
                    DeviceType = $"{data.DeviceType}",
                    DeviceId = $"{data.DeviceId}",
                    Timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds(),
                    DatabaseBusiID = data.DatabaseBusiID,
                    PendingCopyReadTime = data.TimeSpan.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity),
                    CreationTime = currentTime,
                    MeterAddress = data.MeterAddress,
                    AFN = analysisBaseDto.AFN,
                    Fn = analysisBaseDto.Fn,
                    Seq = analysisBaseDto.PSEQ,
                    MSA = analysisBaseDto.MSA,
                    ItemCode = data.DataType,
                    TaskMark = taskMark,
                    IsSend = false,
                    ManualOrNot = false,
                    Pn = analysisBaseDto.Pn,
                    ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty,
                    ReceivedMessageHexString = analysisBaseDto.HexMessage,
                    IsReceived = true,
                    ReceivedRemark = data.ErrorCodeMsg ?? string.Empty,
                    ScoreValue = scoreValue,
                };
            }

            await _dbProvider.InsertAsync(taskData);

            // Without a field name there is no measurement to store.
            if (!string.IsNullOrWhiteSpace(data.FiledName))
            {
                _runtimeContext.UseTableSessionPool = false; // use the tree-model session pool
                await _dbProvider.InsertAsync(meter);
            }

            return true;
        }

        /// <summary>
        /// Saves a list of analysed data items to IoTDB using batch inserts.
        /// </summary>
        /// <typeparam name="T">Type of the measured values.</typeparam>
        /// <param name="analysisBaseDto">Analysed frame whose <c>Data</c> holds the items to store.</param>
        /// <returns>Always <c>true</c> when the inserts complete without throwing.</returns>
        public async Task<bool> SaveMultipleDataToIotDbAsync<T>(UnitDataAnalysis<List<AnalysisBaseDto<T>>> analysisBaseDto)
        {
            var data = analysisBaseDto.Data!;
            var meterReadingTelemetryPacketInfos = new List<MeterReadingTelemetryPacketInfo>();
            var treeModelSingleMeasuringEntities = new List<TreeModelSingleMeasuringEntity<T>>();

            // Both values depend only on the frame header, so compute them once
            // instead of once per item.
            string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
            string scoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun();

            foreach (var item in data)
            {
                if (!item.TimeSpan.HasValue)
                {
                    // Fall back to the time the frame was received.
                    item.TimeSpan = analysisBaseDto.ReceivedTime;
                }

                var meter = new TreeModelSingleMeasuringEntity<T>()
                {
                    SystemName = _applicationOptions.SystemType,
                    DeviceId = $"{item.DeviceId}",
                    DeviceType = $"{item.DeviceType}.{IOTDBDataType.Data}",
                    ProjectId = $"{item.ProjectId}",
                    // TODO: this currently formats the data as 15-minute data and needs to be adjusted.
                    Timestamps = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(),
                    SingleMeasuring = new Tuple<string, T>(item.FiledName ?? string.Empty, item.DataValue ?? default)
                };

                _runtimeContext.UseTableSessionPool = true; // use the table-model session pool
                var taskSendInfo = await _dbProvider.QueryAsync<MeterReadingTelemetryPacketInfo>(new IoTDBQueryOptions()
                {
                    TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
                    Conditions = BuildPendingTaskConditions(scoreValue, taskMark),
                    PageIndex = 0,
                    PageSize = 1
                });

                var taskData = taskSendInfo?.Items.FirstOrDefault();
                if (taskData != null)
                {
                    // Update: reuse the planned read time and mark the record received.
                    meter.Timestamps = taskData.PendingCopyReadTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
                    taskData.IsReceived = true;
                    taskData.ReceivedMessageHexString = analysisBaseDto.HexMessage;
                    taskData.ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty;
                }
                else
                {
                    // Create: no matching task record was found.
                    var currentTime = DateTime.Now;
                    taskData = new MeterReadingTelemetryPacketInfo()
                    {
                        SystemName = _applicationOptions.SystemType,
                        ProjectId = $"{item.ProjectId}",
                        DeviceType = $"{item.DeviceType}",
                        DeviceId = $"{item.DeviceId}",
                        Timestamps = DateTime.Now.CheckTimePoint().GetDateTimeOffset().ToUnixTimeNanoseconds(),
                        DatabaseBusiID = item.DatabaseBusiID,
                        PendingCopyReadTime = item.TimeSpan.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity),
                        CreationTime = currentTime,
                        MeterAddress = item.MeterAddress,
                        AFN = analysisBaseDto.AFN,
                        Fn = analysisBaseDto.Fn,
                        Seq = analysisBaseDto.PSEQ,
                        MSA = analysisBaseDto.MSA,
                        ItemCode = item.DataType,
                        TaskMark = taskMark,
                        IsSend = false,
                        ManualOrNot = false,
                        Pn = analysisBaseDto.Pn,
                        ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty,
                        ReceivedMessageHexString = analysisBaseDto.HexMessage,
                        IsReceived = true,
                        ReceivedRemark = item.ErrorCodeMsg ?? string.Empty,
                        ScoreValue = scoreValue,
                    };
                }

                meterReadingTelemetryPacketInfos.Add(taskData);

                // Without a field name there is no measurement to store.
                if (!string.IsNullOrWhiteSpace(item.FiledName))
                {
                    treeModelSingleMeasuringEntities.Add(meter);
                }
            }

            // Batch-save the packet records.
            await _dbProvider.BatchInsertAsync(meterReadingTelemetryPacketInfos);

            if (treeModelSingleMeasuringEntities.Count > 0)
            {
                _runtimeContext.UseTableSessionPool = false; // use the tree-model session pool
                await _dbProvider.BatchInsertAsync(treeModelSingleMeasuringEntities);
            }

            return true;
        }

        /// <summary>
        /// Saves a status record to IoTDB as four tree-model measurements that share
        /// one timestamp: the status value, the raw frame, the recording time and a remark.
        /// </summary>
        /// <typeparam name="T">Type of the status value.</typeparam>
        /// <param name="analysisBaseDto">Analysed frame whose <c>Data</c> holds the status to store.</param>
        /// <returns>Always <c>true</c> when the inserts complete without throwing.</returns>
        /// <exception cref="ArgumentNullException"><c>Data</c>, <c>Data.FiledName</c> or <c>Data.DataValue</c> is null.</exception>
        /// <exception cref="ArgumentException"><c>Data.FiledName</c> is empty or whitespace.</exception>
        public async Task<bool> SaveStatusToIotDbAsync<T>(UnitDataAnalysis<AnalysisBaseDto<T>> analysisBaseDto)
        {
            // Validate the values themselves. Passing nameof(...) would hand the guard
            // a non-empty string constant, so it could never throw.
            ArgumentNullException.ThrowIfNull(analysisBaseDto.Data);
            ArgumentException.ThrowIfNullOrWhiteSpace(analysisBaseDto.Data.FiledName);
            ArgumentNullException.ThrowIfNull(analysisBaseDto.Data.DataValue);

            var data = analysisBaseDto.Data;
            if (!data.TimeSpan.HasValue)
            {
                // Fall back to the time the frame was received.
                data.TimeSpan = analysisBaseDto.ReceivedTime;
            }

            long timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds();

            // Builds one status measurement; all four records differ only in field name and value.
            TreeModelSingleMeasuringEntity<TValue> CreateStatusEntity<TValue>(string fieldName, TValue value)
            {
                return new TreeModelSingleMeasuringEntity<TValue>()
                {
                    SystemName = _applicationOptions.SystemType,
                    DeviceId = $"{data.DeviceId}",
                    DeviceType = $"{data.DeviceType}.{IOTDBDataType.Status}",
                    ProjectId = $"{data.ProjectId}",
                    Timestamps = timestamps,
                    SingleMeasuring = new Tuple<string, TValue>(fieldName, value)
                };
            }

            // Type (heartbeat, login, power-on, power-off).
            var treeData = CreateStatusEntity<T>(data.FiledName, data.DataValue);
            _runtimeContext.UseTableSessionPool = false; // use the tree-model session pool
            await _dbProvider.InsertAsync(treeData);

            // Data frame.
            var treeFrameData = CreateStatusEntity<string>(ConcentratorStatusFieldConst.FrameData, analysisBaseDto.HexMessage ?? string.Empty);
            _runtimeContext.UseTableSessionPool = false; // use the tree-model session pool
            await _dbProvider.InsertAsync(treeFrameData);

            // Time.
            var treeRecordingTimeData = CreateStatusEntity<long>(ConcentratorStatusFieldConst.RecordingTime, (data.TimeSpan ?? DateTime.Now).GetDateTimeOffset().ToUnixTimeNanoseconds());
            _runtimeContext.UseTableSessionPool = false; // use the tree-model session pool
            await _dbProvider.InsertAsync(treeRecordingTimeData);

            // Remark.
            var treeRemarkData = CreateStatusEntity<string>(ConcentratorStatusFieldConst.Remark, data.FiledDesc ?? string.Empty);
            _runtimeContext.UseTableSessionPool = false; // use the tree-model session pool
            await _dbProvider.InsertAsync(treeRemarkData);

            return true;
        }

        /// <summary>
        /// Builds the filter that selects the not-yet-received task record matching
        /// the given score value and task mark.
        /// </summary>
        private static List<QueryCondition> BuildPendingTaskConditions(string scoreValue, string taskMark)
        {
            return new List<QueryCondition>()
            {
                new QueryCondition() { Field = "ScoreValue", Operator = "=", IsNumber = false, Value = scoreValue },
                new QueryCondition() { Field = "TaskMark", Operator = "=", IsNumber = false, Value = taskMark },
                new QueryCondition() { Field = "IsReceived", Operator = "=", IsNumber = false, Value = false },
            };
        }
    }
}