优化消息存储

This commit is contained in:
陈益 2025-03-18 22:43:24 +08:00
parent 7f56b6e91f
commit 76fe43ae54
11 changed files with 354 additions and 104 deletions

View File

@ -18,6 +18,7 @@ using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Workers;
@ -36,17 +37,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
private readonly ICapPublisher _capBus;
private readonly IRepository<ScheduledMeterReadingIssuedEventMessage, Guid> _meterReadingIssuedRepository;
private readonly IRepository<MeterFifteenMinuteReadingRecords, Guid> _fifteenMinuteReadingRecordRepository;
private readonly IRepository<MeterFiveMinuteReadingRecords, Guid> _fiveMinuteReadingRecordRepository;
private readonly IRepository<MeterOneMinuteReadingRecords, Guid> _oneMinuteReadingRecordRepository;
public BasicScheduledMeterReadingService(
ILogger<BasicScheduledMeterReadingService> logger,
ICapPublisher capBus,
IRepository<ScheduledMeterReadingIssuedEventMessage, Guid> meterReadingIssuedRepository)
IRepository<MeterFifteenMinuteReadingRecords, Guid> fifteenMinuteReadingRecordRepository, IRepository<MeterFiveMinuteReadingRecords, Guid> fiveMinuteReadingRecordRepository, IRepository<MeterOneMinuteReadingRecords, Guid> oneMinuteReadingRecordRepository)
{
_capBus = capBus;
_logger = logger;
_meterReadingIssuedRepository = meterReadingIssuedRepository;
_oneMinuteReadingRecordRepository = oneMinuteReadingRecordRepository;
_fiveMinuteReadingRecordRepository = fiveMinuteReadingRecordRepository;
_fifteenMinuteReadingRecordRepository = fifteenMinuteReadingRecordRepository;
}
/// <summary>
@ -186,8 +191,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
#if DEBUG
//每次缓存时,删除缓存,避免缓存数据错误
await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
//每次缓存时,删除缓存,避免缓存数据错误
await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
#else
//每次缓存时,删除缓存,避免缓存数据错误
await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
@ -274,29 +279,40 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, ScheduledMeterReadingIssuedEventMessage>> meterTaskInfos = await GetMeterRedisCacheData<ScheduledMeterReadingIssuedEventMessage>(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.Ammeter).ToString());
Dictionary<string, Dictionary<string, MeterOneMinuteReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterOneMinuteReadingRecords>(oneMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString());
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
return;
}
List<ScheduledMeterReadingIssuedEventMessage> meterTaskInfosList = new List<ScheduledMeterReadingIssuedEventMessage>();
List<MeterOneMinuteReadingRecords> meterTaskInfosList = new List<MeterOneMinuteReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
{
foreach (var ammerterItem in focusItem.Value)
{
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, ammerterItem.Value);
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList);
await _oneMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
}
//删除任务数据
await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList);
//缓存下一个时间的任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{
@ -320,37 +336,48 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//获取缓存中的电表信息
int timeDensity = 5;
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
return;
}
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, ScheduledMeterReadingIssuedEventMessage>> meterTaskInfos = await GetMeterRedisCacheData<ScheduledMeterReadingIssuedEventMessage>(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.Ammeter).ToString());
Dictionary<string, Dictionary<string, MeterFiveMinuteReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterFiveMinuteReadingRecords>(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.Ammeter).ToString());
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
return;
}
List<ScheduledMeterReadingIssuedEventMessage> meterTaskInfosList = new List<ScheduledMeterReadingIssuedEventMessage>();
List<MeterFiveMinuteReadingRecords> meterTaskInfosList = new List<MeterFiveMinuteReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
{
foreach (var ammerterItem in focusItem.Value)
{
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, ammerterItem.Value);
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList);
await _fiveMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
}
//删除任务数据
await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList);
//缓存下一个时间的任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{
@ -376,40 +403,47 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//获取缓存中的电表信息
int timeDensity = 15;
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
return;
}
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, ScheduledMeterReadingIssuedEventMessage>> meterTaskInfos = await GetMeterRedisCacheData<ScheduledMeterReadingIssuedEventMessage>(oneMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString());
Dictionary<string, Dictionary<string, MeterFifteenMinuteReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterFifteenMinuteReadingRecords>(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString());
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
return;
}
List<ScheduledMeterReadingIssuedEventMessage> meterTaskInfosList = new List<ScheduledMeterReadingIssuedEventMessage>();
List<MeterFifteenMinuteReadingRecords> meterTaskInfosList = new List<MeterFifteenMinuteReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
{
foreach (var ammerterItem in focusItem.Value)
{
//todo 可能需要优化如果使用等待会很慢但使用不等待mongodb 连接池又没法抗住,先发送微妙级的延时队列消息,暂时先这样处理
_= _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, ammerterItem.Value);
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList);
await _fifteenMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
}
//删除任务数据
await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList);
await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList);
//缓存下一个时间的任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
@ -420,7 +454,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.Ammeter, timeDensity);
await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
stopwatch.Stop();
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
@ -586,7 +620,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
}
Dictionary<string, ScheduledMeterReadingIssuedEventMessage> keyValuePairs = new Dictionary<string, ScheduledMeterReadingIssuedEventMessage>();
Dictionary<string, BasicMeterReadingRecords> keyValuePairs = new Dictionary<string, BasicMeterReadingRecords>();
foreach (var tempItem in tempCodes)
{
@ -639,16 +673,24 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue;
}
var evenMessageInfo = new ScheduledMeterReadingIssuedEventMessage
var meterReadingRecords = new BasicMeterReadingRecords()
{
MeterAddress = ammeter.AmmerterAddress,
MeterId = ammeter.ID,
MeterType = MeterTypeEnum.Ammeter,
FocusAddress = ammeter.FocusAddress,
FocusID = ammeter.FocusID,
AFN = aFN,
Fn = fn,
Pn = ammeter.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(dataInfos),
DeviceNo = ammeter.FocusAddress,
MessageId = NewId.NextGuid().ToString(),
TimeDensity = timeDensity,
WasSuccessful = false,
CreationTime = currentTime,
};
keyValuePairs.TryAdd($"{ammeter.ID}_{tempItem}", evenMessageInfo);
meterReadingRecords.CreateDataId(GuidGenerator.Create());
keyValuePairs.TryAdd($"{ammeter.ID}_{tempItem}", meterReadingRecords);
}
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
}
@ -742,29 +784,40 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, ScheduledMeterReadingIssuedEventMessage>> meterTaskInfos = await GetMeterRedisCacheData<ScheduledMeterReadingIssuedEventMessage>(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString());
Dictionary<string, Dictionary<string, MeterOneMinuteReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterOneMinuteReadingRecords>(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString());
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
return;
}
List<ScheduledMeterReadingIssuedEventMessage> meterTaskInfosList = new List<ScheduledMeterReadingIssuedEventMessage>();
List<MeterOneMinuteReadingRecords> meterTaskInfosList = new List<MeterOneMinuteReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
{
foreach (var ammerterItem in focusItem.Value)
{
await _capBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName, ammerterItem.Value);
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList);
await _oneMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
}
//删除任务数据
await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList);
//缓存下一个时间的任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{
@ -789,37 +842,48 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//获取缓存中的电表信息
int timeDensity = 5;
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101");
return;
}
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, ScheduledMeterReadingIssuedEventMessage>> meterTaskInfos = await GetMeterRedisCacheData<ScheduledMeterReadingIssuedEventMessage>(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString());
Dictionary<string, Dictionary<string, MeterFiveMinuteReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterFiveMinuteReadingRecords>(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString());
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
return;
}
List<ScheduledMeterReadingIssuedEventMessage> meterTaskInfosList = new List<ScheduledMeterReadingIssuedEventMessage>();
List<MeterFiveMinuteReadingRecords> meterTaskInfosList = new List<MeterFiveMinuteReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
{
foreach (var ammerterItem in focusItem.Value)
{
await _capBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerFiveMinuteIssuedEventName, ammerterItem.Value);
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList);
await _fiveMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
}
//删除任务数据
await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList);
//缓存下一个时间的任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{
@ -829,7 +893,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity);
await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
_logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集水表数据处理完成");
}
@ -843,37 +907,48 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//获取缓存中的电表信息
int timeDensity = 15;
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101");
return;
}
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, ScheduledMeterReadingIssuedEventMessage>> meterTaskInfos = await GetMeterRedisCacheData<ScheduledMeterReadingIssuedEventMessage>(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString());
Dictionary<string, Dictionary<string, MeterFifteenMinuteReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterFifteenMinuteReadingRecords>(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.WaterMeter.ToString());
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
return;
}
List<ScheduledMeterReadingIssuedEventMessage> meterTaskInfosList = new List<ScheduledMeterReadingIssuedEventMessage>();
List<MeterFifteenMinuteReadingRecords> meterTaskInfosList = new List<MeterFifteenMinuteReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
{
foreach (var ammerterItem in focusItem.Value)
{
await _capBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerFifteenMinuteIssuedEventName, ammerterItem.Value);
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList);
await _fifteenMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
}
//删除任务数据
await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList);
//缓存下一个时间的任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{
@ -953,7 +1028,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return meterInfos;
}
/// <summary>
/// 指定时间对比当前时间
/// </summary>

View File

@ -7,6 +7,7 @@ using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter;
using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.Logging;
@ -24,8 +25,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger,
ICapPublisher capBus,
IRepository<ScheduledMeterReadingIssuedEventMessage, Guid> meterReadingIssuedRepository) :base(logger, capBus, meterReadingIssuedRepository)
ICapPublisher capBus,
IRepository<MeterFifteenMinuteReadingRecords, Guid> fifteenMinuteReadingRecordRepository, IRepository<MeterFiveMinuteReadingRecords, Guid> fiveMinuteReadingRecordRepository, IRepository<MeterOneMinuteReadingRecords, Guid> oneMinuteReadingRecordRepository) :base(logger, capBus, fifteenMinuteReadingRecordRepository, fiveMinuteReadingRecordRepository, oneMinuteReadingRecordRepository)
{
}

View File

@ -67,10 +67,10 @@ namespace JiShe.CollectBus.Subscribers
}
else
{
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
if (device != null)
{
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.IssuedMessageHexString));
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
}
}
@ -94,10 +94,10 @@ namespace JiShe.CollectBus.Subscribers
}
else
{
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
if (device != null)
{
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.IssuedMessageHexString));
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
}
}
@ -121,10 +121,10 @@ namespace JiShe.CollectBus.Subscribers
}
else
{
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
if (device != null)
{
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.IssuedMessageHexString));
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
}
}
@ -150,10 +150,10 @@ namespace JiShe.CollectBus.Subscribers
}
else
{
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
if (device != null)
{
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.IssuedMessageHexString));
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
}
}
@ -177,10 +177,10 @@ namespace JiShe.CollectBus.Subscribers
}
else
{
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
if (device != null)
{
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.IssuedMessageHexString));
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
}
}
@ -204,10 +204,10 @@ namespace JiShe.CollectBus.Subscribers
}
else
{
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
if (device != null)
{
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.IssuedMessageHexString));
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
}
}
}

View File

@ -0,0 +1,39 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Enums
{
/// <summary>
/// 数据迁移状态
/// </summary>
public enum RecordsDataMigrationStatusEnum
{
/// <summary>
/// 未开始
/// </summary>
NotStarted = 0,
/// <summary>
/// 进行中
/// </summary>
InProgress = 1,
/// <summary>
/// 已完成
/// </summary>
Completed = 2,
/// <summary>
/// 已取消
/// </summary>
Cancelled = 3,
/// <summary>
/// 失败
/// </summary>
Failed = 4,
}
}

View File

@ -12,12 +12,12 @@ namespace JiShe.CollectBus.IotSystems.MessageIssueds
/// <summary>
/// 下发消息内容
/// </summary>
public string IssuedMessageHexString { get; set; }
public string MessageHexString { get; set; }
/// <summary>
/// 集中器编号
/// </summary>
public string DeviceNo { get; set; }
public string FocusAddress { get; set; }
/// <summary>
/// 采集时间间隔通过Kafka主题区分(分钟如15)
@ -29,25 +29,5 @@ namespace JiShe.CollectBus.IotSystems.MessageIssueds
/// </summary>
public string MessageId { get; set; }
/// <summary>
/// 是否下发成功
/// </summary>
public bool WasSuccessful { get; set; }
/// <summary>
/// 创建时间
/// </summary>
public DateTime CreationTime { get; set; }
/// <summary>
/// 消息上报内容
/// </summary>
public string? ReceivedMessageHexString { get; set; }
/// <summary>
/// 消息上报时间
/// </summary>
public DateTime? ReceivedTime { get; set; }
}
}

View File

@ -1,16 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.Domain.Entities;
namespace JiShe.CollectBus.IotSystems
{
/// <summary>
/// 抄读记录表,包含下发报文和回复报文,以及是否迁移
/// </summary>
public class MeterReadingRecords : AggregateRoot<Guid>
{
}
}

View File

@ -0,0 +1,114 @@
using JiShe.CollectBus.Common.Enums;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.Domain.Entities;
namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
{
/// <summary>
/// 抄读记录基类
/// </summary>
public class BasicMeterReadingRecords : AggregateRoot<Guid>
{
/// <summary>
/// 是否手动操作
/// </summary>
public bool ManualOrNot { get; set; }
/// <summary>
/// 下发消息内容
/// </summary>
public string IssuedMessageHexString { get; set; }
/// <summary>
/// 下发消息Id
/// </summary>
public string IssuedMessageId { get; set; }
/// <summary>
/// 集中器ID
/// </summary>
public int FocusID { get; set; }
/// <summary>
/// 集中器地址
/// </summary>
public string FocusAddress { get; set; }
/// <summary>
/// 表Id
/// </summary>
public int MeterId { get; set; }
/// <summary>
/// 表地址
/// </summary>
public string MeterAddress { get; set; }
/// <summary>
/// 表类型
/// </summary>
public MeterTypeEnum MeterType { get; set; }
/// <summary>
/// AFN功能码
/// </summary>
public AFN AFN { get; set; }
/// <summary>
/// 抄读功能码
/// </summary>
public int Fn { get; set; }
/// <summary>
/// 抄读计量点
/// </summary>
public int Pn { get; set; }
/// <summary>
/// 是否下发成功
/// </summary>
public bool WasSuccessful { get; set; }
/// <summary>
/// 创建时间
/// </summary>
public DateTime CreationTime { get; set; }
/// <summary>
/// 消息上报内容
/// </summary>
public string? ReceivedMessageHexString { get; set; }
/// <summary>
/// 消息上报时间
/// </summary>
public DateTime? ReceivedTime { get; set; }
/// <summary>
/// 上报消息Id
/// </summary>
public string ReceivedMessageId { get; set; }
/// <summary>
/// 数据迁移状态
/// </summary>
public RecordsDataMigrationStatusEnum MigrationStatus { get; set; }
/// <summary>
/// 数据迁移时间
/// </summary>
public DateTime? MigrationTime { get; set; }
public void CreateDataId(Guid Id)
{
this.Id = Id;
}
}
}

View File

@ -0,0 +1,18 @@
using JiShe.CollectBus.Common.Enums;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.Domain.Entities;
namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
{
/// <summary>
/// 15分钟抄读记录表包含下发报文和回复报文以及是否迁移
/// </summary>
public class MeterFifteenMinuteReadingRecords : BasicMeterReadingRecords
{
}
}

View File

@ -0,0 +1,18 @@
using JiShe.CollectBus.Common.Enums;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.Domain.Entities;
namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
{
/// <summary>
/// 5分钟抄读记录表包含下发报文和回复报文以及是否迁移
/// </summary>
public class MeterFiveMinuteReadingRecords : BasicMeterReadingRecords
{
}
}

View File

@ -0,0 +1,18 @@
using JiShe.CollectBus.Common.Enums;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.Domain.Entities;
namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
{
/// <summary>
/// 1分钟抄读记录表包含下发报文和回复报文以及是否迁移
/// </summary>
public class MeterOneMinuteReadingRecords : BasicMeterReadingRecords
{
}
}

View File

@ -1,6 +1,7 @@
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Protocols;
using MongoDB.Driver;
using Volo.Abp.Data;
@ -23,7 +24,9 @@ public class CollectBusMongoDbContext : AbpMongoDbContext, ICollectBusMongoDbCon
public IMongoCollection<Device> Devices => Collection<Device>();
public IMongoCollection<ProtocolInfo> ProtocolInfos => Collection<ProtocolInfo>();
public IMongoCollection<ScheduledMeterReadingIssuedEventMessage> MeterReadingIssued => Collection<ScheduledMeterReadingIssuedEventMessage>();
public IMongoCollection<MeterFifteenMinuteReadingRecords> FifteenMinuteReadingRecords => Collection<MeterFifteenMinuteReadingRecords>();
public IMongoCollection<MeterFiveMinuteReadingRecords> MeterFiveMinuteReadingRecords => Collection<MeterFiveMinuteReadingRecords>();
public IMongoCollection<MeterOneMinuteReadingRecords> MeterOneMinuteReadingRecords => Collection<MeterOneMinuteReadingRecords>();
protected override void CreateModel(IMongoModelBuilder modelBuilder)
{