优化代码

This commit is contained in:
ChenYi 2025-04-08 17:44:42 +08:00
parent 323ad75f8a
commit 39ae8a7f38
14 changed files with 186 additions and 105 deletions

View File

@ -15,8 +15,6 @@ namespace JiShe.CollectBus.Subscribers
#region
Task<List<MeterReadingRecords>> AmmeterScheduledMeterOneMinuteReadingIssuedEventQuery();
/// <summary>
/// 1分钟采集电表数据下行消息消费订阅
/// </summary>

View File

@ -66,21 +66,21 @@ namespace JiShe.CollectBus.DataMigration
/// <returns></returns>
private async Task ProduceDataAsync(ChannelWriter<MeterReadingRecords[]> writer)
{
while (true)
{
var queryable = await _meterReadingRecordsRepository.GetQueryableAsync();
var batchRecords = queryable.Where(d => d.MigrationStatus == Common.Enums.RecordsDataMigrationStatusEnum.NotStarted)
.Take(_options.MongoDbDataBatchSize)
.ToArray();
//while (true)
//{
// var queryable = await _meterReadingRecordsRepository.GetQueryableAsync();
// var batchRecords = queryable.Where(d => d.MigrationStatus == Common.Enums.RecordsDataMigrationStatusEnum.NotStarted)
// .Take(_options.MongoDbDataBatchSize)
// .ToArray();
if (batchRecords == null || batchRecords.Length == 0)
{
writer.Complete();
break;
}
// if (batchRecords == null || batchRecords.Length == 0)
// {
// writer.Complete();
// break;
// }
await writer.WriteAsync(batchRecords);
}
// await writer.WriteAsync(batchRecords);
//}
}
/// <summary>
@ -111,14 +111,14 @@ namespace JiShe.CollectBus.DataMigration
//await writer.WriteAsync(dataTable);
// 批量更新标记
var ids = batch.Select(d => d.Id).ToArray();
foreach (var item in batch)
{
item.MigrationStatus = Common.Enums.RecordsDataMigrationStatusEnum.InProgress;
item.MigrationTime = DateTime.Now;
}
//var ids = batch.Select(d => d.Id).ToArray();
//foreach (var item in batch)
//{
// item.MigrationStatus = Common.Enums.RecordsDataMigrationStatusEnum.InProgress;
// item.MigrationTime = DateTime.Now;
//}
await _meterReadingRecordsRepository.UpdateManyAsync(batch);
//await _meterReadingRecordsRepository.UpdateManyAsync(batch);
}
writer.Complete();
}

View File

@ -12,28 +12,41 @@ using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;
using JiShe.CollectBus.IoTDBProvider.Context;
using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.Samples;
public class SampleAppService : CollectBusAppService, ISampleAppService
{
private readonly ILogger<SampleAppService> _logger;
private readonly IIoTDBProvider _iotDBProvider;
private readonly IoTDBRuntimeContext _dbContext;
private readonly IoTDBOptions _options;
public SampleAppService(IIoTDBProvider iotDBProvider, IOptions<IoTDBOptions> options,
IoTDBRuntimeContext dbContext)
IoTDBRuntimeContext dbContext, ILogger<SampleAppService> logger)
{
_iotDBProvider = iotDBProvider;
_options = options.Value;
_dbContext = dbContext;
_logger = logger;
}
[HttpGet]
public async Task UseSessionPool()
public async Task UseSessionPool(long timestamps)
{
string? messageHexString = null;
if (timestamps == 0)
{
timestamps = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
_logger.LogError($"timestamps_{timestamps}");
}
else
{
messageHexString = messageHexString + timestamps;
}
ElectricityMeter meter = new ElectricityMeter()
{
SystemName = "energy",
@ -43,7 +56,8 @@ public class SampleAppService : CollectBusAppService, ISampleAppService
MeterModel = "DDZY-1980",
ProjectCode = "10059",
Voltage = 10,
Timestamps = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
IssuedMessageHexString = messageHexString,
Timestamps = timestamps,
};
await _iotDBProvider.InsertAsync(meter);
}

View File

@ -6,6 +6,7 @@ using System.Net;
using System.Threading.Tasks;
using DotNetCore.CAP;
using FreeSql;
using FreeSql.Internal.CommonProvider;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.BuildSendDatas;
@ -15,6 +16,7 @@ using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDBProvider;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
@ -40,16 +42,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
private readonly ICapPublisher _producerBus;
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
private readonly IIoTDBProvider _dbProvider;
private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
public BasicScheduledMeterReadingService(
ILogger<BasicScheduledMeterReadingService> logger,
ICapPublisher producerBus,
IMeterReadingRecordRepository meterReadingRecordsRepository)
IMeterReadingRecordRepository meterReadingRecordRepository,
IIoTDBProvider dbProvider)
{
_producerBus = producerBus;
_logger = logger;
_meterReadingRecordsRepository = meterReadingRecordsRepository;
_dbProvider = dbProvider;
_meterReadingRecordRepository = meterReadingRecordRepository;
}
/// <summary>
@ -268,6 +273,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
//获取缓存中的电表信息
int timeDensity = 5;
var currentTime = DateTime.Now;
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
@ -307,7 +314,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
//_dbProvider.SwitchSessionPool(true);
//await _dbProvider.InsertAsync(meterTaskInfosList);
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList,currentTime);
}
//删除任务数据
@ -335,6 +345,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
//获取缓存中的电表信息
int timeDensity = 5;
var currentTime = DateTime.Now;
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*";
var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0)
@ -374,7 +386,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList,currentTime);
}
//删除任务数据
@ -446,7 +458,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList,currentDateTime);
}
//删除任务数据
@ -706,6 +718,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusID = ammeter.FocusID,
AFN = aFN,
Fn = fn,
ItemCode = tempItem,
ManualOrNot = false,
Pn = ammeter.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(dataInfos),
@ -837,7 +851,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
}
//删除任务数据
@ -906,7 +920,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
}
//删除任务数据
@ -974,7 +988,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
}
//删除任务数据

View File

@ -6,6 +6,7 @@ using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDBProvider;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
@ -29,7 +30,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger,
ICapPublisher producerBus, IMeterReadingRecordRepository meterReadingRecordsRepository) : base(logger, producerBus, meterReadingRecordsRepository)
ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository) : base(logger, producerBus, meterReadingRecordRepository, dbProvider)
{
}

View File

@ -130,9 +130,10 @@ namespace JiShe.CollectBus.Subscribers
Pn = 0,
FocusAddress = "",
MeterAddress = "",
DataResult = tb3761FN.Text,
});
//todo 将解析结果插入IoTDB时标从
//await _messageReceivedEventRepository.InsertAsync(receivedMessage);
}
}

View File

@ -56,23 +56,6 @@ namespace JiShe.CollectBus.Subscribers
#region
/// <summary>
/// 一分钟定时抄读任务消息消费订阅
/// </summary>
/// <returns></returns>
[HttpGet]
[Route("ammeter/oneminute/issued-eventQuery")]
public async Task<List<MeterReadingRecords>> AmmeterScheduledMeterOneMinuteReadingIssuedEventQuery()
{
var currentDateTime = DateTime.Now;
var list = await _meterReadingRecordsRepository.ParallelQueryAsync(currentDateTime.AddMinutes(-20), currentDateTime.AddMinutes(10));
return list;
//return null;
}
/// <summary>
/// 一分钟定时抄读任务消息消费订阅
/// </summary>
@ -147,8 +130,6 @@ namespace JiShe.CollectBus.Subscribers
}
else
{
// var dd = await _meterReadingRecordsRepository.FirstOrDefaultAsync(d=>d.ManualOrNot== true);
var device = await _deviceRepository.FirstOrDefaultAsync(a => a.Number == receivedMessage.FocusAddress);
if (device != null)
{

View File

@ -12,6 +12,18 @@ namespace JiShe.CollectBus.Ammeters
[ATTRIBUTEColumn]
public string MeterModel { get; set; }
/// <summary>
/// 下发消息内容
/// </summary>
[FIELDColumn]
public string IssuedMessageHexString { get; set; }
///// <summary>
///// 下发消息Id
///// </summary>
//[FIELDColumn]
//public string IssuedMessageId { get; set; }
[FIELDColumn]
public double Voltage { get; set; }

View File

@ -29,5 +29,10 @@ namespace JiShe.CollectBus.IotSystems.MessageIssueds
/// </summary>
public string MessageId { get; set; }
/// <summary>
/// 最后一次消息Id用于在消费消息时检查上一个任务是否处理完。
/// </summary>
public string LastMessageId { get; set; }
}
}

View File

@ -14,7 +14,6 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// </summary>
public class MeterReadingRecords : AggregateRoot<Guid>
{
/// <summary>
/// 是否手动操作
/// </summary>
@ -85,11 +84,15 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// </summary>
public int Pn { get; set; }
/// <summary>
/// 采集项编码
/// </summary>
public string ItemCode { get; set;}
/// <summary>
/// 是否下发成功
/// 是否超时
/// </summary>
public bool WasSuccessful { get; set; }
public bool IsTimeout { get; set; } = false;
/// <summary>
/// 创建时间
@ -112,24 +115,9 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
public string ReceivedMessageId { get; set; }
/// <summary>
/// 数据迁移状态
/// 上报报文解析备注,异常情况下才有
/// </summary>
public RecordsDataMigrationStatusEnum MigrationStatus { get; set; }
/// <summary>
/// 数据结果,最终的解析报文结果值
/// </summary>
public string DataResult { get; set; }
/// <summary>
/// 数据时间,如冻结时间、事件发生事件等
/// </summary>
public DateTime? DataGenerationTimestamp { get; set; }
/// <summary>
/// 数据迁移时间
/// </summary>
public DateTime? MigrationTime { get; set; }
public string ReceivedRemark { get; set; }
public void CreateDataId(Guid Id)
{

View File

@ -40,7 +40,7 @@
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
},
"Redis": {
"Configuration": "118.190.144.92:6380,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
"Configuration": "120.24.52.151:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
"DefaultDB": "14",
"HangfireDB": "15"
},

View File

@ -15,6 +15,7 @@ using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using static Thrift.Protocol.Utilities.TJSONProtocolConstants;
namespace JiShe.CollectBus.IoTDBProvider
{
@ -176,8 +177,21 @@ namespace JiShe.CollectBus.IoTDBProvider
var rowValues = new List<object>();
foreach (var measurement in metadata.ColumnNames)
{
var value = typeof(T).GetProperty(measurement)?.GetValue(entity);
rowValues.Add(value ?? new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,属性{measurement}值为空不符合IoTDB设计标准请赋值以后重新处理。"));
PropertyInfo propertyInfo = typeof(T).GetProperty(measurement);
if (propertyInfo==null)
{
throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,没有找到{measurement}属性,属于异常情况。");
}
var value = propertyInfo.GetValue(entity);
if (value != null)
{
rowValues.Add(value);
}
else
{
DataTypeValueMap.TryGetValue(propertyInfo.PropertyType.Name, out object defaultValue);
rowValues.Add(defaultValue);
}
}
values.Add(rowValues);
if (!_runtimeContext.UseTableSessionPool)//树模型
@ -509,7 +523,28 @@ namespace JiShe.CollectBus.IoTDBProvider
["TIMESTAMP"] = TSDataType.TIMESTAMP,
["DATE"] = TSDataType.DATE,
["BLOB"] = TSDataType.BLOB,
["DECIMAL"] = TSDataType.STRING,
["STRING"] = TSDataType.STRING
};
/// <summary>
/// 根据类型名称获取 IoTDB 数据默认值
/// </summary>
private readonly IReadOnlyDictionary<string, object> DataTypeValueMap =
new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase)
{
["BOOLEAN"] = false,
["INT32"] = 0,
["INT64"] = 0,
["FLOAT"] = 0.0f,
["DOUBLE"] = 0.0d,
["TEXT"] = string.Empty,
["NULLTYPE"] = null,
["TIMESTAMP"] = null,
["DATE"] = null,
["BLOB"] = null,
["DECIMAL"] = "0.0",
["STRING"] = string.Empty
};
}
}

View File

@ -109,6 +109,21 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord
return entity;
}
/// <summary>
/// 单个获取
/// </summary>
/// <param name="entity"></param>
/// <param name="dateTime"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
public async Task<MeterReadingRecords> FirOrDefaultAsync(MeterReadingRecords entity, DateTime? dateTime)
{
var collection = await GetShardedCollection(dateTime);
await collection.findon
throw new NotImplementedException();
}
/// <summary>
/// 多集合数据查询
/// </summary>
@ -156,9 +171,5 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord
return database.GetCollection<MeterReadingRecords>(collectionName);
}
public Task<MeterReadingRecords> FirOrDefaultAsync(MeterReadingRecords entity, DateTime? dateTime)
{
throw new NotImplementedException();
}
}
}

View File

@ -18,50 +18,71 @@ namespace JiShe.CollectBus.Protocol.Contracts
/// <summary>
/// 1分钟采集电表数据下行消息主题
/// </summary>
public const string AmmeterSubscriberWorkerOneMinuteIssuedEventName = "issued.one.ammeter.event";
public const string AmmeterSubscriberWorkerOneMinuteIssuedEventName = "issued.auto.one.ammeter.event";
/// <summary>
/// 5分钟采集电表数据下行消息主题
/// </summary>
public const string AmmeterSubscriberWorkerFiveMinuteIssuedEventName = "issued.five.ammeter.event";
public const string AmmeterSubscriberWorkerFiveMinuteIssuedEventName = "issued.auto.five.ammeter.event";
/// <summary>
/// 15分钟采集电表数据下行消息主题
/// </summary>
public const string AmmeterSubscriberWorkerFifteenMinuteIssuedEventName = "issued.fifteen.ammeter.event";
public const string AmmeterSubscriberWorkerFifteenMinuteIssuedEventName = "issued.auto.fifteen.ammeter.event";
/// <summary>
/// 其他采集数据下行消息主题,日冻结,月冻结、集中器版本号、定时阀控等
/// 其他采集数据下行消息主题,日冻结,月冻结、集中器版本号控等
/// </summary>
public const string AmmeterSubscriberWorkerOtherIssuedEventName = "issued.other.ammeter.event";
public const string AmmeterSubscriberWorkerOtherIssuedEventName = "issued.auto.other.ammeter.event";
/// <summary>
/// 电表自动阀控
/// </summary>
public const string AmmeterSubscriberWorkerAutoValveControlIssuedEventName = "issued.auto.control.ammeter.event";
/// <summary>
/// 电表手动阀控
/// </summary>
public const string AmmeterSubscriberWorkerManualValveControlIssuedEventName = "issued.control.ammeter.event";
public const string AmmeterSubscriberWorkerManualValveControlIssuedEventName = "issued.manual.control.ammeter.event";
/// <summary>
/// 电表手动抄读
/// </summary>
public const string AmmeterSubscriberWorkerManualValveReadingIssuedEventName = "issued.manual.reading.ammeter.event";
#endregion
#region
/// <summary>
/// 1分钟采集水表数据下行消息主题
/// </summary>
public const string WatermeterSubscriberWorkerOneMinuteIssuedEventName = "issued.one.watermeter.event";
public const string WatermeterSubscriberWorkerOneMinuteIssuedEventName = "issued.auto.one.watermeter.event";
/// <summary>
/// 5分钟采集水表数据下行消息主题
/// </summary>
public const string WatermeterSubscriberWorkerFiveMinuteIssuedEventName = "issued.five.watermeter.event";
public const string WatermeterSubscriberWorkerFiveMinuteIssuedEventName = "issued.auto.five.watermeter.event";
/// <summary>
/// 15分钟采集水表数据下行消息主题
/// </summary>
public const string WatermeterSubscriberWorkerFifteenMinuteIssuedEventName = "issued.fifteen.watermeter.event";
public const string WatermeterSubscriberWorkerFifteenMinuteIssuedEventName = "issued.auto.fifteen.watermeter.event";
/// <summary>
/// 其他采集数据下行消息主题,日冻结,月冻结、集中器版本号、定时阀控
/// 其他采集数据下行消息主题,日冻结,月冻结、集中器版本号
/// </summary>
public const string WatermeterSubscriberWorkerOtherIssuedEventName = "issued.other.watermeter.event";
public const string WatermeterSubscriberWorkerOtherIssuedEventName = "issued.auto.other.watermeter.event";
/// <summary>
/// 水表自动阀控
/// </summary>
public const string WatermeterSubscriberWorkerAutoValveControlIssuedEventName = "issued.auto.control.watermeter.event";
/// <summary>
/// 水表手动阀控
/// </summary>
public const string WatermeterSubscriberWorkerManualValveControlIssuedEventName = "issued.control.watermeter.event";
public const string WatermeterSubscriberWorkerManualValveControlIssuedEventName = "issued.manual.control.watermeter.event";
/// <summary>
/// 水表手动抄读
/// </summary>
public const string WatermeterSubscriberWorkerManualValveReadingIssuedEventName = "issued.manual.reading.watermeter.event";
#endregion