525 lines
26 KiB
C#
Raw Normal View History

2025-05-13 17:49:12 +08:00
using FreeRedis;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Encrypt;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
2025-05-13 17:49:12 +08:00
using JiShe.CollectBus.FreeRedis;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
using JiShe.CollectBus.IotSystems.Ammeters;
2025-05-13 17:49:12 +08:00
using JiShe.CollectBus.IotSystems.Devices;
2025-05-12 14:02:22 +08:00
using JiShe.CollectBus.IotSystems.LogRecord;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Protocol.Contracts.Protocol.Dto;
using JiShe.CollectBus.Protocol.Dto;
2025-05-13 17:49:12 +08:00
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Caching.Memory;
2025-05-12 14:02:22 +08:00
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
2025-05-12 14:02:22 +08:00
using System.Diagnostics;
2025-04-30 17:25:35 +08:00
using System.Diagnostics.Metrics;
2025-05-12 14:02:22 +08:00
using System.Threading.Channels;
2025-05-13 17:49:12 +08:00
using TouchSocket.Core;
2025-04-25 15:21:43 +08:00
using Volo.Abp.DependencyInjection;
using Volo.Abp.Guids;
2025-05-13 17:49:12 +08:00
using YamlDotNet.Core.Tokens;
2025-04-30 17:25:35 +08:00
using static FreeSql.Internal.GlobalFilter;
2025-05-07 11:53:50 +08:00
using static JiShe.CollectBus.Common.Consts.T37612012PacketItemCodeConst;
2025-04-25 15:21:43 +08:00
namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
{
public class DataStorage:ITransientDependency
{
private readonly IGuidGenerator _guidGenerator;
private readonly IIoTDbProvider _dbProvider;
2025-05-18 16:04:23 +08:00
private readonly ServerApplicationOptions _applicationOptions;
2025-05-12 14:02:22 +08:00
private readonly ILogger<DataStorage> _logger;
2025-05-13 17:49:12 +08:00
private readonly IMemoryCache _imemoryCache;
private readonly IFreeRedisProvider _freeRedisProvider;
private RedisClient Instance { get; set; }
2025-05-12 14:02:22 +08:00
public DataStorage(IIoTDbProvider dbProvider, IOptions<ServerApplicationOptions> applicationOptions,
2025-05-18 16:04:23 +08:00
IGuidGenerator guidGenerator, ILogger<DataStorage> logger, IMemoryCache memoryCache, IFreeRedisProvider freeRedisProvider)
{
_dbProvider= dbProvider;
_applicationOptions = applicationOptions.Value;
2025-05-18 16:04:23 +08:00
_guidGenerator= guidGenerator;
2025-05-12 14:02:22 +08:00
_logger= logger;
2025-05-13 17:49:12 +08:00
_imemoryCache = memoryCache;
_freeRedisProvider = freeRedisProvider;
Instance = _freeRedisProvider.Instance;
2025-05-12 14:02:22 +08:00
}
/// <summary>
/// 日志保存通道写入
/// </summary>
/// <returns></returns>
public async Task LogSaveWriterAsync(ChannelWriter<object> channelWriter, dynamic dataItems)
{
await channelWriter.WriteAsync(dataItems);
}
/// <summary>
/// 日志刷新通道写入
/// </summary>
/// <returns></returns>
public async Task LogRefreshSaveWriterAsync(ChannelWriter<object> channelWriter, dynamic dataItems)
{
await channelWriter.WriteAsync(dataItems);
}
2025-04-25 15:21:43 +08:00
2025-05-13 17:49:12 +08:00
/// <summary>
2025-05-13 17:49:12 +08:00
/// 获取缓存设备信息
/// </summary>
/// <param name="meterType"></param>
/// <param name="timeDensity"></param>
/// <returns></returns>
public async Task<DeviceInfo?> GetDeviceInfoAsync(string code,int pn=1,string meterAddress=null)
{
2025-05-13 17:49:12 +08:00
string redisCacheDeviceInfoHashKey = $"{string.Format(RedisConst.CacheDeviceInfoHashKey, _applicationOptions.SystemType, _applicationOptions.ServerTagName)}";
string deviceKey= $"{code}:{pn}";
var deviceInfo= await _imemoryCache.GetOrCreateAsync(deviceKey, async entry =>
2025-05-13 17:49:12 +08:00
{
2025-05-14 15:20:26 +08:00
List<DeviceInfo> devices= await Instance.HGetAsync<List<DeviceInfo>>(redisCacheDeviceInfoHashKey, code) ?? new List<DeviceInfo>();
2025-05-13 17:49:12 +08:00
var data = devices.Where(s => s.MeteringCode == pn).FirstOrDefault();
if (data != null)
entry.SetSlidingExpiration(TimeSpan.FromMinutes(5));
else
entry.SetSlidingExpiration(TimeSpan.FromSeconds(5));
return data ?? default;
2025-05-07 11:53:50 +08:00
});
if (deviceInfo == null && !string.IsNullOrWhiteSpace(meterAddress))
{
// TODO:透明转发回来的pn为0情况
string deviceAddressKey = $"{code}:{meterAddress}";
deviceInfo = await _imemoryCache.GetOrCreateAsync(deviceAddressKey, async entry =>
{
List<DeviceInfo> devices = await Instance.HGetAsync<List<DeviceInfo>>(redisCacheDeviceInfoHashKey, code) ?? new List<DeviceInfo>();
var data = devices.Where(s => s.MeterAddress == meterAddress).FirstOrDefault();
if (data != null)
entry.SetSlidingExpiration(TimeSpan.FromMinutes(5));
else
entry.SetSlidingExpiration(TimeSpan.FromSeconds(5));
return data ?? default;
});
}
return deviceInfo;
2025-04-25 15:21:43 +08:00
}
/// <summary>
/// 保存数据到IotDb
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="analysisBaseDto"></param>
/// <returns></returns>
public async Task<bool> SaveDataToIotDbAsync<T>(UnitDataAnalysis<AnalysisBaseDto<T>> analysisBaseDto)
{
var data = analysisBaseDto.Data!;
2025-04-29 11:43:16 +08:00
if(!data.TimeSpan.HasValue)
data.TimeSpan= analysisBaseDto.ReceivedTime;
string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
string scoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun();
var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
{
Field = "ScoreValue",
Operator = "=",
IsNumber = false,
Value = scoreValue
});
conditions.Add(new QueryCondition()
{
Field = "TaskMark",
Operator = "=",
IsNumber = false,
Value = taskMark
});
conditions.Add(new QueryCondition()
{
Field = "IsReceived",
Operator = "=",
IsNumber = false,
Value = false
});
var meter = new TreeModelSingleMeasuringEntity<T>()
{
SystemName = _applicationOptions.SystemType,
2025-05-07 11:53:50 +08:00
DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType.ToString()}",
ProjectId = $"{data.ProjectId}",
DataType = analysisBaseDto.DataType,
2025-05-07 15:17:54 +08:00
Timestamps = data.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(),
SingleMeasuring = (data.FiledName ?? string.Empty, data.DataValue ?? default)
};
2025-05-18 16:04:23 +08:00
var taskSendInfo = await _dbProvider.GetSessionPool(true).QueryAsync<MeterReadingTelemetryPacketInfo>(new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(), Conditions = conditions, PageIndex = 0, PageSize = 1 });
var taskData = taskSendInfo?.Items.FirstOrDefault();
if (taskData != null)
{
// 更新
2025-04-30 09:50:50 +08:00
meter.Timestamps = taskData.PendingCopyReadTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
taskData.IsReceived=true;
2025-05-12 14:02:22 +08:00
taskData.ReceivedMessageHexString= analysisBaseDto.ReceivedHexMessage;
taskData.ReceivedMessageId= analysisBaseDto.MessageId ?? string.Empty;
}
else
{
// 新建
var currentTime = DateTime.Now;
taskData = new MeterReadingTelemetryPacketInfo()
{
SystemName = _applicationOptions.SystemType,
ProjectId = $"{data.ProjectId}",
2025-05-07 11:53:50 +08:00
DeviceType = $"{data.DeviceType}",
DeviceId = $"{data.DeviceId}",
DataType = analysisBaseDto.DataType,
2025-05-12 16:48:45 +08:00
FocusId = data.FocusId,
FocusAddress = analysisBaseDto.Code,
2025-04-29 11:43:16 +08:00
Timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds(),
DatabaseBusiID = data.DatabaseBusiID,
2025-05-12 16:48:45 +08:00
PendingCopyReadTime = data.TimeSpan.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity),
CreationTime = currentTime,
2025-05-12 16:48:45 +08:00
MeterAddress = analysisBaseDto.Code == data.DeviceAddress ? "" : data.DeviceAddress, // 判断是否能取到表地址
AFN = analysisBaseDto.AFN,
Fn = analysisBaseDto.Fn,
Seq = analysisBaseDto.PSEQ,
MSA = analysisBaseDto.MSA,
ItemCode = data.ItemType,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
Pn = analysisBaseDto.Pn,
2025-05-12 16:48:45 +08:00
ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty,
2025-05-12 14:02:22 +08:00
ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage,
IsReceived = true,
ReceivedRemark = data.ErrorCodeMsg ?? string.Empty,
ScoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun(),
2025-05-12 14:02:22 +08:00
ReceivedTime = analysisBaseDto.ReceivedTime,
};
2025-05-18 16:04:23 +08:00
}
await _dbProvider.GetSessionPool(true).InsertAsync(taskData);
//如果无字段名,则不保存数据
2025-04-27 15:44:54 +08:00
if (!string.IsNullOrWhiteSpace(data.FiledName))
2025-05-18 16:04:23 +08:00
{
await _dbProvider.GetSessionPool(false).InsertAsync(meter);
}
return await Task.FromResult(true);
}
2025-04-27 09:31:12 +08:00
/// <summary>
/// 保存数据到IotDb
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="analysisBaseDto"></param>
2025-05-12 14:02:22 +08:00
/// <param name="saveData"></param>
2025-04-27 09:31:12 +08:00
/// <returns></returns>
public async Task<bool> SaveMultipleDataToIotDbAsync<T>(UnitDataAnalysis<List<AnalysisBaseDto<T>>> analysisBaseDto)
{
var data = analysisBaseDto.Data!;
2025-04-30 17:25:35 +08:00
List<MeterReadingTelemetryPacketInfo> meterReadingTelemetryPacketInfos = new List<MeterReadingTelemetryPacketInfo>();
2025-05-12 14:02:22 +08:00
List<TreeModelSingleMeasuringEntity<T>> treeModelSingleMeasuringEntities = new List<TreeModelSingleMeasuringEntity<T>>();
2025-04-27 09:31:12 +08:00
foreach (var item in data)
{
2025-04-29 11:43:16 +08:00
if(!item.TimeSpan.HasValue)
item.TimeSpan = analysisBaseDto.ReceivedTime;
2025-04-27 09:31:12 +08:00
string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
string scoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun();
var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
{
Field = "ScoreValue",
Operator = "=",
IsNumber = false,
Value = scoreValue
});
conditions.Add(new QueryCondition()
{
Field = "TaskMark",
Operator = "=",
IsNumber = false,
Value = taskMark
});
conditions.Add(new QueryCondition()
{
Field = "IsReceived",
Operator = "=",
IsNumber = false,
Value = false
});
var meter = new TreeModelSingleMeasuringEntity<T>()
{
SystemName = _applicationOptions.SystemType,
2025-05-07 11:53:50 +08:00
DeviceId = $"{item.DeviceId}",
DeviceType = $"{item.DeviceType}",
2025-04-27 09:31:12 +08:00
ProjectId = $"{item.ProjectId}",
DataType = analysisBaseDto.DataType,
2025-05-07 15:17:54 +08:00
Timestamps = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(), // TODO:这里暂时格式化15分钟数据需要进行调整
2025-05-08 08:48:09 +08:00
SingleMeasuring =(item.FiledName ?? string.Empty, item.DataValue ?? default)
2025-05-18 16:04:23 +08:00
};
var taskSendInfo = await _dbProvider.GetSessionPool(true).QueryAsync<MeterReadingTelemetryPacketInfo>(new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(), Conditions = conditions, PageIndex = 0, PageSize = 1 });
2025-04-27 09:31:12 +08:00
var taskData = taskSendInfo?.Items.FirstOrDefault();
if (taskData != null)
{
// 更新
2025-04-30 09:50:50 +08:00
meter.Timestamps = taskData.PendingCopyReadTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
2025-04-27 09:31:12 +08:00
taskData.IsReceived = true;
2025-05-12 14:02:22 +08:00
taskData.ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage;
2025-04-27 09:31:12 +08:00
taskData.ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty;
}
else
{
// 新建
var currentTime = DateTime.Now;
taskData = new MeterReadingTelemetryPacketInfo()
{
SystemName = _applicationOptions.SystemType,
ProjectId = $"{item.ProjectId}",
2025-05-07 11:53:50 +08:00
DeviceType = $"{item.DeviceType}",
DeviceId = $"{item.DeviceId}",
2025-05-12 16:48:45 +08:00
DataType = IOTDBDataTypeConst.Log, // 匹配不到下发记录标记为LOG
2025-04-27 09:31:12 +08:00
Timestamps = DateTime.Now.CheckTimePoint().GetDateTimeOffset().ToUnixTimeNanoseconds(),
DatabaseBusiID = item.DatabaseBusiID,
2025-04-30 17:25:35 +08:00
PendingCopyReadTime = item.TimeSpan.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity),
2025-04-27 09:31:12 +08:00
CreationTime = currentTime,
2025-05-12 16:48:45 +08:00
FocusId = item.FocusId,
FocusAddress = analysisBaseDto.Code,
MeterAddress = analysisBaseDto.Code == item.DeviceAddress ? "" : item.DeviceAddress, // 判断是否能取到表地址
2025-04-27 09:31:12 +08:00
AFN = analysisBaseDto.AFN,
Fn = analysisBaseDto.Fn,
Seq = analysisBaseDto.PSEQ,
MSA = analysisBaseDto.MSA,
ItemCode = item.ItemType,
2025-04-27 09:31:12 +08:00
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
Pn = analysisBaseDto.Pn,
ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty,
2025-05-12 14:02:22 +08:00
ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage,
2025-04-27 09:31:12 +08:00
IsReceived = true,
ReceivedRemark = item.ErrorCodeMsg ?? string.Empty,
ScoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun(),
2025-05-12 14:02:22 +08:00
ReceivedTime = analysisBaseDto.ReceivedTime,
2025-04-27 09:31:12 +08:00
};
}
2025-04-30 17:25:35 +08:00
meterReadingTelemetryPacketInfos.Add(taskData);
2025-05-12 14:02:22 +08:00
//如果无字段名,则不保存数据,如saveData=false 也不保存数据
2025-04-27 15:44:54 +08:00
if (!string.IsNullOrWhiteSpace(item.FiledName))
2025-04-27 09:31:12 +08:00
{
2025-04-30 17:25:35 +08:00
treeModelSingleMeasuringEntities.Add(meter);
2025-04-27 09:31:12 +08:00
}
}
2025-05-18 16:04:23 +08:00
// 批量保存数据
await _dbProvider.GetSessionPool(true).BatchInsertAsync(meterReadingTelemetryPacketInfos);
2025-04-30 17:25:35 +08:00
if (treeModelSingleMeasuringEntities.Count > 0)
2025-05-18 16:04:23 +08:00
{
await _dbProvider.GetSessionPool(false).BatchInsertAsync(treeModelSingleMeasuringEntities);
2025-04-30 17:25:35 +08:00
}
2025-04-27 09:31:12 +08:00
return await Task.FromResult(true);
}
2025-05-07 11:53:50 +08:00
/// <summary>
/// 保存状态到IotDb
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="analysisBaseDto"></param>
/// <returns></returns>
public async Task<bool> SaveStatusToIotDbAsync<T>(UnitDataAnalysis<AnalysisBaseDto<T>> analysisBaseDto)
{
ArgumentNullException.ThrowIfNull(nameof(analysisBaseDto.Data));
2025-05-08 09:25:41 +08:00
ArgumentException.ThrowIfNullOrWhiteSpace(nameof(analysisBaseDto.Data.FiledName));
ArgumentException.ThrowIfNullOrWhiteSpace(nameof(analysisBaseDto.Data.DataValue));
2025-05-12 14:02:22 +08:00
List<TreeModelSingleMeasuringEntity<T>> treeModelSingleMeasuringEntities = new List<TreeModelSingleMeasuringEntity<T>>();
2025-05-07 11:53:50 +08:00
var data = analysisBaseDto.Data!;
if (!data.TimeSpan.HasValue)
data.TimeSpan = analysisBaseDto.ReceivedTime;
// 类型(心跳,登录,上电,掉电)
2025-05-07 15:17:54 +08:00
long timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds();
2025-05-07 11:53:50 +08:00
var treeData = new TreeModelSingleMeasuringEntity<T>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}",
2025-05-07 11:53:50 +08:00
ProjectId = $"{data.ProjectId}",
2025-05-15 09:51:50 +08:00
DataType = IOTDBDataTypeConst.Status,
2025-05-07 11:53:50 +08:00
Timestamps = timestamps,
2025-05-08 08:48:09 +08:00
SingleMeasuring = (data.FiledName!, data.DataValue!)
2025-05-18 16:04:23 +08:00
};
await _dbProvider.GetSessionPool(false).InsertAsync(treeData);
2025-05-07 17:10:02 +08:00
// 数据帧
var treeFrameData = new TreeModelSingleMeasuringEntity<string>()
2025-05-07 11:53:50 +08:00
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}",
2025-05-07 11:53:50 +08:00
ProjectId = $"{data.ProjectId}",
2025-05-15 09:51:50 +08:00
DataType = IOTDBDataTypeConst.Status,
2025-05-07 11:53:50 +08:00
Timestamps = timestamps,
2025-05-12 14:02:22 +08:00
SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
2025-05-07 11:53:50 +08:00
};
2025-05-18 16:04:23 +08:00
await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData);
2025-05-07 17:10:02 +08:00
// 时间
2025-05-12 17:24:47 +08:00
var treeRecordingTimeData = new TreeModelSingleMeasuringEntity<DateTime>()
2025-05-07 11:53:50 +08:00
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}",
2025-05-07 11:53:50 +08:00
ProjectId = $"{data.ProjectId}",
Timestamps = timestamps,
2025-05-15 09:51:50 +08:00
DataType = IOTDBDataTypeConst.Status,
2025-05-12 17:24:47 +08:00
SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now)
2025-05-18 16:04:23 +08:00
};
await _dbProvider.GetSessionPool(true).InsertAsync(treeRecordingTimeData);
2025-05-12 14:02:22 +08:00
// 新建
string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
var currentTime = DateTime.Now;
var taskData = new MeterReadingTelemetryPacketInfo()
2025-05-07 11:53:50 +08:00
{
SystemName = _applicationOptions.SystemType,
ProjectId = $"{data.ProjectId}",
2025-05-12 14:02:22 +08:00
DeviceType = $"{data.DeviceType}",
DeviceId = $"{data.DeviceId}",
DataType = analysisBaseDto.DataType,
2025-05-12 16:48:45 +08:00
FocusId = data.FocusId,
FocusAddress = analysisBaseDto.Code,
2025-05-12 14:02:22 +08:00
Timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds(),
DatabaseBusiID = data.DatabaseBusiID,
PendingCopyReadTime = data.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity),
CreationTime = currentTime,
2025-05-12 16:48:45 +08:00
MeterAddress = analysisBaseDto.Code == data.DeviceAddress ? "" : data.DeviceAddress,// 判断是否能取到表地址
2025-05-12 14:02:22 +08:00
AFN = analysisBaseDto.AFN,
Fn = analysisBaseDto.Fn,
Seq = analysisBaseDto.PSEQ,
MSA = analysisBaseDto.MSA,
ItemCode = data.ItemType,
2025-05-12 14:02:22 +08:00
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
Pn = analysisBaseDto.Pn,
ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty,
ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage,
IsReceived = true,
ReceivedRemark = data.ErrorCodeMsg ?? string.Empty,
ScoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun(),
ReceivedTime=analysisBaseDto.ReceivedTime,
2025-05-18 16:04:23 +08:00
};
await _dbProvider.GetSessionPool(true).InsertAsync(taskData);
2025-05-12 14:02:22 +08:00
2025-05-07 11:53:50 +08:00
return await Task.FromResult(true);
}
2025-04-27 09:31:12 +08:00
2025-05-08 09:25:41 +08:00
/// <summary>
/// 保存状态到IotDb
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="analysisBaseDto"></param>
/// <returns></returns>
public async Task<bool> SaveMultipleStatusToIotDbAsync<T>(UnitDataAnalysis<List<AnalysisBaseDto<T>>> analysisBaseDto)
{
ArgumentNullException.ThrowIfNull(nameof(analysisBaseDto.Data));
2025-05-12 14:02:22 +08:00
List<MeterReadingTelemetryPacketInfo> meterReadingTelemetryPacketInfos = new List<MeterReadingTelemetryPacketInfo>();
2025-05-08 09:25:41 +08:00
var data = analysisBaseDto.Data!;
foreach (var item in data)
{
ArgumentException.ThrowIfNullOrWhiteSpace(nameof(item.FiledName));
ArgumentException.ThrowIfNullOrWhiteSpace(nameof(item.DataValue));
if (!item.TimeSpan.HasValue)
item.TimeSpan = analysisBaseDto.ReceivedTime;
// 类型(心跳,登录,上电,掉电)
long timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds();
var treeData = new TreeModelSingleMeasuringEntity<T>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{item.DeviceId}",
2025-05-08 17:33:07 +08:00
DeviceType = $"{item.DeviceType}",
2025-05-08 09:25:41 +08:00
ProjectId = $"{item.ProjectId}",
2025-05-15 09:51:50 +08:00
DataType = IOTDBDataTypeConst.Status,
2025-05-08 09:25:41 +08:00
Timestamps = timestamps,
2025-05-08 17:33:07 +08:00
SingleMeasuring = (item.FiledName!, item.DataValue!)
2025-05-18 16:04:23 +08:00
};
await _dbProvider.GetSessionPool(false).InsertAsync(treeData);
2025-05-08 09:25:41 +08:00
// 数据帧
var treeFrameData = new TreeModelSingleMeasuringEntity<string>()
{
SystemName = _applicationOptions.SystemType,
2025-05-08 17:33:07 +08:00
DeviceType = $"{item.DeviceType}",
2025-05-08 09:25:41 +08:00
ProjectId = $"{item.ProjectId}",
2025-05-15 09:51:50 +08:00
DataType = IOTDBDataTypeConst.Status,
2025-05-08 09:25:41 +08:00
Timestamps = timestamps,
2025-05-12 14:02:22 +08:00
SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
2025-05-08 09:25:41 +08:00
};
2025-05-18 16:04:23 +08:00
await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData);
2025-05-08 09:25:41 +08:00
// 时间
2025-05-12 17:24:47 +08:00
var treeRecordingTimeData = new TreeModelSingleMeasuringEntity<DateTime>()
2025-05-08 09:25:41 +08:00
{
SystemName = _applicationOptions.SystemType,
2025-05-08 17:33:07 +08:00
DeviceType = $"{item.DeviceType}",
2025-05-08 09:25:41 +08:00
ProjectId = $"{item.ProjectId}",
2025-05-15 09:51:50 +08:00
DataType = IOTDBDataTypeConst.Status,
2025-05-08 09:25:41 +08:00
Timestamps = timestamps,
2025-05-12 17:24:47 +08:00
SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, item.TimeSpan.HasValue ? item.TimeSpan.Value : DateTime.Now)
2025-05-18 16:04:23 +08:00
};
await _dbProvider.GetSessionPool(false).InsertAsync(treeRecordingTimeData);
2025-05-12 14:02:22 +08:00
// 新建
string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
var currentTime = DateTime.Now;
var taskData = new MeterReadingTelemetryPacketInfo()
2025-05-08 09:25:41 +08:00
{
SystemName = _applicationOptions.SystemType,
ProjectId = $"{item.ProjectId}",
2025-05-12 14:02:22 +08:00
DeviceType = $"{item.DeviceType}",
DeviceId = $"{item.DeviceId}",
DataType = analysisBaseDto.DataType,
2025-05-12 14:02:22 +08:00
Timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds(),
DatabaseBusiID = item.DatabaseBusiID,
PendingCopyReadTime = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity),
CreationTime = currentTime,
2025-05-12 16:48:45 +08:00
FocusId = item.FocusId,
FocusAddress = analysisBaseDto.Code,
MeterAddress= analysisBaseDto.Code== item.DeviceAddress?"": item.DeviceAddress,// 判断是否能取到表地址
2025-05-12 14:02:22 +08:00
AFN = analysisBaseDto.AFN,
Fn = analysisBaseDto.Fn,
Seq = analysisBaseDto.PSEQ,
MSA = analysisBaseDto.MSA,
ItemCode = item.ItemType,
2025-05-12 14:02:22 +08:00
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
Pn = analysisBaseDto.Pn,
ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty,
ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage,
IsReceived = true,
ReceivedRemark = item.ErrorCodeMsg ?? string.Empty,
ScoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun(),
ReceivedTime = analysisBaseDto.ReceivedTime,
2025-05-08 09:25:41 +08:00
};
2025-05-12 14:02:22 +08:00
meterReadingTelemetryPacketInfos.Add(taskData);
}
if (meterReadingTelemetryPacketInfos.Count > 0)
2025-05-18 16:04:23 +08:00
{
await _dbProvider.GetSessionPool(true).BatchInsertAsync(meterReadingTelemetryPacketInfos);
2025-05-08 09:25:41 +08:00
}
return await Task.FromResult(true);
}
2025-05-12 14:02:22 +08:00
2025-04-25 15:21:43 +08:00
}
}