2025-06-03 23:01:46 +08:00

594 lines
30 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using FreeRedis;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Protocol.Contracts.Protocol.Dto;
using JiShe.CollectBus.Protocol.Dto;
using JiShe.ServicePro.Consts;
using JiShe.ServicePro.Core;
using JiShe.ServicePro.Encrypt;
using JiShe.ServicePro.IoTDBManagement.Model;
using JiShe.ServicePro.IoTDBManagement.Options;
using JiShe.ServicePro.IoTDBManagement.SessionPools;
using JiShe.ServicePro.IoTDBManagement.TableModels;
using JiShe.ServicePro.IoTDBManagement.TreeModels;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Threading.Channels;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Guids;
using static FreeSql.Internal.GlobalFilter;
using static JiShe.CollectBus.Common.Consts.T37612012PacketItemCodeConst;
namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
{
public class DataStorage:ITransientDependency
{
private readonly IGuidGenerator _guidGenerator;
private readonly IoTDBSessionPoolProvider _dbProvider;
private readonly ServerApplicationOptions _applicationOptions;
private readonly ILogger<DataStorage> _logger;
private readonly IMemoryCache _imemoryCache;
private readonly IFreeRedisProvider _freeRedisProvider;
private IRedisClient Instance { get; set; }
public DataStorage(IoTDBSessionPoolProvider dbProvider, IOptions<ServerApplicationOptions> applicationOptions,
IGuidGenerator guidGenerator, ILogger<DataStorage> logger, IMemoryCache memoryCache, IFreeRedisProvider freeRedisProvider)
{
_dbProvider= dbProvider;
_applicationOptions = applicationOptions.Value;
_guidGenerator= guidGenerator;
_logger= logger;
_imemoryCache = memoryCache;
_freeRedisProvider = freeRedisProvider;
Instance = _freeRedisProvider.Instance;
}
/// <summary>
/// 日志保存通道写入
/// </summary>
/// <returns></returns>
public async Task LogSaveWriterAsync(ChannelWriter<object> channelWriter, dynamic dataItems)
{
await channelWriter.WriteAsync(dataItems);
}
/// <summary>
/// 日志刷新通道写入
/// </summary>
/// <returns></returns>
public async Task LogRefreshSaveWriterAsync(ChannelWriter<object> channelWriter, dynamic dataItems)
{
await channelWriter.WriteAsync(dataItems);
}
/// <summary>
/// 获取缓存设备信息
/// </summary>
/// <param name="meterType"></param>
/// <param name="timeDensity"></param>
/// <returns></returns>
public async Task<DeviceCacheInfo?> GetDeviceInfoAsync(string code,int pn=1,string meterAddress=null)
{
string redisCacheDeviceInfoHashKey = $"{string.Format(RedisConst.CacheDeviceInfoHashKey, _applicationOptions.SystemType, _applicationOptions.ServerTagName)}";
string deviceKey= $"{code}:{pn}";
var deviceInfo= await _imemoryCache.GetOrCreateAsync(deviceKey, async entry =>
{
List<DeviceCacheInfo> devices= await Instance.HGetAsync<List<DeviceCacheInfo>>(redisCacheDeviceInfoHashKey, code) ?? new List<DeviceCacheInfo>();
var data = devices.Where(s => s.MeteringCode == pn).FirstOrDefault();
if (data != null)
entry.SetSlidingExpiration(TimeSpan.FromMinutes(5));
else
entry.SetSlidingExpiration(TimeSpan.FromSeconds(5));
return data ?? default;
});
if (deviceInfo == null && !string.IsNullOrWhiteSpace(meterAddress))
{
// TODO:透明转发回来的pn为0情况
string deviceAddressKey = $"{code}:{meterAddress}";
deviceInfo = await _imemoryCache.GetOrCreateAsync(deviceAddressKey, async entry =>
{
List<DeviceCacheInfo> devices = await Instance.HGetAsync<List<DeviceCacheInfo>>(redisCacheDeviceInfoHashKey, code) ?? new List<DeviceCacheInfo>();
var data = devices.Where(s => s.MeterAddress == meterAddress).FirstOrDefault();
if (data != null)
entry.SetSlidingExpiration(TimeSpan.FromMinutes(5));
else
entry.SetSlidingExpiration(TimeSpan.FromSeconds(5));
return data ?? default;
});
}
return deviceInfo;
}
/// <summary>
/// 保存数据到IotDb
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="analysisBaseDto"></param>
/// <returns></returns>
public async Task<bool> SaveDataToIotDbAsync<T>(UnitDataAnalysis<AnalysisBaseDto<T>> analysisBaseDto)
{
var data = analysisBaseDto.Data!;
if(!data.TimeSpan.HasValue)
data.TimeSpan= analysisBaseDto.ReceivedTime;
string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
string scoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun();
var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
{
Field = "ScoreValue",
Operator = "=",
IsNumber = false,
Value = scoreValue
});
conditions.Add(new QueryCondition()
{
Field = "TaskMark",
Operator = "=",
IsNumber = false,
Value = taskMark
});
conditions.Add(new QueryCondition()
{
Field = "IsReceived",
Operator = "=",
IsNumber = false,
Value = false
});
var meter = new TreeModelSingleMeasuringEntity<T>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType.ToString()}",
ProjectId = $"{data.ProjectId}",
IoTDataType = analysisBaseDto.DataType,
Timestamps = data.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(),
SingleMeasuring = (data.FiledName ?? string.Empty, data.DataValue ?? default)
};
var taskSendInfo = await _dbProvider.GetSessionPool(true).QueryAsync<MeterReadingTelemetryPacketInfo>(new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(), Conditions = conditions, PageIndex = 0, PageSize = 1 });
var taskData = taskSendInfo?.Items.FirstOrDefault();
if (taskData != null)
{
// 更新
meter.Timestamps = taskData.PendingCopyReadTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
taskData.IsReceived=true;
taskData.ReceivedMessageHexString= analysisBaseDto.ReceivedHexMessage;
taskData.ReceivedMessageId= analysisBaseDto.MessageId ?? string.Empty;
}
else
{
// 新建
var currentTime = DateTime.Now;
//特殊编码映射
var itemCodeInfo = T37612012PacketItemCodeConst.MappingItemCodeTo645SubCodeRelationship(data.ItemType);
taskData = new MeterReadingTelemetryPacketInfo()
{
SystemName = _applicationOptions.SystemType,
ProjectId = $"{data.ProjectId}",
DeviceType = $"{data.DeviceType}",
DeviceId = $"{data.DeviceId}",
IoTDataType = analysisBaseDto.DataType,
FocusId = data.FocusId,
FocusAddress = analysisBaseDto.Code,
Timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds(),
DatabaseBusiID = data.DatabaseBusiID,
PendingCopyReadTime = data.TimeSpan.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity),
CreationTime = currentTime,
MeterAddress = analysisBaseDto.Code == data.DeviceAddress ? "" : data.DeviceAddress, // 判断是否能取到表地址
AFN = analysisBaseDto.AFN,
Fn = analysisBaseDto.Fn,
Seq = analysisBaseDto.PSEQ,
MSA = analysisBaseDto.MSA,
ItemCode = itemCodeInfo.Item1,
SubItemCode = itemCodeInfo.Item2,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
Pn = analysisBaseDto.Pn,
ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty,
ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage,
IsReceived = true,
ReceivedRemark = data.ErrorCodeMsg ?? string.Empty,
ScoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun(),
ReceivedTime = analysisBaseDto.ReceivedTime,
};
}
await _dbProvider.GetSessionPool(true).InsertAsync(taskData);
//如果无字段名,则不保存数据
if (!string.IsNullOrWhiteSpace(data.FiledName))
{
await _dbProvider.GetSessionPool(false).InsertAsync(meter);
// 增加标识字段
var meterIsSync = new TreeModelSingleMeasuringEntity<bool>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}",
ProjectId = $"{data.ProjectId}",
IoTDataType = analysisBaseDto.DataType,
Timestamps = meter.Timestamps,
SingleMeasuring = (IotDbFieldConst.IsSync, false)
};
await _dbProvider.GetSessionPool(false).InsertAsync(meterIsSync);
// 数据库业务ID
var meterIsDatabaseBusiID = new TreeModelSingleMeasuringEntity<int>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}",
ProjectId = $"{data.ProjectId}",
IoTDataType = analysisBaseDto.DataType,
Timestamps = meter.Timestamps,
SingleMeasuring = (IotDbFieldConst.DatabaseBusiID, data.DatabaseBusiID)
};
await _dbProvider.GetSessionPool(false).InsertAsync(meterIsDatabaseBusiID);
}
return await Task.FromResult(true);
}
/// <summary>
/// 保存数据到IotDb
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="analysisBaseDto"></param>
/// <param name="saveData"></param>
/// <returns></returns>
public async Task<bool> SaveMultipleDataToIotDbAsync<T>(UnitDataAnalysis<List<AnalysisBaseDto<T>>> analysisBaseDto)
{
var data = analysisBaseDto.Data!;
List<MeterReadingTelemetryPacketInfo> meterReadingTelemetryPacketInfos = new List<MeterReadingTelemetryPacketInfo>();
List<TreeModelSingleMeasuringEntity<T>> treeModelSingleMeasuringEntities = new List<TreeModelSingleMeasuringEntity<T>>();
List<TreeModelSingleMeasuringEntity<bool>> meterIsSyncs = new List<TreeModelSingleMeasuringEntity<bool>>();
List<TreeModelSingleMeasuringEntity<int>> meterDataBaseIDs = new List<TreeModelSingleMeasuringEntity<int>>();
foreach (var item in data)
{
if(!item.TimeSpan.HasValue)
item.TimeSpan = analysisBaseDto.ReceivedTime;
string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
string scoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun();
var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
{
Field = "ScoreValue",
Operator = "=",
IsNumber = false,
Value = scoreValue
});
conditions.Add(new QueryCondition()
{
Field = "TaskMark",
Operator = "=",
IsNumber = false,
Value = taskMark
});
conditions.Add(new QueryCondition()
{
Field = "IsReceived",
Operator = "=",
IsNumber = false,
Value = false
});
var meter = new TreeModelSingleMeasuringEntity<T>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{item.DeviceId}",
DeviceType = $"{item.DeviceType}",
ProjectId = $"{item.ProjectId}",
IoTDataType = analysisBaseDto.DataType,
Timestamps = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(), // TODO:这里暂时格式化15分钟数据需要进行调整
SingleMeasuring =(item.FiledName ?? string.Empty, item.DataValue ?? default)
};
var taskSendInfo = await _dbProvider.GetSessionPool(true).QueryAsync<MeterReadingTelemetryPacketInfo>(new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(), Conditions = conditions, PageIndex = 0, PageSize = 1 });
var taskData = taskSendInfo?.Items.FirstOrDefault();
if (taskData != null)
{
// 更新
meter.Timestamps = taskData.PendingCopyReadTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
taskData.IsReceived = true;
taskData.ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage;
taskData.ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty;
}
else
{
// 新建
var currentTime = DateTime.Now;
//特殊编码映射
var itemCodeInfo = T37612012PacketItemCodeConst.MappingItemCodeTo645SubCodeRelationship(item.ItemType);
taskData = new MeterReadingTelemetryPacketInfo()
{
SystemName = _applicationOptions.SystemType,
ProjectId = $"{item.ProjectId}",
DeviceType = $"{item.DeviceType}",
DeviceId = $"{item.DeviceId}",
IoTDataType = IOTDBDataTypeConst.Log, // 匹配不到下发记录标记为LOG
Timestamps = DateTime.Now.CheckTimePoint().GetDateTimeOffset().ToUnixTimeNanoseconds(),
DatabaseBusiID = item.DatabaseBusiID,
PendingCopyReadTime = item.TimeSpan.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity),
CreationTime = currentTime,
FocusId = item.FocusId,
FocusAddress = analysisBaseDto.Code,
MeterAddress = analysisBaseDto.Code == item.DeviceAddress ? "" : item.DeviceAddress, // 判断是否能取到表地址
AFN = analysisBaseDto.AFN,
Fn = analysisBaseDto.Fn,
Seq = analysisBaseDto.PSEQ,
MSA = analysisBaseDto.MSA,
ItemCode = itemCodeInfo.Item1,
SubItemCode = itemCodeInfo.Item2,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
Pn = analysisBaseDto.Pn,
ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty,
ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage,
IsReceived = true,
ReceivedRemark = item.ErrorCodeMsg ?? string.Empty,
ScoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun(),
ReceivedTime = analysisBaseDto.ReceivedTime,
};
}
meterReadingTelemetryPacketInfos.Add(taskData);
//如果无字段名,则不保存数据,如saveData=false 也不保存数据
if (!string.IsNullOrWhiteSpace(item.FiledName) && !string.IsNullOrWhiteSpace(item.ProjectId))
{
treeModelSingleMeasuringEntities.Add(meter);
// 增加标识字段
var meterIsSync = new TreeModelSingleMeasuringEntity<bool>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{item.DeviceId}",
DeviceType = $"{item.DeviceType}",
ProjectId = $"{item.ProjectId}",
IoTDataType = analysisBaseDto.DataType,
Timestamps = meter.Timestamps,
SingleMeasuring = (IotDbFieldConst.IsSync, false)
};
if(!meterIsSyncs.Any(a=> a.IoTDataType == meterIsSync.IoTDataType && a.ProjectId == meterIsSync.ProjectId && a.SystemName== meterIsSync.SystemName && a.DeviceId== meterIsSync.DeviceId && a.Timestamps== meterIsSync.Timestamps))
meterIsSyncs.Add(meterIsSync);
// 数据库业务ID
var meterIsDatabaseBusiID = new TreeModelSingleMeasuringEntity<int>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{item.DeviceId}",
DeviceType = $"{item.DeviceType}",
ProjectId = $"{item.ProjectId}",
IoTDataType = analysisBaseDto.DataType,
Timestamps = meter.Timestamps,
SingleMeasuring = (IotDbFieldConst.DatabaseBusiID, item.DatabaseBusiID)
};
if (!meterDataBaseIDs.Any(a => a.IoTDataType == meterIsSync.IoTDataType && a.ProjectId == meterIsSync.ProjectId && a.SystemName == meterIsSync.SystemName && a.DeviceId == meterIsSync.DeviceId && a.Timestamps == meterIsSync.Timestamps))
meterDataBaseIDs.Add(meterIsDatabaseBusiID);
}
}
// 批量保存数据
await _dbProvider.GetSessionPool(true).BatchInsertAsync(meterReadingTelemetryPacketInfos);
if (treeModelSingleMeasuringEntities.Count > 0)
{
await _dbProvider.GetSessionPool(false).BatchInsertAsync(treeModelSingleMeasuringEntities);
// 报存标识字段
await _dbProvider.GetSessionPool(false).BatchInsertAsync(meterIsSyncs);
await _dbProvider.GetSessionPool(false).BatchInsertAsync(meterDataBaseIDs);
}
return await Task.FromResult(true);
}
/// <summary>
/// 保存状态到IotDb
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="analysisBaseDto"></param>
/// <returns></returns>
public async Task<bool> SaveStatusToIotDbAsync<T>(UnitDataAnalysis<AnalysisBaseDto<T>> analysisBaseDto)
{
ArgumentNullException.ThrowIfNull(nameof(analysisBaseDto.Data));
ArgumentException.ThrowIfNullOrWhiteSpace(nameof(analysisBaseDto.Data.FiledName));
ArgumentException.ThrowIfNullOrWhiteSpace(nameof(analysisBaseDto.Data.DataValue));
List<TreeModelSingleMeasuringEntity<T>> treeModelSingleMeasuringEntities = new List<TreeModelSingleMeasuringEntity<T>>();
var data = analysisBaseDto.Data!;
if (!data.TimeSpan.HasValue)
data.TimeSpan = analysisBaseDto.ReceivedTime;
// 类型(心跳,登录,上电,掉电)
if (!string.IsNullOrWhiteSpace(data.ProjectId))
{
long timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds();
var treeData = new TreeModelSingleMeasuringEntity<T>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}",
ProjectId = $"{data.ProjectId}",
IoTDataType = IOTDBDataTypeConst.Status,
Timestamps = timestamps,
SingleMeasuring = (data.FiledName!, data.DataValue!)
};
await _dbProvider.GetSessionPool(false).InsertAsync(treeData);
// 数据帧
var treeFrameData = new TreeModelSingleMeasuringEntity<string>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}",
ProjectId = $"{data.ProjectId}",
IoTDataType = IOTDBDataTypeConst.Status,
Timestamps = timestamps,
SingleMeasuring = (IotDbFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
};
await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData);
// 时间
var treeRecordingTimeData = new TreeModelSingleMeasuringEntity<DateTime>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}",
ProjectId = $"{data.ProjectId}",
Timestamps = timestamps,
IoTDataType = IOTDBDataTypeConst.Status,
SingleMeasuring = (IotDbFieldConst.RecordingTime, data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now)
};
await _dbProvider.GetSessionPool(false).InsertAsync(treeRecordingTimeData);
}
// 新建
string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
var currentTime = DateTime.Now;
//特殊编码映射
var itemCodeInfo = T37612012PacketItemCodeConst.MappingItemCodeTo645SubCodeRelationship(data.ItemType);
var taskData = new MeterReadingTelemetryPacketInfo()
{
SystemName = _applicationOptions.SystemType,
ProjectId = $"{data.ProjectId}",
DeviceType = $"{data.DeviceType}",
DeviceId = $"{data.DeviceId}",
IoTDataType = analysisBaseDto.DataType,
FocusId = data.FocusId,
FocusAddress = analysisBaseDto.Code,
Timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds(),
DatabaseBusiID = data.DatabaseBusiID,
PendingCopyReadTime = data.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity),
CreationTime = currentTime,
MeterAddress = analysisBaseDto.Code == data.DeviceAddress ? "" : data.DeviceAddress,// 判断是否能取到表地址
AFN = analysisBaseDto.AFN,
Fn = analysisBaseDto.Fn,
Seq = analysisBaseDto.PSEQ,
MSA = analysisBaseDto.MSA,
ItemCode = itemCodeInfo.Item1,
SubItemCode = itemCodeInfo.Item2,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
Pn = analysisBaseDto.Pn,
ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty,
ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage,
IsReceived = true,
ReceivedRemark = data.ErrorCodeMsg ?? string.Empty,
ScoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun(),
ReceivedTime=analysisBaseDto.ReceivedTime,
};
await _dbProvider.GetSessionPool(true).InsertAsync(taskData);
return await Task.FromResult(true);
}
/// <summary>
/// 保存状态到IotDb
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="analysisBaseDto"></param>
/// <returns></returns>
public async Task<bool> SaveMultipleStatusToIotDbAsync<T>(UnitDataAnalysis<List<AnalysisBaseDto<T>>> analysisBaseDto)
{
ArgumentNullException.ThrowIfNull(nameof(analysisBaseDto.Data));
List<MeterReadingTelemetryPacketInfo> meterReadingTelemetryPacketInfos = new List<MeterReadingTelemetryPacketInfo>();
var data = analysisBaseDto.Data!;
foreach (var item in data)
{
ArgumentException.ThrowIfNullOrWhiteSpace(nameof(item.FiledName));
ArgumentException.ThrowIfNullOrWhiteSpace(nameof(item.DataValue));
if (!item.TimeSpan.HasValue)
item.TimeSpan = analysisBaseDto.ReceivedTime;
// 类型(心跳,登录,上电,掉电)
if (!string.IsNullOrWhiteSpace(item.ProjectId))
{
long timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds();
var treeData = new TreeModelSingleMeasuringEntity<T>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{item.DeviceId}",
DeviceType = $"{item.DeviceType}",
ProjectId = $"{item.ProjectId}",
IoTDataType = IOTDBDataTypeConst.Status,
Timestamps = timestamps,
SingleMeasuring = (item.FiledName!, item.DataValue!)
};
await _dbProvider.GetSessionPool(false).InsertAsync(treeData);
// 数据帧
var treeFrameData = new TreeModelSingleMeasuringEntity<string>()
{
SystemName = _applicationOptions.SystemType,
DeviceType = $"{item.DeviceType}",
ProjectId = $"{item.ProjectId}",
IoTDataType = IOTDBDataTypeConst.Status,
Timestamps = timestamps,
SingleMeasuring = (IotDbFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
};
await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData);
// 时间
var treeRecordingTimeData = new TreeModelSingleMeasuringEntity<DateTime>()
{
SystemName = _applicationOptions.SystemType,
DeviceType = $"{item.DeviceType}",
ProjectId = $"{item.ProjectId}",
IoTDataType = IOTDBDataTypeConst.Status,
Timestamps = timestamps,
SingleMeasuring = (IotDbFieldConst.RecordingTime, item.TimeSpan.HasValue ? item.TimeSpan.Value : DateTime.Now)
};
await _dbProvider.GetSessionPool(false).InsertAsync(treeRecordingTimeData);
}
// 新建
string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
var currentTime = DateTime.Now;
//特殊编码映射
var itemCodeInfo = T37612012PacketItemCodeConst.MappingItemCodeTo645SubCodeRelationship(item.ItemType);
var taskData = new MeterReadingTelemetryPacketInfo()
{
SystemName = _applicationOptions.SystemType,
ProjectId = $"{item.ProjectId}",
DeviceType = $"{item.DeviceType}",
DeviceId = $"{item.DeviceId}",
IoTDataType = analysisBaseDto.DataType,
Timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds(),
DatabaseBusiID = item.DatabaseBusiID,
PendingCopyReadTime = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity),
CreationTime = currentTime,
FocusId = item.FocusId,
FocusAddress = analysisBaseDto.Code,
MeterAddress= analysisBaseDto.Code== item.DeviceAddress?"": item.DeviceAddress,// 判断是否能取到表地址
AFN = analysisBaseDto.AFN,
Fn = analysisBaseDto.Fn,
Seq = analysisBaseDto.PSEQ,
MSA = analysisBaseDto.MSA,
ItemCode = itemCodeInfo.Item1,
SubItemCode = itemCodeInfo.Item2,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
Pn = analysisBaseDto.Pn,
ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty,
ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage,
IsReceived = true,
ReceivedRemark = item.ErrorCodeMsg ?? string.Empty,
ScoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun(),
ReceivedTime = analysisBaseDto.ReceivedTime,
};
meterReadingTelemetryPacketInfos.Add(taskData);
}
if (meterReadingTelemetryPacketInfos.Count > 0)
{
await _dbProvider.GetSessionPool(true).BatchInsertAsync(meterReadingTelemetryPacketInfos);
}
return await Task.FromResult(true);
}
}
}