607 lines
31 KiB
C#
607 lines
31 KiB
C#
using Confluent.Kafka;
|
||
using FreeRedis;
|
||
using JiShe.CollectBus.Common;
|
||
using JiShe.CollectBus.Common.Consts;
|
||
using JiShe.CollectBus.Common.Encrypt;
|
||
using JiShe.CollectBus.Common.Enums;
|
||
using JiShe.CollectBus.Common.Extensions;
|
||
using JiShe.CollectBus.Common.Helpers;
|
||
using JiShe.CollectBus.FreeRedis;
|
||
using JiShe.CollectBus.IoTDB.Context;
|
||
using JiShe.CollectBus.IoTDB.Interface;
|
||
using JiShe.CollectBus.IoTDB.Model;
|
||
using JiShe.CollectBus.IoTDB.Options;
|
||
using JiShe.CollectBus.IoTDB.Provider;
|
||
using JiShe.CollectBus.IotSystems.Ammeters;
|
||
using JiShe.CollectBus.IotSystems.Devices;
|
||
using JiShe.CollectBus.IotSystems.LogRecord;
|
||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||
using JiShe.CollectBus.Protocol.Contracts.Protocol.Dto;
|
||
using JiShe.CollectBus.Protocol.Dto;
|
||
using Microsoft.AspNetCore.Http;
|
||
using Microsoft.Extensions.Caching.Memory;
|
||
using Microsoft.Extensions.Logging;
|
||
using Microsoft.Extensions.Options;
|
||
using System.Diagnostics;
|
||
using System.Diagnostics.Metrics;
|
||
using System.Threading.Channels;
|
||
using TouchSocket.Core;
|
||
using Volo.Abp.DependencyInjection;
|
||
using Volo.Abp.Guids;
|
||
using YamlDotNet.Core.Tokens;
|
||
using static FreeSql.Internal.GlobalFilter;
|
||
using static IClientRPCService;
|
||
using static JiShe.CollectBus.Common.Consts.T37612012PacketItemCodeConst;
|
||
|
||
namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
|
||
{
|
||
public class DataStorage:ITransientDependency
|
||
{
|
||
// Collaborators supplied through the constructor and captured once.
private readonly IGuidGenerator _guidGenerator;
private readonly IIoTDbProvider _dbProvider;
private readonly ServerApplicationOptions _applicationOptions;
private readonly ILogger<DataStorage> _logger;
private readonly IMemoryCache _imemoryCache;
private readonly IFreeRedisProvider _freeRedisProvider;

/// <summary>
/// Redis client read from the FreeRedis provider's <c>Instance</c> property at construction time.
/// </summary>
private RedisClient Instance { get; set; }

/// <summary>
/// Creates the storage helper and stores every dependency it is given.
/// </summary>
/// <param name="dbProvider">IoTDB provider used to obtain session pools.</param>
/// <param name="applicationOptions">Options wrapper; its <c>Value</c> is read once here.</param>
/// <param name="guidGenerator">GUID generator (stored as-is).</param>
/// <param name="logger">Logger for this class (stored as-is).</param>
/// <param name="memoryCache">In-process cache used by <c>GetDeviceInfoAsync</c>.</param>
/// <param name="freeRedisProvider">Provider whose <c>Instance</c> becomes the Redis client.</param>
public DataStorage(
    IIoTDbProvider dbProvider,
    IOptions<ServerApplicationOptions> applicationOptions,
    IGuidGenerator guidGenerator,
    ILogger<DataStorage> logger,
    IMemoryCache memoryCache,
    IFreeRedisProvider freeRedisProvider)
{
    _guidGenerator = guidGenerator;
    _logger = logger;
    _imemoryCache = memoryCache;
    _dbProvider = dbProvider;

    // Unwrap the options before touching Redis, in the same order as before.
    _applicationOptions = applicationOptions.Value;

    _freeRedisProvider = freeRedisProvider;
    Instance = freeRedisProvider.Instance;
}
|
||
|
||
/// <summary>
/// Writes one item to the log-save channel.
/// </summary>
/// <param name="channelWriter">Channel writer that receives the item.</param>
/// <param name="dataItems">Item to enqueue; passed through unchanged.</param>
/// <returns>A task that completes when the channel has accepted the item.</returns>
public async Task LogSaveWriterAsync(ChannelWriter<object> channelWriter, dynamic dataItems)
{
    // The channel's element type is object, so the payload is handed over as object.
    object payload = dataItems;
    await channelWriter.WriteAsync(payload);
}
|
||
|
||
/// <summary>
/// Writes one item to the log-refresh channel.
/// </summary>
/// <param name="channelWriter">Channel writer that receives the item.</param>
/// <param name="dataItems">Item to enqueue; passed through unchanged.</param>
/// <returns>A task that completes when the channel has accepted the item.</returns>
public async Task LogRefreshSaveWriterAsync(ChannelWriter<object> channelWriter, dynamic dataItems)
{
    // The channel's element type is object, so the payload is handed over as object.
    object payload = dataItems;
    await channelWriter.WriteAsync(payload);
}
|
||
|
||
|
||
/// <summary>
/// Returns the cached device info registered under <paramref name="code"/>.
/// The device list is read from the Redis hash field named <paramref name="code"/> and the
/// match is kept in the in-memory cache: 5 minutes (sliding) for a hit, 5 seconds (sliding)
/// for a miss, so that missing devices are re-checked soon.
/// </summary>
/// <param name="code">Redis hash field that holds the device list; also the cache-key prefix.</param>
/// <param name="pn">Value compared against each device's <c>MeteringCode</c>.</param>
/// <param name="meterAddress">
/// Optional fallback: when the lookup by <paramref name="pn"/> finds nothing and this is not
/// blank, the first device whose <c>MeterAddress</c> equals it is returned instead.
/// </param>
/// <returns>The matching device, or <c>null</c> when neither lookup finds one.</returns>
public async Task<DeviceInfo?> GetDeviceInfoAsync(string code, int pn = 1, string? meterAddress = null)
{
    string redisCacheDeviceInfoHashKey = string.Format(RedisConst.CacheDeviceInfoHashKey, _applicationOptions.SystemType, _applicationOptions.ServerTagName);

    var deviceInfo = await FindCachedAsync($"{code}:{pn}", device => device.MeteringCode == pn);

    if (deviceInfo == null && !string.IsNullOrWhiteSpace(meterAddress))
    {
        // TODO: handles the case where transparent forwarding comes back with pn = 0.
        deviceInfo = await FindCachedAsync($"{code}:{meterAddress}", device => device.MeterAddress == meterAddress);
    }

    return deviceInfo;

    // Shared lookup: memory cache first, Redis on a cache miss.
    Task<DeviceInfo?> FindCachedAsync(string cacheKey, Func<DeviceInfo, bool> isMatch)
    {
        return _imemoryCache.GetOrCreateAsync(cacheKey, async entry =>
        {
            List<DeviceInfo> devices = await Instance.HGetAsync<List<DeviceInfo>>(redisCacheDeviceInfoHashKey, code) ?? new List<DeviceInfo>();
            DeviceInfo? match = devices.FirstOrDefault(isMatch);

            // Hits stay cached for minutes; misses only briefly.
            entry.SetSlidingExpiration(match != null ? TimeSpan.FromMinutes(5) : TimeSpan.FromSeconds(5));
            return match;
        });
    }
}
|
||
|
||
/// <summary>
/// Saves one parsed reading to IoTDB.
/// A packet record is always inserted: an existing record with the same ScoreValue and
/// TaskMark that is not yet marked received is reused and flagged as received, otherwise a
/// new record is built. When the reading has a field name, the measured value and two
/// marker measurements (IsSync, DatabaseBusiID) are inserted as well.
/// </summary>
/// <typeparam name="T">Type of the measured value.</typeparam>
/// <param name="analysisBaseDto">Parsed frame; its <c>Data</c> is dereferenced without a null check.</param>
/// <returns>Always <c>true</c>; failures surface as exceptions.</returns>
public async Task<bool> SaveDataToIotDbAsync<T>(UnitDataAnalysis<AnalysisBaseDto<T>> analysisBaseDto)
{
    var reading = analysisBaseDto.Data!;

    // A reading without its own time is stamped with the frame's receive time.
    if (!reading.TimeSpan.HasValue)
    {
        reading.TimeSpan = analysisBaseDto.ReceivedTime;
    }

    string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
    string scoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun();

    // Filter for a stored packet record with this score/task mark that is not yet received.
    var filter = new List<QueryCondition>
    {
        new QueryCondition { Field = "ScoreValue", Operator = "=", IsNumber = false, Value = scoreValue },
        new QueryCondition { Field = "TaskMark", Operator = "=", IsNumber = false, Value = taskMark },
        new QueryCondition { Field = "IsReceived", Operator = "=", IsNumber = false, Value = false },
    };

    var measurement = new TreeModelSingleMeasuringEntity<T>
    {
        SystemName = _applicationOptions.SystemType,
        DeviceId = $"{reading.DeviceId}",
        DeviceType = $"{reading.DeviceType.ToString()}",
        ProjectId = $"{reading.ProjectId}",
        DataType = analysisBaseDto.DataType,
        Timestamps = reading.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(),
        SingleMeasuring = (reading.FiledName ?? string.Empty, reading.DataValue ?? default)
    };

    var queryResult = await _dbProvider.GetSessionPool(true).QueryAsync<MeterReadingTelemetryPacketInfo>(
        new IoTDBQueryOptions
        {
            TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
            Conditions = filter,
            PageIndex = 0,
            PageSize = 1
        });
    var packet = queryResult?.Items.FirstOrDefault();

    if (packet == null)
    {
        // No stored record matched: create a new one.
        var createdAt = DateTime.Now;

        // Special code mapping.
        var itemCodes = T37612012PacketItemCodeConst.MappingItemCodeTo645SubCodeRelationship(reading.ItemType);

        packet = new MeterReadingTelemetryPacketInfo
        {
            SystemName = _applicationOptions.SystemType,
            ProjectId = $"{reading.ProjectId}",
            DeviceType = $"{reading.DeviceType}",
            DeviceId = $"{reading.DeviceId}",
            DataType = analysisBaseDto.DataType,
            FocusId = reading.FocusId,
            FocusAddress = analysisBaseDto.Code,
            Timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds(),
            DatabaseBusiID = reading.DatabaseBusiID,
            PendingCopyReadTime = reading.TimeSpan.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity),
            CreationTime = createdAt,
            // Left empty when the device address is the same as the frame code.
            MeterAddress = analysisBaseDto.Code == reading.DeviceAddress ? "" : reading.DeviceAddress,
            AFN = analysisBaseDto.AFN,
            Fn = analysisBaseDto.Fn,
            Seq = analysisBaseDto.PSEQ,
            MSA = analysisBaseDto.MSA,
            ItemCode = itemCodes.Item1,
            SubItemCode = itemCodes.Item2,
            TaskMark = taskMark,
            IsSend = false,
            ManualOrNot = false,
            Pn = analysisBaseDto.Pn,
            ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty,
            ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage,
            IsReceived = true,
            ReceivedRemark = reading.ErrorCodeMsg ?? string.Empty,
            ScoreValue = scoreValue,
            ReceivedTime = analysisBaseDto.ReceivedTime,
        };
    }
    else
    {
        // A stored record matched: flag it as received and store the value at its pending read time.
        measurement.Timestamps = packet.PendingCopyReadTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
        packet.IsReceived = true;
        packet.ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage;
        packet.ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty;
    }

    await _dbProvider.GetSessionPool(true).InsertAsync(packet);

    // Without a field name there is no measurement to store.
    if (string.IsNullOrWhiteSpace(reading.FiledName))
    {
        return true;
    }

    await _dbProvider.GetSessionPool(false).InsertAsync(measurement);

    // Marker field: IsSync, written as false.
    var syncMarker = new TreeModelSingleMeasuringEntity<bool>
    {
        SystemName = _applicationOptions.SystemType,
        DeviceId = $"{reading.DeviceId}",
        DeviceType = $"{reading.DeviceType}",
        ProjectId = $"{reading.ProjectId}",
        DataType = analysisBaseDto.DataType,
        Timestamps = measurement.Timestamps,
        SingleMeasuring = (IotDbFieldConst.IsSync, false)
    };
    await _dbProvider.GetSessionPool(false).InsertAsync(syncMarker);

    // Database business ID.
    var businessIdMarker = new TreeModelSingleMeasuringEntity<int>
    {
        SystemName = _applicationOptions.SystemType,
        DeviceId = $"{reading.DeviceId}",
        DeviceType = $"{reading.DeviceType}",
        ProjectId = $"{reading.ProjectId}",
        DataType = analysisBaseDto.DataType,
        Timestamps = measurement.Timestamps,
        SingleMeasuring = (IotDbFieldConst.DatabaseBusiID, reading.DatabaseBusiID)
    };
    await _dbProvider.GetSessionPool(false).InsertAsync(businessIdMarker);

    return true;
}
|
||
|
||
|
||
/// <summary>
/// Saves a list of parsed readings to IoTDB in batches.
/// For every item a packet record is collected (a stored, not-yet-received record with the
/// same ScoreValue and TaskMark is reused, otherwise a new one is built). Items that have a
/// field name and a positive project id additionally contribute their measured value plus
/// the IsSync and DatabaseBusiID marker measurements; markers are de-duplicated per
/// data type / project / system / device / timestamp.
/// </summary>
/// <typeparam name="T">Type of the measured values.</typeparam>
/// <param name="analysisBaseDto">Parsed frame; its <c>Data</c> list is dereferenced without a null check.</param>
/// <returns>Always <c>true</c>; failures surface as exceptions.</returns>
public async Task<bool> SaveMultipleDataToIotDbAsync<T>(UnitDataAnalysis<List<AnalysisBaseDto<T>>> analysisBaseDto)
{
    var data = analysisBaseDto.Data!;
    var packetRecords = new List<MeterReadingTelemetryPacketInfo>();
    var measurements = new List<TreeModelSingleMeasuringEntity<T>>();
    var syncMarkers = new List<TreeModelSingleMeasuringEntity<bool>>();
    var businessIdMarkers = new List<TreeModelSingleMeasuringEntity<int>>();

    // These depend only on the frame, not on the individual item, so compute them once.
    string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
    string scoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun();
    var conditions = new List<QueryCondition>
    {
        new QueryCondition { Field = "ScoreValue", Operator = "=", IsNumber = false, Value = scoreValue },
        new QueryCondition { Field = "TaskMark", Operator = "=", IsNumber = false, Value = taskMark },
        new QueryCondition { Field = "IsReceived", Operator = "=", IsNumber = false, Value = false },
    };

    foreach (var item in data)
    {
        // An item without its own time is stamped with the frame's receive time.
        if (!item.TimeSpan.HasValue)
        {
            item.TimeSpan = analysisBaseDto.ReceivedTime;
        }

        var meter = new TreeModelSingleMeasuringEntity<T>
        {
            SystemName = _applicationOptions.SystemType,
            DeviceId = $"{item.DeviceId}",
            DeviceType = $"{item.DeviceType}",
            ProjectId = $"{item.ProjectId}",
            DataType = analysisBaseDto.DataType,
            // TODO: currently formatted as 15-minute data; needs adjusting.
            Timestamps = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(),
            SingleMeasuring = (item.FiledName ?? string.Empty, item.DataValue ?? default)
        };

        // NOTE(review): the filter is identical for every item, so this query presumably returns
        // the same stored record each time; it is kept per item so every item still gets its own
        // record instance, exactly as before. Confirm whether a single query would suffice.
        var taskSendInfo = await _dbProvider.GetSessionPool(true).QueryAsync<MeterReadingTelemetryPacketInfo>(
            new IoTDBQueryOptions
            {
                TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
                Conditions = conditions,
                PageIndex = 0,
                PageSize = 1
            });
        var taskData = taskSendInfo?.Items.FirstOrDefault();

        if (taskData != null)
        {
            // Update: flag the stored record as received and store the value at its pending read time.
            meter.Timestamps = taskData.PendingCopyReadTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
            taskData.IsReceived = true;
            taskData.ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage;
            taskData.ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty;
        }
        else
        {
            // Create a new record.
            var currentTime = DateTime.Now;

            // Special code mapping.
            var itemCodeInfo = T37612012PacketItemCodeConst.MappingItemCodeTo645SubCodeRelationship(item.ItemType);

            taskData = new MeterReadingTelemetryPacketInfo
            {
                SystemName = _applicationOptions.SystemType,
                ProjectId = $"{item.ProjectId}",
                DeviceType = $"{item.DeviceType}",
                DeviceId = $"{item.DeviceId}",
                // No issued record matched, so the packet is marked as LOG.
                DataType = IOTDBDataTypeConst.Log,
                Timestamps = DateTime.Now.CheckTimePoint().GetDateTimeOffset().ToUnixTimeNanoseconds(),
                DatabaseBusiID = item.DatabaseBusiID,
                PendingCopyReadTime = item.TimeSpan.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity),
                CreationTime = currentTime,
                FocusId = item.FocusId,
                FocusAddress = analysisBaseDto.Code,
                // Left empty when the device address is the same as the frame code.
                MeterAddress = analysisBaseDto.Code == item.DeviceAddress ? "" : item.DeviceAddress,
                AFN = analysisBaseDto.AFN,
                Fn = analysisBaseDto.Fn,
                Seq = analysisBaseDto.PSEQ,
                MSA = analysisBaseDto.MSA,
                ItemCode = itemCodeInfo.Item1,
                SubItemCode = itemCodeInfo.Item2,
                TaskMark = taskMark,
                IsSend = false,
                ManualOrNot = false,
                Pn = analysisBaseDto.Pn,
                ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty,
                ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage,
                IsReceived = true,
                ReceivedRemark = item.ErrorCodeMsg ?? string.Empty,
                ScoreValue = scoreValue,
                ReceivedTime = analysisBaseDto.ReceivedTime,
            };
        }

        packetRecords.Add(taskData);

        // Measurements are stored only for items with a field name and a positive project id.
        if (!string.IsNullOrWhiteSpace(item.FiledName) && item.ProjectId > 0)
        {
            measurements.Add(meter);

            // Marker field: IsSync, written as false.
            AddIfAbsent(syncMarkers, new TreeModelSingleMeasuringEntity<bool>
            {
                SystemName = _applicationOptions.SystemType,
                DeviceId = $"{item.DeviceId}",
                DeviceType = $"{item.DeviceType}",
                ProjectId = $"{item.ProjectId}",
                DataType = analysisBaseDto.DataType,
                Timestamps = meter.Timestamps,
                SingleMeasuring = (IotDbFieldConst.IsSync, false)
            });

            // Database business ID. The duplicate check now compares against this entity
            // itself rather than against the IsSync marker.
            AddIfAbsent(businessIdMarkers, new TreeModelSingleMeasuringEntity<int>
            {
                SystemName = _applicationOptions.SystemType,
                DeviceId = $"{item.DeviceId}",
                DeviceType = $"{item.DeviceType}",
                ProjectId = $"{item.ProjectId}",
                DataType = analysisBaseDto.DataType,
                Timestamps = meter.Timestamps,
                SingleMeasuring = (IotDbFieldConst.DatabaseBusiID, item.DatabaseBusiID)
            });
        }
    }

    // Batch save; an empty batch is skipped, matching SaveMultipleStatusToIotDbAsync.
    if (packetRecords.Count > 0)
    {
        await _dbProvider.GetSessionPool(true).BatchInsertAsync(packetRecords);
    }

    if (measurements.Count > 0)
    {
        await _dbProvider.GetSessionPool(false).BatchInsertAsync(measurements);

        // Save the marker fields.
        await _dbProvider.GetSessionPool(false).BatchInsertAsync(syncMarkers);
        await _dbProvider.GetSessionPool(false).BatchInsertAsync(businessIdMarkers);
    }

    return true;

    // Adds the candidate unless an entry with the same data type, project, system, device and timestamp exists.
    static void AddIfAbsent<TValue>(List<TreeModelSingleMeasuringEntity<TValue>> target, TreeModelSingleMeasuringEntity<TValue> candidate)
    {
        bool alreadyPresent = target.Any(existing =>
            existing.DataType == candidate.DataType
            && existing.ProjectId == candidate.ProjectId
            && existing.SystemName == candidate.SystemName
            && existing.DeviceId == candidate.DeviceId
            && existing.Timestamps == candidate.Timestamps);

        if (!alreadyPresent)
        {
            target.Add(candidate);
        }
    }
}
|
||
|
||
/// <summary>
/// Saves one status item (heartbeat, login, power-on, power-off) to IoTDB.
/// When the item's project id is positive, three tree-model measurements sharing one
/// timestamp are inserted: the status value, the received frame hex string, and the
/// recording time. A packet record for the frame is always inserted.
/// </summary>
/// <typeparam name="T">Type of the status value.</typeparam>
/// <param name="analysisBaseDto">Parsed frame carrying the status item in <c>Data</c>.</param>
/// <returns>Always <c>true</c>; failures surface as exceptions.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="analysisBaseDto"/>, its <c>Data</c>, or <c>Data.DataValue</c> is null.
/// </exception>
/// <exception cref="ArgumentException"><c>Data.FiledName</c> is null, empty or whitespace.</exception>
public async Task<bool> SaveStatusToIotDbAsync<T>(UnitDataAnalysis<AnalysisBaseDto<T>> analysisBaseDto)
{
    // Validate the values themselves. Wrapping them in nameof(...) only tested the constant
    // name string, so the checks could never fail.
    ArgumentNullException.ThrowIfNull(analysisBaseDto);
    var data = analysisBaseDto.Data;
    ArgumentNullException.ThrowIfNull(data, nameof(analysisBaseDto.Data));
    ArgumentException.ThrowIfNullOrWhiteSpace(data.FiledName, nameof(data.FiledName));
    ArgumentNullException.ThrowIfNull(data.DataValue, nameof(data.DataValue));

    // An item without its own time is stamped with the frame's receive time.
    if (!data.TimeSpan.HasValue)
    {
        data.TimeSpan = analysisBaseDto.ReceivedTime;
    }

    // Type (heartbeat, login, power-on, power-off).
    if (data.ProjectId > 0)
    {
        // One timestamp shared by the three measurements below.
        long timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds();

        var treeData = new TreeModelSingleMeasuringEntity<T>
        {
            SystemName = _applicationOptions.SystemType,
            DeviceId = $"{data.DeviceId}",
            DeviceType = $"{data.DeviceType}",
            ProjectId = $"{data.ProjectId}",
            DataType = IOTDBDataTypeConst.Status,
            Timestamps = timestamps,
            SingleMeasuring = (data.FiledName!, data.DataValue!)
        };
        await _dbProvider.GetSessionPool(false).InsertAsync(treeData);

        // Data frame.
        var treeFrameData = new TreeModelSingleMeasuringEntity<string>
        {
            SystemName = _applicationOptions.SystemType,
            DeviceId = $"{data.DeviceId}",
            DeviceType = $"{data.DeviceType}",
            ProjectId = $"{data.ProjectId}",
            DataType = IOTDBDataTypeConst.Status,
            Timestamps = timestamps,
            SingleMeasuring = (IotDbFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
        };
        await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData);

        // Recording time.
        var treeRecordingTimeData = new TreeModelSingleMeasuringEntity<DateTime>
        {
            SystemName = _applicationOptions.SystemType,
            DeviceId = $"{data.DeviceId}",
            DeviceType = $"{data.DeviceType}",
            ProjectId = $"{data.ProjectId}",
            Timestamps = timestamps,
            DataType = IOTDBDataTypeConst.Status,
            SingleMeasuring = (IotDbFieldConst.RecordingTime, data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now)
        };
        await _dbProvider.GetSessionPool(false).InsertAsync(treeRecordingTimeData);
    }

    // Create the packet record for this frame.
    string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
    var currentTime = DateTime.Now;

    // Special code mapping.
    var itemCodeInfo = T37612012PacketItemCodeConst.MappingItemCodeTo645SubCodeRelationship(data.ItemType);

    var taskData = new MeterReadingTelemetryPacketInfo
    {
        SystemName = _applicationOptions.SystemType,
        ProjectId = $"{data.ProjectId}",
        DeviceType = $"{data.DeviceType}",
        DeviceId = $"{data.DeviceId}",
        DataType = analysisBaseDto.DataType,
        FocusId = data.FocusId,
        FocusAddress = analysisBaseDto.Code,
        Timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds(),
        DatabaseBusiID = data.DatabaseBusiID,
        PendingCopyReadTime = data.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity),
        CreationTime = currentTime,
        // Left empty when the device address is the same as the frame code.
        MeterAddress = analysisBaseDto.Code == data.DeviceAddress ? "" : data.DeviceAddress,
        AFN = analysisBaseDto.AFN,
        Fn = analysisBaseDto.Fn,
        Seq = analysisBaseDto.PSEQ,
        MSA = analysisBaseDto.MSA,
        ItemCode = itemCodeInfo.Item1,
        SubItemCode = itemCodeInfo.Item2,
        TaskMark = taskMark,
        IsSend = false,
        ManualOrNot = false,
        Pn = analysisBaseDto.Pn,
        ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty,
        ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage,
        IsReceived = true,
        ReceivedRemark = data.ErrorCodeMsg ?? string.Empty,
        ScoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun(),
        ReceivedTime = analysisBaseDto.ReceivedTime,
    };
    await _dbProvider.GetSessionPool(true).InsertAsync(taskData);

    return true;
}
|
||
|
||
/// <summary>
/// Saves a list of status items (heartbeat, login, power-on, power-off) to IoTDB.
/// For every item with a positive project id, three tree-model measurements sharing one
/// timestamp are inserted (status value, received frame hex string, recording time). A
/// packet record is built for every item and the records are inserted in one batch.
/// </summary>
/// <typeparam name="T">Type of the status values.</typeparam>
/// <param name="analysisBaseDto">Parsed frame carrying the status items in <c>Data</c>.</param>
/// <returns>Always <c>true</c>; failures surface as exceptions.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="analysisBaseDto"/>, its <c>Data</c>, or any item's <c>DataValue</c> is null.
/// </exception>
/// <exception cref="ArgumentException">Any item's <c>FiledName</c> is null, empty or whitespace.</exception>
public async Task<bool> SaveMultipleStatusToIotDbAsync<T>(UnitDataAnalysis<List<AnalysisBaseDto<T>>> analysisBaseDto)
{
    // Validate the values themselves. Wrapping them in nameof(...) only tested the constant
    // name string, so the checks could never fail.
    ArgumentNullException.ThrowIfNull(analysisBaseDto);
    var data = analysisBaseDto.Data;
    ArgumentNullException.ThrowIfNull(data, nameof(analysisBaseDto.Data));

    // Check every item before the first write so a bad item cannot leave partial writes behind.
    foreach (var item in data)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(item.FiledName, nameof(item.FiledName));
        ArgumentNullException.ThrowIfNull(item.DataValue, nameof(item.DataValue));
    }

    var packetRecords = new List<MeterReadingTelemetryPacketInfo>();

    // Depends only on the frame, not on the item.
    string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);

    foreach (var item in data)
    {
        // An item without its own time is stamped with the frame's receive time.
        if (!item.TimeSpan.HasValue)
        {
            item.TimeSpan = analysisBaseDto.ReceivedTime;
        }

        // Type (heartbeat, login, power-on, power-off).
        if (item.ProjectId > 0)
        {
            // One timestamp shared by the three measurements below.
            long timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds();

            var treeData = new TreeModelSingleMeasuringEntity<T>
            {
                SystemName = _applicationOptions.SystemType,
                DeviceId = $"{item.DeviceId}",
                DeviceType = $"{item.DeviceType}",
                ProjectId = $"{item.ProjectId}",
                DataType = IOTDBDataTypeConst.Status,
                Timestamps = timestamps,
                SingleMeasuring = (item.FiledName!, item.DataValue!)
            };
            await _dbProvider.GetSessionPool(false).InsertAsync(treeData);

            // Data frame. DeviceId was missing here although the status value above and
            // SaveStatusToIotDbAsync both set it.
            var treeFrameData = new TreeModelSingleMeasuringEntity<string>
            {
                SystemName = _applicationOptions.SystemType,
                DeviceId = $"{item.DeviceId}",
                DeviceType = $"{item.DeviceType}",
                ProjectId = $"{item.ProjectId}",
                DataType = IOTDBDataTypeConst.Status,
                Timestamps = timestamps,
                SingleMeasuring = (IotDbFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
            };
            await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData);

            // Recording time. DeviceId was missing here as well.
            var treeRecordingTimeData = new TreeModelSingleMeasuringEntity<DateTime>
            {
                SystemName = _applicationOptions.SystemType,
                DeviceId = $"{item.DeviceId}",
                DeviceType = $"{item.DeviceType}",
                ProjectId = $"{item.ProjectId}",
                DataType = IOTDBDataTypeConst.Status,
                Timestamps = timestamps,
                SingleMeasuring = (IotDbFieldConst.RecordingTime, item.TimeSpan.HasValue ? item.TimeSpan.Value : DateTime.Now)
            };
            await _dbProvider.GetSessionPool(false).InsertAsync(treeRecordingTimeData);
        }

        // Create the packet record for this item.
        var currentTime = DateTime.Now;

        // Special code mapping.
        var itemCodeInfo = T37612012PacketItemCodeConst.MappingItemCodeTo645SubCodeRelationship(item.ItemType);

        var taskData = new MeterReadingTelemetryPacketInfo
        {
            SystemName = _applicationOptions.SystemType,
            ProjectId = $"{item.ProjectId}",
            DeviceType = $"{item.DeviceType}",
            DeviceId = $"{item.DeviceId}",
            DataType = analysisBaseDto.DataType,
            Timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds(),
            DatabaseBusiID = item.DatabaseBusiID,
            PendingCopyReadTime = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity),
            CreationTime = currentTime,
            FocusId = item.FocusId,
            FocusAddress = analysisBaseDto.Code,
            // Left empty when the device address is the same as the frame code.
            MeterAddress = analysisBaseDto.Code == item.DeviceAddress ? "" : item.DeviceAddress,
            AFN = analysisBaseDto.AFN,
            Fn = analysisBaseDto.Fn,
            Seq = analysisBaseDto.PSEQ,
            MSA = analysisBaseDto.MSA,
            ItemCode = itemCodeInfo.Item1,
            SubItemCode = itemCodeInfo.Item2,
            TaskMark = taskMark,
            IsSend = false,
            ManualOrNot = false,
            Pn = analysisBaseDto.Pn,
            ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty,
            ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage,
            IsReceived = true,
            ReceivedRemark = item.ErrorCodeMsg ?? string.Empty,
            ScoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun(),
            ReceivedTime = analysisBaseDto.ReceivedTime,
        };

        packetRecords.Add(taskData);
    }

    // Nothing to insert when the frame carried no items.
    if (packetRecords.Count > 0)
    {
        await _dbProvider.GetSessionPool(true).BatchInsertAsync(packetRecords);
    }

    return true;
}
|
||
|
||
|
||
}
|
||
}
|