// DataStorage: persists parsed T37612012 protocol meter readings to IoTDB.

using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Encrypt;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Protocol.Contracts.Protocol.Dto;
using JiShe.CollectBus.Protocol.Dto;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Guids;

namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
{
/// <summary>
/// Stores parsed meter-reading results in IoTDB. For every reading it writes a
/// <see cref="MeterReadingTelemetryPacketInfo"/> record through the table-model session pool and,
/// when the reading carries a field name, a <see cref="TreeModelSingleMeasuringEntity{T}"/>
/// measurement through the tree-model session pool.
/// </summary>
public class DataStorage : ITransientDependency
{
    // Injected but not used by any member of this class at present.
    private readonly IGuidGenerator _guidGenerator;
    private readonly IIoTDbProvider _dbProvider;
    private readonly ServerApplicationOptions _applicationOptions;
    private readonly IoTDBRuntimeContext _runtimeContext;

    /// <summary>
    /// Creates the storage service from its injected dependencies.
    /// </summary>
    /// <param name="dbProvider">IoTDB provider used for all queries and inserts.</param>
    /// <param name="applicationOptions">Server options; <c>SystemType</c> is written as the system name of every record.</param>
    /// <param name="guidGenerator">GUID generator (currently unused).</param>
    /// <param name="runtimeContext">Runtime context whose <c>UseTableSessionPool</c> flag selects the table or tree session pool.</param>
    public DataStorage(IIoTDbProvider dbProvider, IOptions<ServerApplicationOptions> applicationOptions, IGuidGenerator guidGenerator, IoTDBRuntimeContext runtimeContext)
    {
        _dbProvider = dbProvider;
        _applicationOptions = applicationOptions.Value;
        _guidGenerator = guidGenerator;
        _runtimeContext = runtimeContext;
    }

    /// <summary>
    /// Gets the cached ammeter information.
    /// </summary>
    /// <param name="meterType">Meter type (not used by the current placeholder implementation).</param>
    /// <param name="timeDensity">Collection time density, default "15" (not used by the current placeholder implementation).</param>
    /// <returns>A completed task holding a new, empty <see cref="AmmeterInfo"/>.</returns>
    public Task<AmmeterInfo> GetMeterInfoAsync(string meterType, string timeDensity = "15")
    {
        // TODO: placeholder. The intended lookup presumably reads the Redis hash whose key is built from
        // RedisConst.CacheMeterInfoHashKey with (SystemType, ServerTagName, meterType, timeDensity);
        // until that exists an empty AmmeterInfo is returned.
        return Task.FromResult(new AmmeterInfo());
    }

    /// <summary>
    /// Saves a single parsed reading to IoTDB.
    /// </summary>
    /// <remarks>
    /// If a not-yet-received task record matching the frame's task mark exists, it is marked as received and
    /// written back; otherwise a new packet record is created. The measurement itself is only written when the
    /// reading has a non-blank field name.
    /// </remarks>
    /// <typeparam name="T">Type of the measured value.</typeparam>
    /// <param name="analysisBaseDto">Parsed frame; <c>Data</c> must not be null.</param>
    /// <returns><c>true</c> once all writes have completed.</returns>
    public async Task<bool> SaveDataToIotDbAsync<T>(UnitDataAnalysis<AnalysisBaseDto<T>> analysisBaseDto)
    {
        var data = analysisBaseDto.Data!;

        // Fall back to the frame's receive time when the reading has no time of its own.
        data.TimeSpan ??= analysisBaseDto.ReceivedTime;

        string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
        string scoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun();

        // TODO: the 15-minute formatting applied here is temporary and needs adjusting.
        // NOTE(review): unlike SaveMultipleDataToIotDbAsync this path does not call CheckTimePoint() - confirm that is intended.
        var meter = CreateMeasurement(data, data.TimeSpan!.Value.GetDateTimeOffset().ToUnixTimeMilliseconds());

        _runtimeContext.UseTableSessionPool = true; // use the table-model session pool
        var taskData = await FindPendingTaskAsync(scoreValue, taskMark);
        if (taskData != null)
        {
            // Update: align the measurement with the time the task was scheduled to read.
            // NOTE(review): this value is in nanoseconds while the initial one above is in milliseconds - confirm the unit the tree model expects.
            meter.Timestamps = taskData.PendingCopyReadTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
            taskData.IsReceived = true;
            taskData.ReceivedMessageHexString = analysisBaseDto.HexMessage;
            taskData.ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty;
        }
        else
        {
            // Create: no pending task was found, so record the reply as a new packet.
            var currentTime = DateTime.Now;
            taskData = new MeterReadingTelemetryPacketInfo()
            {
                SystemName = _applicationOptions.SystemType,
                ProjectId = $"{data.ProjectId}",
                DeviceType = $"{data.MeterType}",
                DeviceId = $"{data.MeterId}",
                Timestamps = currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
                DatabaseBusiID = data.DatabaseBusiID,
                PendingCopyReadTime = data.TimeSpan.Value.CheckTimePoint(), // TODO: the 15-minute formatting applied here is temporary and needs adjusting.
                CreationTime = currentTime,
                MeterAddress = data.MeterAddress,
                AFN = analysisBaseDto.AFN,
                Fn = analysisBaseDto.Fn,
                Seq = analysisBaseDto.PSEQ,
                MSA = analysisBaseDto.MSA,
                ItemCode = data.DataType,
                TaskMark = taskMark,
                IsSend = false,
                ManualOrNot = false,
                Pn = analysisBaseDto.Pn,
                ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty,
                ReceivedMessageHexString = analysisBaseDto.HexMessage,
                IsReceived = true,
                ReceivedRemark = data.ErrorCodeMsg ?? string.Empty,
                ScoreValue = scoreValue,
            };
        }

        await _dbProvider.InsertAsync(taskData);

        // Without a field name there is no measurement to store.
        if (!string.IsNullOrWhiteSpace(data.FiledName))
        {
            _runtimeContext.UseTableSessionPool = false; // use the tree-model session pool
            await _dbProvider.InsertAsync(meter);
        }

        return true;
    }

    /// <summary>
    /// Saves every reading of a parsed multi-value frame to IoTDB.
    /// </summary>
    /// <remarks>
    /// Each reading is handled independently and in order: the pending-task lookup, the packet write and the
    /// optional measurement write are repeated per item.
    /// </remarks>
    /// <typeparam name="T">Type of the measured values.</typeparam>
    /// <param name="analysisBaseDto">Parsed frame; <c>Data</c> must not be null.</param>
    /// <returns><c>true</c> once all writes have completed.</returns>
    public async Task<bool> SaveMultipleDataToIotDbAsync<T>(UnitDataAnalysis<List<AnalysisBaseDto<T>>> analysisBaseDto)
    {
        var readings = analysisBaseDto.Data!;

        // Both values depend only on the frame header, so they are computed once for all readings.
        string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
        string scoreValue = $"{analysisBaseDto.Code}.{taskMark}".Md5Fun();

        foreach (var item in readings)
        {
            // Fall back to the frame's receive time when the reading has no time of its own.
            item.TimeSpan ??= analysisBaseDto.ReceivedTime;

            // TODO: the 15-minute formatting applied here is temporary and needs adjusting.
            var meter = CreateMeasurement(item, item.TimeSpan!.Value.CheckTimePoint().GetDateTimeOffset().ToUnixTimeMilliseconds());

            _runtimeContext.UseTableSessionPool = true; // use the table-model session pool
            var taskData = await FindPendingTaskAsync(scoreValue, taskMark);
            if (taskData != null)
            {
                // Update: align the measurement with the time the task was scheduled to read.
                // NOTE(review): this value is in nanoseconds while the initial one above is in milliseconds - confirm the unit the tree model expects.
                meter.Timestamps = taskData.PendingCopyReadTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
                taskData.IsReceived = true;
                taskData.ReceivedMessageHexString = analysisBaseDto.HexMessage;
                taskData.ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty;
            }
            else
            {
                // Create: no pending task was found, so record the reply as a new packet.
                var currentTime = DateTime.Now;
                taskData = new MeterReadingTelemetryPacketInfo()
                {
                    SystemName = _applicationOptions.SystemType,
                    ProjectId = $"{item.ProjectId}",
                    DeviceType = $"{item.MeterType}",
                    DeviceId = $"{item.MeterId}",
                    Timestamps = currentTime.CheckTimePoint().GetDateTimeOffset().ToUnixTimeNanoseconds(),
                    DatabaseBusiID = item.DatabaseBusiID,
                    PendingCopyReadTime = item.TimeSpan.Value.CheckTimePoint(), // TODO: the 15-minute formatting applied here is temporary and needs adjusting.
                    CreationTime = currentTime,
                    MeterAddress = item.MeterAddress,
                    AFN = analysisBaseDto.AFN,
                    Fn = analysisBaseDto.Fn,
                    Seq = analysisBaseDto.PSEQ,
                    MSA = analysisBaseDto.MSA,
                    ItemCode = item.DataType,
                    TaskMark = taskMark,
                    IsSend = false,
                    ManualOrNot = false,
                    Pn = analysisBaseDto.Pn,
                    ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty,
                    ReceivedMessageHexString = analysisBaseDto.HexMessage,
                    IsReceived = true,
                    ReceivedRemark = item.ErrorCodeMsg ?? string.Empty,
                    ScoreValue = scoreValue,
                };
            }

            await _dbProvider.InsertAsync(taskData);

            // Without a field name there is no measurement to store.
            if (!string.IsNullOrWhiteSpace(item.FiledName))
            {
                _runtimeContext.UseTableSessionPool = false; // use the tree-model session pool
                await _dbProvider.InsertAsync(meter);
            }
        }

        return true;
    }

    /// <summary>
    /// Builds the measurement entity for one reading with the given timestamp.
    /// </summary>
    private TreeModelSingleMeasuringEntity<T> CreateMeasurement<T>(AnalysisBaseDto<T> reading, long timestamp)
    {
        return new TreeModelSingleMeasuringEntity<T>()
        {
            SystemName = _applicationOptions.SystemType,
            DeviceId = $"{reading.MeterId}",
            DeviceType = $"{reading.MeterType}",
            ProjectId = $"{reading.ProjectId}",
            Timestamps = timestamp,
            SingleMeasuring = new Tuple<string, T>(reading.FiledName ?? string.Empty, reading.DataValue ?? default)
        };
    }

    /// <summary>
    /// Queries the packet table for the first record with the given score value and task mark that has not
    /// been received yet. The caller selects the session pool before calling.
    /// </summary>
    /// <returns>The matching record, or <c>null</c> when the query yields none.</returns>
    private async Task<MeterReadingTelemetryPacketInfo?> FindPendingTaskAsync(string scoreValue, string taskMark)
    {
        var conditions = new List<QueryCondition>
        {
            new QueryCondition() { Field = "ScoreValue", Operator = "=", IsNumber = false, Value = scoreValue },
            new QueryCondition() { Field = "TaskMark", Operator = "=", IsNumber = false, Value = taskMark },
            new QueryCondition() { Field = "IsReceived", Operator = "=", IsNumber = false, Value = false },
        };

        var page = await _dbProvider.QueryAsync<MeterReadingTelemetryPacketInfo>(new IoTDBQueryOptions()
        {
            TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
            Conditions = conditions,
            PageIndex = 0,
            PageSize = 1
        });

        return page?.Items.FirstOrDefault();
    }
}
}