增加是否迁移标识和分库标识

This commit is contained in:
zenghongyao 2025-05-21 14:08:26 +08:00
parent 2ead6e8242
commit 2d2bb0dcc0
2 changed files with 81 additions and 7 deletions

View File

@ -1,4 +1,5 @@
using FreeRedis;
using Confluent.Kafka;
using FreeRedis;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Encrypt;
@ -29,6 +30,7 @@ using Volo.Abp.DependencyInjection;
using Volo.Abp.Guids;
using YamlDotNet.Core.Tokens;
using static FreeSql.Internal.GlobalFilter;
using static IClientRPCService;
using static JiShe.CollectBus.Common.Consts.T37612012PacketItemCodeConst;
namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
@ -211,6 +213,31 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
if (!string.IsNullOrWhiteSpace(data.FiledName))
{
await _dbProvider.GetSessionPool(false).InsertAsync(meter);
// 增加标识字段
var meterIsSync = new TreeModelSingleMeasuringEntity<bool>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}",
ProjectId = $"{data.ProjectId}",
DataType = analysisBaseDto.DataType,
Timestamps = meter.Timestamps,
SingleMeasuring = (IotDbFieldConst.IsSync, false)
};
await _dbProvider.GetSessionPool(false).InsertAsync(meterIsSync);
// 数据库业务ID
var meterIsDatabaseBusiID = new TreeModelSingleMeasuringEntity<int>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}",
ProjectId = $"{data.ProjectId}",
DataType = analysisBaseDto.DataType,
Timestamps = meter.Timestamps,
SingleMeasuring = (IotDbFieldConst.DatabaseBusiID, data.DatabaseBusiID)
};
await _dbProvider.GetSessionPool(false).InsertAsync(meterIsDatabaseBusiID);
}
return await Task.FromResult(true);
}
@ -228,6 +255,9 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
var data = analysisBaseDto.Data!;
List<MeterReadingTelemetryPacketInfo> meterReadingTelemetryPacketInfos = new List<MeterReadingTelemetryPacketInfo>();
List<TreeModelSingleMeasuringEntity<T>> treeModelSingleMeasuringEntities = new List<TreeModelSingleMeasuringEntity<T>>();
List<TreeModelSingleMeasuringEntity<bool>> meterIsSyncs = new List<TreeModelSingleMeasuringEntity<bool>>();
List<TreeModelSingleMeasuringEntity<int>> meterDataBaseIDs = new List<TreeModelSingleMeasuringEntity<int>>();
foreach (var item in data)
{
if(!item.TimeSpan.HasValue)
@ -320,6 +350,35 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
if (!string.IsNullOrWhiteSpace(item.FiledName) && item.ProjectId>0)
{
treeModelSingleMeasuringEntities.Add(meter);
// 增加标识字段
var meterIsSync = new TreeModelSingleMeasuringEntity<bool>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{item.DeviceId}",
DeviceType = $"{item.DeviceType}",
ProjectId = $"{item.ProjectId}",
DataType = analysisBaseDto.DataType,
Timestamps = meter.Timestamps,
SingleMeasuring = (IotDbFieldConst.IsSync, false)
};
if(!meterIsSyncs.Any(a=> a.DataType == meterIsSync.DataType && a.ProjectId == meterIsSync.ProjectId && a.SystemName== meterIsSync.SystemName && a.DeviceId== meterIsSync.DeviceId && a.Timestamps== meterIsSync.Timestamps))
meterIsSyncs.Add(meterIsSync);
// 数据库业务ID
var meterIsDatabaseBusiID = new TreeModelSingleMeasuringEntity<int>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{item.DeviceId}",
DeviceType = $"{item.DeviceType}",
ProjectId = $"{item.ProjectId}",
DataType = analysisBaseDto.DataType,
Timestamps = meter.Timestamps,
SingleMeasuring = (IotDbFieldConst.DatabaseBusiID, item.DatabaseBusiID)
};
if (!meterDataBaseIDs.Any(a => a.DataType == meterIsSync.DataType && a.ProjectId == meterIsSync.ProjectId && a.SystemName == meterIsSync.SystemName && a.DeviceId == meterIsSync.DeviceId && a.Timestamps == meterIsSync.Timestamps))
meterDataBaseIDs.Add(meterIsDatabaseBusiID);
}
}
// 批量保存数据
@ -327,6 +386,11 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
if (treeModelSingleMeasuringEntities.Count > 0)
{
await _dbProvider.GetSessionPool(false).BatchInsertAsync(treeModelSingleMeasuringEntities);
// 报存标识字段
await _dbProvider.GetSessionPool(false).BatchInsertAsync(meterIsSyncs);
await _dbProvider.GetSessionPool(false).BatchInsertAsync(meterDataBaseIDs);
}
return await Task.FromResult(true);
}
@ -371,7 +435,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
ProjectId = $"{data.ProjectId}",
DataType = IOTDBDataTypeConst.Status,
Timestamps = timestamps,
SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
SingleMeasuring = (IotDbFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
};
await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData);
@ -384,7 +448,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
ProjectId = $"{data.ProjectId}",
Timestamps = timestamps,
DataType = IOTDBDataTypeConst.Status,
SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now)
SingleMeasuring = (IotDbFieldConst.RecordingTime, data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now)
};
await _dbProvider.GetSessionPool(false).InsertAsync(treeRecordingTimeData);
}
@ -472,7 +536,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
ProjectId = $"{item.ProjectId}",
DataType = IOTDBDataTypeConst.Status,
Timestamps = timestamps,
SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
SingleMeasuring = (IotDbFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
};
await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData);
@ -485,7 +549,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
ProjectId = $"{item.ProjectId}",
DataType = IOTDBDataTypeConst.Status,
Timestamps = timestamps,
SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, item.TimeSpan.HasValue ? item.TimeSpan.Value : DateTime.Now)
SingleMeasuring = (IotDbFieldConst.RecordingTime, item.TimeSpan.HasValue ? item.TimeSpan.Value : DateTime.Now)
};
await _dbProvider.GetSessionPool(false).InsertAsync(treeRecordingTimeData);

View File

@ -170,9 +170,9 @@ namespace JiShe.CollectBus.Common.Consts
/// <summary>
/// 集中器状态字段
/// IotDB存储字段字段
/// </summary>
public class ConcentratorStatusFieldConst
public class IotDbFieldConst
{
/// <summary>
@ -185,6 +185,16 @@ namespace JiShe.CollectBus.Common.Consts
/// </summary>
public const string FrameData = "FrameData";
/// <summary>
/// 是否同步
/// </summary>
public const string IsSync = "IsSync";
/// <summary>
/// 数据库业务ID
/// </summary>
public const string DatabaseBusiID= "DatabaseBusiID";
}
#endregion