// 2025-05-14 15:20:26 +08:00
//
// 524 lines
// 26 KiB
// C#
// Raw Blame History
//
// This file contains ambiguous Unicode characters
//
// This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using FreeRedis;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Encrypt;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.FreeRedis;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.LogRecord;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Protocol.Contracts.Protocol.Dto;
using JiShe.CollectBus.Protocol.Dto;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading.Channels;
using TouchSocket.Core;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Guids;
using YamlDotNet.Core.Tokens;
using static FreeSql.Internal.GlobalFilter;
using static JiShe.CollectBus.Common.Consts.T37612012PacketItemCodeConst;
namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
{
public class DataStorage:ITransientDependency
{
// Injected collaborators; each is assigned exactly once, in the constructor.
private readonly IGuidGenerator _guidGenerator;
private readonly IIoTDbProvider _dbProvider;
private readonly ServerApplicationOptions _applicationOptions;
private readonly IoTDBRuntimeContext _runtimeContext;
private readonly ILogger<DataStorage> _logger;
private readonly IMemoryCache _imemoryCache;
private readonly IFreeRedisProvider _freeRedisProvider;

/// <summary>
/// Redis client obtained from the injected provider at construction time.
/// </summary>
private RedisClient Instance { get; set; }

/// <summary>
/// Creates the storage helper and captures its dependencies.
/// </summary>
/// <param name="dbProvider">IoTDB provider used for queries and inserts.</param>
/// <param name="applicationOptions">Options wrapper; its <c>Value</c> is read immediately.</param>
/// <param name="guidGenerator">GUID generator.</param>
/// <param name="runtimeContext">IoTDB runtime context whose session-pool flag the save methods toggle.</param>
/// <param name="logger">Logger for this class.</param>
/// <param name="memoryCache">In-process cache used by the device lookup.</param>
/// <param name="freeRedisProvider">Provider that supplies the Redis client.</param>
public DataStorage(
    IIoTDbProvider dbProvider,
    IOptions<ServerApplicationOptions> applicationOptions,
    IGuidGenerator guidGenerator,
    IoTDBRuntimeContext runtimeContext,
    ILogger<DataStorage> logger,
    IMemoryCache memoryCache,
    IFreeRedisProvider freeRedisProvider)
{
    _dbProvider = dbProvider;
    _applicationOptions = applicationOptions.Value;
    _guidGenerator = guidGenerator;
    _runtimeContext = runtimeContext;
    _logger = logger;
    _imemoryCache = memoryCache;
    _freeRedisProvider = freeRedisProvider;

    // Resolve the Redis client once, up front.
    Instance = freeRedisProvider.Instance;
}
/// <summary>
/// Writes one item to the log-save channel.
/// </summary>
/// <param name="channelWriter">Channel writer that receives the item.</param>
/// <param name="dataItems">Item to enqueue; it is handed to the writer unchanged.</param>
/// <returns>A task that completes once the writer has accepted the item.</returns>
/// <exception cref="ArgumentNullException"><paramref name="channelWriter"/> is null.</exception>
public async Task LogSaveWriterAsync(ChannelWriter<object> channelWriter, dynamic dataItems)
{
    ArgumentNullException.ThrowIfNull(channelWriter);

    // The channel element type is object, so convert the dynamic argument up front
    // and let WriteAsync bind at compile time instead of through the runtime binder.
    object item = dataItems;
    await channelWriter.WriteAsync(item);
}
/// <summary>
/// Writes one item to the log-refresh channel.
/// </summary>
/// <param name="channelWriter">Channel writer that receives the item.</param>
/// <param name="dataItems">Item to enqueue; it is handed to the writer unchanged.</param>
/// <returns>A task that completes once the writer has accepted the item.</returns>
/// <exception cref="ArgumentNullException"><paramref name="channelWriter"/> is null.</exception>
public async Task LogRefreshSaveWriterAsync(ChannelWriter<object> channelWriter, dynamic dataItems)
{
    ArgumentNullException.ThrowIfNull(channelWriter);

    // The channel element type is object, so convert the dynamic argument up front
    // and let WriteAsync bind at compile time instead of through the runtime binder.
    object item = dataItems;
    await channelWriter.WriteAsync(item);
}
/// <summary>
/// Looks up a single device from the Redis device hash, going through the in-process memory cache first.
/// </summary>
/// <param name="code">Hash field read from Redis; also the first half of the memory-cache key.</param>
/// <param name="pn">Value matched against each device's <c>MeteringCode</c>; defaults to 1.</param>
/// <returns>The first device whose <c>MeteringCode</c> equals <paramref name="pn"/>, or <c>null</c> when the hash field is missing or nothing matches.</returns>
public async Task<DeviceInfo?> GetDeviceInfoAsync(string code,int pn=1)
{
    var hashKey = string.Format(RedisConst.CacheDeviceInfoHashKey, _applicationOptions.SystemType, _applicationOptions.ServerTagName);
    var cacheKey = $"{code}:{pn}";

    return await _imemoryCache.GetOrCreateAsync(cacheKey, async cacheEntry =>
    {
        var candidates = await Instance.HGetAsync<List<DeviceInfo>>(hashKey, code);
        var match = candidates?.FirstOrDefault(device => device.MeteringCode == pn);

        // A hit is cached with a 5-minute sliding window; a miss only for 5 seconds,
        // presumably so a device added later is picked up quickly — confirm.
        var window = match != null ? TimeSpan.FromMinutes(5) : TimeSpan.FromSeconds(5);
        cacheEntry.SetSlidingExpiration(window);

        return match;
    });
}
/// <summary>
/// Persists one parsed reading to IoTDB: a packet record through the table-model session pool and,
/// when the reading carries a field name, the measured value through the tree-model session pool.
/// </summary>
/// <typeparam name="T">Type of the measured value.</typeparam>
/// <param name="analysisBaseDto">Parsed frame; its <c>Data</c> must not be null.</param>
/// <returns>Always <c>true</c>; failures surface as exceptions.</returns>
/// <exception cref="ArgumentNullException"><paramref name="analysisBaseDto"/> or its <c>Data</c> is null.</exception>
public async Task<bool> SaveDataToIotDbAsync<T>(UnitDataAnalysis<AnalysisBaseDto<T>> analysisBaseDto)
{
    ArgumentNullException.ThrowIfNull(analysisBaseDto);
    ArgumentNullException.ThrowIfNull(analysisBaseDto.Data);
    var data = analysisBaseDto.Data;

    // Fall back to the receive time when the reading has no timestamp of its own.
    if (!data.TimeSpan.HasValue)
    {
        data.TimeSpan = analysisBaseDto.ReceivedTime;
    }

    string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
    string scoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun();

    // Look for a packet record of this task that has not been marked as received yet.
    var conditions = new List<QueryCondition>
    {
        new QueryCondition { Field = "ScoreValue", Operator = "=", IsNumber = false, Value = scoreValue },
        new QueryCondition { Field = "TaskMark", Operator = "=", IsNumber = false, Value = taskMark },
        new QueryCondition { Field = "IsReceived", Operator = "=", IsNumber = false, Value = false },
    };

    var meter = new TreeModelSingleMeasuringEntity<T>()
    {
        SystemName = _applicationOptions.SystemType,
        DeviceId = $"{data.DeviceId}",
        DeviceType = $"{data.DeviceType.ToString()}",
        ProjectId = $"{data.ProjectId}",
        DataType = analysisBaseDto.DataType,
        Timestamps = data.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(),
        SingleMeasuring = (data.FiledName ?? string.Empty, data.DataValue ?? default)
    };

    _runtimeContext.UseTableSessionPool = true; // use the table-model session pool
    var taskSendInfo = await _dbProvider.QueryAsync<MeterReadingTelemetryPacketInfo>(new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(), Conditions = conditions, PageIndex = 0, PageSize = 1 });
    var taskData = taskSendInfo?.Items.FirstOrDefault();
    if (taskData != null)
    {
        // Update: a matching record exists, so store the value at its pending read time
        // and mark the record as received.
        // NOTE(review): ReceivedTime / ReceivedRemark are not touched on this path,
        // unlike the create path below — confirm that is intended.
        meter.Timestamps = taskData.PendingCopyReadTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
        taskData.IsReceived = true;
        taskData.ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage;
        taskData.ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty;
    }
    else
    {
        // Create: no matching record, so build a new one that is already marked as received.
        // One clock read feeds both CreationTime and Timestamps so they agree.
        var currentTime = DateTime.Now;
        taskData = new MeterReadingTelemetryPacketInfo()
        {
            SystemName = _applicationOptions.SystemType,
            ProjectId = $"{data.ProjectId}",
            DeviceType = $"{data.DeviceType}",
            DeviceId = $"{data.DeviceId}",
            DataType = analysisBaseDto.DataType,
            FocusId = data.FocusId,
            FocusAddress = analysisBaseDto.Code,
            Timestamps = currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
            DatabaseBusiID = data.DatabaseBusiID,
            PendingCopyReadTime = data.TimeSpan.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity),
            CreationTime = currentTime,
            // Left empty when the device address equals the frame code.
            MeterAddress = analysisBaseDto.Code == data.DeviceAddress ? "" : data.DeviceAddress,
            AFN = analysisBaseDto.AFN,
            Fn = analysisBaseDto.Fn,
            Seq = analysisBaseDto.PSEQ,
            MSA = analysisBaseDto.MSA,
            ItemCode = data.ItemType,
            TaskMark = taskMark,
            IsSend = false,
            ManualOrNot = false,
            Pn = analysisBaseDto.Pn,
            ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty,
            ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage,
            IsReceived = true,
            ReceivedRemark = data.ErrorCodeMsg ?? string.Empty,
            ScoreValue = scoreValue,
            ReceivedTime = analysisBaseDto.ReceivedTime,
        };
    }

    _runtimeContext.UseTableSessionPool = true; // use the table-model session pool
    await _dbProvider.InsertAsync(taskData);

    // Without a field name there is no measurement to store.
    if (!string.IsNullOrWhiteSpace(data.FiledName))
    {
        _runtimeContext.UseTableSessionPool = false; // use the tree-model session pool
        await _dbProvider.InsertAsync(meter);
    }

    return true;
}
/// <summary>
/// Persists a list of parsed readings to IoTDB in two batches: packet records through the table-model
/// session pool, and the measured values of readings that carry a field name through the tree-model session pool.
/// </summary>
/// <typeparam name="T">Type of the measured values.</typeparam>
/// <param name="analysisBaseDto">Parsed frame; its <c>Data</c> list must not be null.</param>
/// <returns>Always <c>true</c>; failures surface as exceptions.</returns>
/// <exception cref="ArgumentNullException"><paramref name="analysisBaseDto"/> or its <c>Data</c> is null.</exception>
public async Task<bool> SaveMultipleDataToIotDbAsync<T>(UnitDataAnalysis<List<AnalysisBaseDto<T>>> analysisBaseDto)
{
    ArgumentNullException.ThrowIfNull(analysisBaseDto);
    ArgumentNullException.ThrowIfNull(analysisBaseDto.Data);
    var data = analysisBaseDto.Data;

    var packetRecords = new List<MeterReadingTelemetryPacketInfo>(data.Count);
    var measurements = new List<TreeModelSingleMeasuringEntity<T>>(data.Count);

    // Both values depend only on the frame, not on the individual reading, so compute them once.
    string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
    string scoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun();

    foreach (var item in data)
    {
        // Fall back to the receive time when the reading has no timestamp of its own.
        if (!item.TimeSpan.HasValue)
        {
            item.TimeSpan = analysisBaseDto.ReceivedTime;
        }

        // Look for a packet record of this task that has not been marked as received yet.
        // NOTE(review): these conditions are identical for every item of the frame —
        // confirm that one lookup per item (rather than per frame) is intended.
        var conditions = new List<QueryCondition>
        {
            new QueryCondition { Field = "ScoreValue", Operator = "=", IsNumber = false, Value = scoreValue },
            new QueryCondition { Field = "TaskMark", Operator = "=", IsNumber = false, Value = taskMark },
            new QueryCondition { Field = "IsReceived", Operator = "=", IsNumber = false, Value = false },
        };

        var meter = new TreeModelSingleMeasuringEntity<T>()
        {
            SystemName = _applicationOptions.SystemType,
            DeviceId = $"{item.DeviceId}",
            DeviceType = $"{item.DeviceType}",
            ProjectId = $"{item.ProjectId}",
            DataType = analysisBaseDto.DataType,
            // TODO: the timestamp is currently formatted for 15-minute data; this needs adjusting.
            Timestamps = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(),
            SingleMeasuring = (item.FiledName ?? string.Empty, item.DataValue ?? default)
        };

        _runtimeContext.UseTableSessionPool = true; // use the table-model session pool
        var taskSendInfo = await _dbProvider.QueryAsync<MeterReadingTelemetryPacketInfo>(new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(), Conditions = conditions, PageIndex = 0, PageSize = 1 });
        var taskData = taskSendInfo?.Items.FirstOrDefault();
        if (taskData != null)
        {
            // Update: a matching record exists, so store the value at its pending read time
            // and mark the record as received.
            meter.Timestamps = taskData.PendingCopyReadTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
            taskData.IsReceived = true;
            taskData.ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage;
            taskData.ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty;
        }
        else
        {
            // Create: no matching record, so build a new one that is already marked as received.
            // One clock read feeds both CreationTime and Timestamps so they agree.
            var currentTime = DateTime.Now;
            taskData = new MeterReadingTelemetryPacketInfo()
            {
                SystemName = _applicationOptions.SystemType,
                ProjectId = $"{item.ProjectId}",
                DeviceType = $"{item.DeviceType}",
                DeviceId = $"{item.DeviceId}",
                DataType = IOTDBDataTypeConst.Log, // no matching send record: mark as LOG
                Timestamps = currentTime.CheckTimePoint().GetDateTimeOffset().ToUnixTimeNanoseconds(),
                DatabaseBusiID = item.DatabaseBusiID,
                PendingCopyReadTime = item.TimeSpan.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity),
                CreationTime = currentTime,
                FocusId = item.FocusId,
                FocusAddress = analysisBaseDto.Code,
                // Left empty when the device address equals the frame code.
                MeterAddress = analysisBaseDto.Code == item.DeviceAddress ? "" : item.DeviceAddress,
                AFN = analysisBaseDto.AFN,
                Fn = analysisBaseDto.Fn,
                Seq = analysisBaseDto.PSEQ,
                MSA = analysisBaseDto.MSA,
                ItemCode = item.ItemType,
                TaskMark = taskMark,
                IsSend = false,
                ManualOrNot = false,
                Pn = analysisBaseDto.Pn,
                ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty,
                ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage,
                IsReceived = true,
                ReceivedRemark = item.ErrorCodeMsg ?? string.Empty,
                ScoreValue = scoreValue,
                ReceivedTime = analysisBaseDto.ReceivedTime,
            };
        }

        packetRecords.Add(taskData);

        // Without a field name there is no measurement to store.
        if (!string.IsNullOrWhiteSpace(item.FiledName))
        {
            measurements.Add(meter);
        }
    }

    // Batch-save; skip either insert when there is nothing to write.
    if (packetRecords.Count > 0)
    {
        _runtimeContext.UseTableSessionPool = true; // use the table-model session pool
        await _dbProvider.BatchInsertAsync(packetRecords);
    }

    if (measurements.Count > 0)
    {
        _runtimeContext.UseTableSessionPool = false; // use the tree-model session pool
        await _dbProvider.BatchInsertAsync(measurements);
    }

    return true;
}
/// <summary>
/// Persists one status reading to IoTDB: three tree-model entries sharing one timestamp
/// (the status value, the raw frame, the recording time) followed by a table-model packet record.
/// </summary>
/// <typeparam name="T">Type of the status value.</typeparam>
/// <param name="analysisBaseDto">Parsed frame; its <c>Data</c>, the field name and the data value must be present.</param>
/// <returns>Always <c>true</c>; failures surface as exceptions.</returns>
/// <exception cref="ArgumentNullException"><paramref name="analysisBaseDto"/>, its <c>Data</c>, the field name or the data value is null.</exception>
/// <exception cref="ArgumentException">The field name is empty or whitespace.</exception>
public async Task<bool> SaveStatusToIotDbAsync<T>(UnitDataAnalysis<AnalysisBaseDto<T>> analysisBaseDto)
{
    // Validate the values themselves. Passing nameof(...) here would only check a
    // constant string and could never throw.
    ArgumentNullException.ThrowIfNull(analysisBaseDto);
    ArgumentNullException.ThrowIfNull(analysisBaseDto.Data);
    var data = analysisBaseDto.Data;
    ArgumentException.ThrowIfNullOrWhiteSpace(data.FiledName);
    // DataValue is generic, so only a null check applies.
    ArgumentNullException.ThrowIfNull(data.DataValue);

    // Fall back to the receive time when the reading has no timestamp of its own.
    if (!data.TimeSpan.HasValue)
    {
        data.TimeSpan = analysisBaseDto.ReceivedTime;
    }

    // The three tree entries below share this timestamp.
    long timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds();

    // Status type (heartbeat, login, power-on, power-off).
    var treeData = new TreeModelSingleMeasuringEntity<T>()
    {
        SystemName = _applicationOptions.SystemType,
        DeviceId = $"{data.DeviceId}",
        DeviceType = $"{data.DeviceType}",
        ProjectId = $"{data.ProjectId}",
        DataType = analysisBaseDto.DataType,
        Timestamps = timestamps,
        SingleMeasuring = (data.FiledName!, data.DataValue!)
    };
    _runtimeContext.UseTableSessionPool = false; // use the tree-model session pool
    await _dbProvider.InsertAsync(treeData);

    // Raw data frame.
    var treeFrameData = new TreeModelSingleMeasuringEntity<string>()
    {
        SystemName = _applicationOptions.SystemType,
        DeviceId = $"{data.DeviceId}",
        DeviceType = $"{data.DeviceType}",
        ProjectId = $"{data.ProjectId}",
        DataType = analysisBaseDto.DataType,
        Timestamps = timestamps,
        SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
    };
    _runtimeContext.UseTableSessionPool = false; // use the tree-model session pool
    await _dbProvider.InsertAsync(treeFrameData);

    // Recording time.
    var treeRecordingTimeData = new TreeModelSingleMeasuringEntity<DateTime>()
    {
        SystemName = _applicationOptions.SystemType,
        DeviceId = $"{data.DeviceId}",
        DeviceType = $"{data.DeviceType}",
        ProjectId = $"{data.ProjectId}",
        Timestamps = timestamps,
        DataType = analysisBaseDto.DataType,
        SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, data.TimeSpan ?? DateTime.Now)
    };
    _runtimeContext.UseTableSessionPool = false; // use the tree-model session pool
    await _dbProvider.InsertAsync(treeRecordingTimeData);

    // Create the packet record, already marked as received.
    // One clock read feeds both CreationTime and Timestamps so they agree.
    string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
    var currentTime = DateTime.Now;
    var taskData = new MeterReadingTelemetryPacketInfo()
    {
        SystemName = _applicationOptions.SystemType,
        ProjectId = $"{data.ProjectId}",
        DeviceType = $"{data.DeviceType}",
        DeviceId = $"{data.DeviceId}",
        DataType = analysisBaseDto.DataType,
        FocusId = data.FocusId,
        FocusAddress = analysisBaseDto.Code,
        Timestamps = currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
        DatabaseBusiID = data.DatabaseBusiID,
        PendingCopyReadTime = data.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity),
        CreationTime = currentTime,
        // Left empty when the device address equals the frame code.
        MeterAddress = analysisBaseDto.Code == data.DeviceAddress ? "" : data.DeviceAddress,
        AFN = analysisBaseDto.AFN,
        Fn = analysisBaseDto.Fn,
        Seq = analysisBaseDto.PSEQ,
        MSA = analysisBaseDto.MSA,
        ItemCode = data.ItemType,
        TaskMark = taskMark,
        IsSend = false,
        ManualOrNot = false,
        Pn = analysisBaseDto.Pn,
        ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty,
        ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage,
        IsReceived = true,
        ReceivedRemark = data.ErrorCodeMsg ?? string.Empty,
        ScoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun(),
        ReceivedTime = analysisBaseDto.ReceivedTime,
    };
    _runtimeContext.UseTableSessionPool = true; // use the table-model session pool
    await _dbProvider.InsertAsync(taskData);

    return true;
}
/// <summary>
/// Persists a list of status readings to IoTDB. For each reading, three tree-model entries sharing one
/// timestamp (the status value, the raw frame, the recording time) are inserted immediately; the
/// table-model packet records are collected and inserted as one batch at the end.
/// </summary>
/// <typeparam name="T">Type of the status values.</typeparam>
/// <param name="analysisBaseDto">Parsed frame; its <c>Data</c> list must not be null, and every item needs a field name and a data value.</param>
/// <returns>Always <c>true</c>; failures surface as exceptions.</returns>
/// <exception cref="ArgumentNullException"><paramref name="analysisBaseDto"/>, its <c>Data</c>, or an item's field name or data value is null.</exception>
/// <exception cref="ArgumentException">An item's field name is empty or whitespace.</exception>
public async Task<bool> SaveMultipleStatusToIotDbAsync<T>(UnitDataAnalysis<List<AnalysisBaseDto<T>>> analysisBaseDto)
{
    // Validate the values themselves. Passing nameof(...) here would only check a
    // constant string and could never throw.
    ArgumentNullException.ThrowIfNull(analysisBaseDto);
    ArgumentNullException.ThrowIfNull(analysisBaseDto.Data);
    var data = analysisBaseDto.Data;

    var packetRecords = new List<MeterReadingTelemetryPacketInfo>(data.Count);

    // Both values depend only on the frame, not on the individual reading, so compute them once.
    string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
    string scoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun();

    foreach (var item in data)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(item.FiledName);
        // DataValue is generic, so only a null check applies.
        ArgumentNullException.ThrowIfNull(item.DataValue);

        // Fall back to the receive time when the reading has no timestamp of its own.
        if (!item.TimeSpan.HasValue)
        {
            item.TimeSpan = analysisBaseDto.ReceivedTime;
        }

        // The three tree entries below share this timestamp.
        long timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds();

        // Status type (heartbeat, login, power-on, power-off).
        var treeData = new TreeModelSingleMeasuringEntity<T>()
        {
            SystemName = _applicationOptions.SystemType,
            DeviceId = $"{item.DeviceId}",
            DeviceType = $"{item.DeviceType}",
            ProjectId = $"{item.ProjectId}",
            DataType = analysisBaseDto.DataType,
            Timestamps = timestamps,
            SingleMeasuring = (item.FiledName!, item.DataValue!)
        };
        _runtimeContext.UseTableSessionPool = false; // use the tree-model session pool
        await _dbProvider.InsertAsync(treeData);

        // Raw data frame. DeviceId is set so this entry is keyed to the same device as
        // the status value above, matching the single-item SaveStatusToIotDbAsync.
        var treeFrameData = new TreeModelSingleMeasuringEntity<string>()
        {
            SystemName = _applicationOptions.SystemType,
            DeviceId = $"{item.DeviceId}",
            DeviceType = $"{item.DeviceType}",
            ProjectId = $"{item.ProjectId}",
            DataType = analysisBaseDto.DataType,
            Timestamps = timestamps,
            SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
        };
        _runtimeContext.UseTableSessionPool = false; // use the tree-model session pool
        await _dbProvider.InsertAsync(treeFrameData);

        // Recording time (DeviceId set for the same reason as the frame entry).
        var treeRecordingTimeData = new TreeModelSingleMeasuringEntity<DateTime>()
        {
            SystemName = _applicationOptions.SystemType,
            DeviceId = $"{item.DeviceId}",
            DeviceType = $"{item.DeviceType}",
            ProjectId = $"{item.ProjectId}",
            DataType = analysisBaseDto.DataType,
            Timestamps = timestamps,
            SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, item.TimeSpan ?? DateTime.Now)
        };
        _runtimeContext.UseTableSessionPool = false; // use the tree-model session pool
        await _dbProvider.InsertAsync(treeRecordingTimeData);

        // Create the packet record, already marked as received.
        // One clock read feeds both CreationTime and Timestamps so they agree.
        var currentTime = DateTime.Now;
        var taskData = new MeterReadingTelemetryPacketInfo()
        {
            SystemName = _applicationOptions.SystemType,
            ProjectId = $"{item.ProjectId}",
            DeviceType = $"{item.DeviceType}",
            DeviceId = $"{item.DeviceId}",
            DataType = analysisBaseDto.DataType,
            Timestamps = currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
            DatabaseBusiID = item.DatabaseBusiID,
            PendingCopyReadTime = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity),
            CreationTime = currentTime,
            FocusId = item.FocusId,
            FocusAddress = analysisBaseDto.Code,
            // Left empty when the device address equals the frame code.
            MeterAddress = analysisBaseDto.Code == item.DeviceAddress ? "" : item.DeviceAddress,
            AFN = analysisBaseDto.AFN,
            Fn = analysisBaseDto.Fn,
            Seq = analysisBaseDto.PSEQ,
            MSA = analysisBaseDto.MSA,
            ItemCode = item.ItemType,
            TaskMark = taskMark,
            IsSend = false,
            ManualOrNot = false,
            Pn = analysisBaseDto.Pn,
            ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty,
            ReceivedMessageHexString = analysisBaseDto.ReceivedHexMessage,
            IsReceived = true,
            ReceivedRemark = item.ErrorCodeMsg ?? string.Empty,
            ScoreValue = scoreValue,
            ReceivedTime = analysisBaseDto.ReceivedTime,
        };
        packetRecords.Add(taskData);
    }

    if (packetRecords.Count > 0)
    {
        _runtimeContext.UseTableSessionPool = true; // use the table-model session pool
        await _dbProvider.BatchInsertAsync(packetRecords);
    }

    return true;
}
}
}