diff --git a/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs b/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs index a997274..ea11af8 100644 --- a/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs +++ b/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs @@ -1,4 +1,5 @@ -using FreeRedis; +using Confluent.Kafka; +using FreeRedis; using JiShe.CollectBus.Common; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Encrypt; @@ -29,6 +30,7 @@ using Volo.Abp.DependencyInjection; using Volo.Abp.Guids; using YamlDotNet.Core.Tokens; using static FreeSql.Internal.GlobalFilter; +using static IClientRPCService; using static JiShe.CollectBus.Common.Consts.T37612012PacketItemCodeConst; namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData @@ -211,6 +213,31 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData if (!string.IsNullOrWhiteSpace(data.FiledName)) { await _dbProvider.GetSessionPool(false).InsertAsync(meter); + // 增加标识字段 + var meterIsSync = new TreeModelSingleMeasuringEntity() + { + SystemName = _applicationOptions.SystemType, + DeviceId = $"{data.DeviceId}", + DeviceType = $"{data.DeviceType}", + ProjectId = $"{data.ProjectId}", + DataType = analysisBaseDto.DataType, + Timestamps = meter.Timestamps, + SingleMeasuring = (IotDbFieldConst.IsSync, false) + }; + await _dbProvider.GetSessionPool(false).InsertAsync(meterIsSync); + + // 数据库业务ID + var meterIsDatabaseBusiID = new TreeModelSingleMeasuringEntity() + { + SystemName = _applicationOptions.SystemType, + DeviceId = $"{data.DeviceId}", + DeviceType = $"{data.DeviceType}", + ProjectId = $"{data.ProjectId}", + DataType = analysisBaseDto.DataType, + Timestamps = meter.Timestamps, + SingleMeasuring = (IotDbFieldConst.DatabaseBusiID, data.DatabaseBusiID) + }; + await _dbProvider.GetSessionPool(false).InsertAsync(meterIsDatabaseBusiID); } return await Task.FromResult(true); } @@ -228,6 +255,9 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData var data = analysisBaseDto.Data!; List meterReadingTelemetryPacketInfos = new List(); List> treeModelSingleMeasuringEntities = new List>(); + List> meterIsSyncs = new List>(); + List> meterDataBaseIDs = new List>(); + foreach (var item in data) { if(!item.TimeSpan.HasValue) @@ -320,6 +350,35 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData if (!string.IsNullOrWhiteSpace(item.FiledName) && item.ProjectId>0) { treeModelSingleMeasuringEntities.Add(meter); + + // 增加标识字段 + var meterIsSync = new TreeModelSingleMeasuringEntity() + { + SystemName = _applicationOptions.SystemType, + DeviceId = $"{item.DeviceId}", + DeviceType = $"{item.DeviceType}", + ProjectId = $"{item.ProjectId}", + DataType = analysisBaseDto.DataType, + Timestamps = meter.Timestamps, + SingleMeasuring = (IotDbFieldConst.IsSync, false) + }; + if(!meterIsSyncs.Any(a=> a.DataType == meterIsSync.DataType && a.ProjectId == meterIsSync.ProjectId && a.SystemName== meterIsSync.SystemName && a.DeviceId== meterIsSync.DeviceId && a.Timestamps== meterIsSync.Timestamps)) + meterIsSyncs.Add(meterIsSync); + + // 数据库业务ID + var meterIsDatabaseBusiID = new TreeModelSingleMeasuringEntity() + { + SystemName = _applicationOptions.SystemType, + DeviceId = $"{item.DeviceId}", + DeviceType = $"{item.DeviceType}", + ProjectId = $"{item.ProjectId}", + DataType = analysisBaseDto.DataType, + Timestamps = meter.Timestamps, + SingleMeasuring = (IotDbFieldConst.DatabaseBusiID, item.DatabaseBusiID) + }; + if (!meterDataBaseIDs.Any(a => a.DataType == meterIsSync.DataType && a.ProjectId == meterIsSync.ProjectId && a.SystemName == meterIsSync.SystemName && a.DeviceId == meterIsSync.DeviceId && a.Timestamps == meterIsSync.Timestamps)) + meterDataBaseIDs.Add(meterIsDatabaseBusiID); + } } // 批量保存数据 @@ -327,6 +386,11 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData if (treeModelSingleMeasuringEntities.Count > 0) { await _dbProvider.GetSessionPool(false).BatchInsertAsync(treeModelSingleMeasuringEntities); + + // 报存标识字段 + await _dbProvider.GetSessionPool(false).BatchInsertAsync(meterIsSyncs); + await _dbProvider.GetSessionPool(false).BatchInsertAsync(meterDataBaseIDs); + } return await Task.FromResult(true); } @@ -371,7 +435,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData ProjectId = $"{data.ProjectId}", DataType = IOTDBDataTypeConst.Status, Timestamps = timestamps, - SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty) + SingleMeasuring = (IotDbFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty) }; await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData); @@ -384,7 +448,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData ProjectId = $"{data.ProjectId}", Timestamps = timestamps, DataType = IOTDBDataTypeConst.Status, - SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now) + SingleMeasuring = (IotDbFieldConst.RecordingTime, data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now) }; await _dbProvider.GetSessionPool(false).InsertAsync(treeRecordingTimeData); } @@ -472,7 +536,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData ProjectId = $"{item.ProjectId}", DataType = IOTDBDataTypeConst.Status, Timestamps = timestamps, - SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty) + SingleMeasuring = (IotDbFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty) }; await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData); @@ -485,7 +549,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData ProjectId = $"{item.ProjectId}", DataType = IOTDBDataTypeConst.Status, Timestamps = timestamps, - SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, item.TimeSpan.HasValue ? item.TimeSpan.Value : DateTime.Now) + SingleMeasuring = (IotDbFieldConst.RecordingTime, item.TimeSpan.HasValue ? item.TimeSpan.Value : DateTime.Now) }; await _dbProvider.GetSessionPool(false).InsertAsync(treeRecordingTimeData); diff --git a/shared/JiShe.CollectBus.Common/Consts/T37612012PacketItemCodeConst.cs b/shared/JiShe.CollectBus.Common/Consts/T37612012PacketItemCodeConst.cs index 69e33e5..b556524 100644 --- a/shared/JiShe.CollectBus.Common/Consts/T37612012PacketItemCodeConst.cs +++ b/shared/JiShe.CollectBus.Common/Consts/T37612012PacketItemCodeConst.cs @@ -170,9 +170,9 @@ namespace JiShe.CollectBus.Common.Consts /// - /// 集中器状态字段 + /// IotDB存储字段字段 /// - public class ConcentratorStatusFieldConst + public class IotDbFieldConst { /// @@ -185,6 +185,16 @@ namespace JiShe.CollectBus.Common.Consts /// public const string FrameData = "FrameData"; + /// + /// 是否同步 + /// + public const string IsSync = "IsSync"; + + /// + /// 数据库业务ID + /// + public const string DatabaseBusiID= "DatabaseBusiID"; + } #endregion