diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 5c766da..2f144de 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -18,6 +18,7 @@ using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageReceiveds; +using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Workers; @@ -36,17 +37,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading { private readonly ILogger _logger; private readonly ICapPublisher _capBus; - private readonly IRepository _meterReadingIssuedRepository; + private readonly IRepository _fifteenMinuteReadingRecordRepository; + private readonly IRepository _fiveMinuteReadingRecordRepository; + private readonly IRepository _oneMinuteReadingRecordRepository; public BasicScheduledMeterReadingService( ILogger logger, ICapPublisher capBus, - IRepository meterReadingIssuedRepository) + IRepository fifteenMinuteReadingRecordRepository, IRepository fiveMinuteReadingRecordRepository, IRepository oneMinuteReadingRecordRepository) { _capBus = capBus; _logger = logger; - _meterReadingIssuedRepository = meterReadingIssuedRepository; + _oneMinuteReadingRecordRepository = oneMinuteReadingRecordRepository; + _fiveMinuteReadingRecordRepository = fiveMinuteReadingRecordRepository; + _fifteenMinuteReadingRecordRepository = fifteenMinuteReadingRecordRepository; } /// @@ -186,8 +191,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}"; #if DEBUG - //每次缓存时,删除缓存,避免缓存数据错误 - await FreeRedisProvider.Instance.DelAsync(redisCacheKey); + //每次缓存时,删除缓存,避免缓存数据错误 + await FreeRedisProvider.Instance.DelAsync(redisCacheKey); #else //每次缓存时,删除缓存,避免缓存数据错误 await FreeRedisProvider.Instance.DelAsync(redisCacheKey); @@ -274,29 +279,40 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.Ammeter).ToString()); + Dictionary> meterTaskInfos = await GetMeterRedisCacheData(oneMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString()); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); return; } - List meterTaskInfosList = new List(); + List meterTaskInfosList = new List(); //将取出的缓存任务数据发送到Kafka消息队列中 foreach (var focusItem in meterTaskInfos) { foreach (var ammerterItem in focusItem.Value) { - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, ammerterItem.Value); + var tempMsg = new ScheduledMeterReadingIssuedEventMessage() + { + MessageHexString = ammerterItem.Value.IssuedMessageHexString, + MessageId = ammerterItem.Value.IssuedMessageId, + FocusAddress = ammerterItem.Value.FocusAddress, + TimeDensity = timeDensity.ToString(), + }; + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); + meterTaskInfosList.Add(ammerterItem.Value); } } if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) { - await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList); + await _oneMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList); } + //删除任务数据 + await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList); + //缓存下一个时间的任务 TasksToBeIssueModel nextTask = new TasksToBeIssueModel() { @@ -320,37 +336,48 @@ namespace JiShe.CollectBus.ScheduledMeterReading //获取缓存中的电表信息 int timeDensity = 5; var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*"; - var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) + var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); + if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101"); return; } //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.Ammeter).ToString()); + Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.Ammeter).ToString()); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); return; } - List meterTaskInfosList = new List(); + List meterTaskInfosList = new List(); //将取出的缓存任务数据发送到Kafka消息队列中 foreach (var focusItem in meterTaskInfos) { foreach (var ammerterItem in focusItem.Value) { - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, ammerterItem.Value); + var tempMsg = new ScheduledMeterReadingIssuedEventMessage() + { + MessageHexString = ammerterItem.Value.IssuedMessageHexString, + MessageId = ammerterItem.Value.IssuedMessageId, + FocusAddress = ammerterItem.Value.FocusAddress, + TimeDensity = timeDensity.ToString(), + }; + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); + meterTaskInfosList.Add(ammerterItem.Value); } } if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) { - await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList); + await _fiveMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList); } + //删除任务数据 + await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList); + //缓存下一个时间的任务 TasksToBeIssueModel nextTask = new TasksToBeIssueModel() { @@ -376,40 +403,47 @@ namespace JiShe.CollectBus.ScheduledMeterReading //获取缓存中的电表信息 int timeDensity = 15; var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*"; - var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) + var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); + if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101"); return; } - + //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(oneMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString()); + Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString()); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); return; } - List meterTaskInfosList = new List(); + List meterTaskInfosList = new List(); //将取出的缓存任务数据发送到Kafka消息队列中 foreach (var focusItem in meterTaskInfos) { foreach (var ammerterItem in focusItem.Value) { - //todo 可能需要优化,如果使用等待,会很慢,但使用不等待,mongodb 连接池又没法抗住,先发送微妙级的延时队列消息,暂时先这样处理 - _= _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, ammerterItem.Value); + var tempMsg = new ScheduledMeterReadingIssuedEventMessage() + { + MessageHexString = ammerterItem.Value.IssuedMessageHexString, + MessageId = ammerterItem.Value.IssuedMessageId, + FocusAddress = ammerterItem.Value.FocusAddress, + TimeDensity = timeDensity.ToString(), + }; + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); + meterTaskInfosList.Add(ammerterItem.Value); } } if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) { - await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList); + await _fifteenMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList); } //删除任务数据 - await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList); + await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList); //缓存下一个时间的任务 TasksToBeIssueModel nextTask = new TasksToBeIssueModel() @@ -420,7 +454,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.Ammeter, timeDensity); await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask); - + stopwatch.Stop(); _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); @@ -586,7 +620,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } } - Dictionary keyValuePairs = new Dictionary(); + Dictionary keyValuePairs = new Dictionary(); foreach (var tempItem in tempCodes) { @@ -639,16 +673,24 @@ namespace JiShe.CollectBus.ScheduledMeterReading continue; } - var evenMessageInfo = new ScheduledMeterReadingIssuedEventMessage + + + var meterReadingRecords = new BasicMeterReadingRecords() { + MeterAddress = ammeter.AmmerterAddress, + MeterId = ammeter.ID, + MeterType = MeterTypeEnum.Ammeter, + FocusAddress = ammeter.FocusAddress, + FocusID = ammeter.FocusID, + AFN = aFN, + Fn = fn, + Pn = ammeter.MeteringCode, + IssuedMessageId = GuidGenerator.Create().ToString(), IssuedMessageHexString = Convert.ToHexString(dataInfos), - DeviceNo = ammeter.FocusAddress, - MessageId = NewId.NextGuid().ToString(), - TimeDensity = timeDensity, - WasSuccessful = false, - CreationTime = currentTime, }; - keyValuePairs.TryAdd($"{ammeter.ID}_{tempItem}", evenMessageInfo); + meterReadingRecords.CreateDataId(GuidGenerator.Create()); + + keyValuePairs.TryAdd($"{ammeter.ID}_{tempItem}", meterReadingRecords); } await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); } @@ -742,29 +784,40 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString()); + Dictionary> meterTaskInfos = await GetMeterRedisCacheData(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString()); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102"); return; } - List meterTaskInfosList = new List(); + List meterTaskInfosList = new List(); //将取出的缓存任务数据发送到Kafka消息队列中 foreach (var focusItem in meterTaskInfos) { foreach (var ammerterItem in focusItem.Value) { - await _capBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName, ammerterItem.Value); + var tempMsg = new ScheduledMeterReadingIssuedEventMessage() + { + MessageHexString = ammerterItem.Value.IssuedMessageHexString, + MessageId = ammerterItem.Value.IssuedMessageId, + FocusAddress = ammerterItem.Value.FocusAddress, + TimeDensity = timeDensity.ToString(), + }; + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); + meterTaskInfosList.Add(ammerterItem.Value); } } if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) { - await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList); + await _oneMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList); } + //删除任务数据 + await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList); + //缓存下一个时间的任务 TasksToBeIssueModel nextTask = new TasksToBeIssueModel() { @@ -789,37 +842,48 @@ namespace JiShe.CollectBus.ScheduledMeterReading //获取缓存中的电表信息 int timeDensity = 5; var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity)}*"; - var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) + var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); + if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101"); return; } //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString()); + Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString()); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102"); return; } - List meterTaskInfosList = new List(); + List meterTaskInfosList = new List(); //将取出的缓存任务数据发送到Kafka消息队列中 foreach (var focusItem in meterTaskInfos) { foreach (var ammerterItem in focusItem.Value) { - await _capBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerFiveMinuteIssuedEventName, ammerterItem.Value); + var tempMsg = new ScheduledMeterReadingIssuedEventMessage() + { + MessageHexString = ammerterItem.Value.IssuedMessageHexString, + MessageId = ammerterItem.Value.IssuedMessageId, + FocusAddress = ammerterItem.Value.FocusAddress, + TimeDensity = timeDensity.ToString(), + }; + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); + meterTaskInfosList.Add(ammerterItem.Value); } } if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) { - await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList); + await _fiveMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList); } + //删除任务数据 + await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList); + //缓存下一个时间的任务 TasksToBeIssueModel nextTask = new TasksToBeIssueModel() { @@ -829,7 +893,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity); await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask); - + _logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集水表数据处理完成"); } @@ -843,37 +907,48 @@ namespace JiShe.CollectBus.ScheduledMeterReading //获取缓存中的电表信息 int timeDensity = 15; var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity)}*"; - var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) + var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); + if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101"); return; } //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString()); + Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.WaterMeter.ToString()); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102"); return; } - List meterTaskInfosList = new List(); + List meterTaskInfosList = new List(); //将取出的缓存任务数据发送到Kafka消息队列中 foreach (var focusItem in meterTaskInfos) { foreach (var ammerterItem in focusItem.Value) { - await _capBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerFifteenMinuteIssuedEventName, ammerterItem.Value); + var tempMsg = new ScheduledMeterReadingIssuedEventMessage() + { + MessageHexString = ammerterItem.Value.IssuedMessageHexString, + MessageId = ammerterItem.Value.IssuedMessageId, + FocusAddress = ammerterItem.Value.FocusAddress, + TimeDensity = timeDensity.ToString(), + }; + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); + meterTaskInfosList.Add(ammerterItem.Value); } } if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) { - await _meterReadingIssuedRepository.InsertManyAsync(meterTaskInfosList); + await _fifteenMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList); } + //删除任务数据 + await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList); + //缓存下一个时间的任务 TasksToBeIssueModel nextTask = new TasksToBeIssueModel() { @@ -953,7 +1028,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading return meterInfos; } - + /// /// 指定时间对比当前时间 /// diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 872a590..5a26013 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -7,6 +7,7 @@ using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.IotSystems.MessageIssueds; +using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; using Microsoft.AspNetCore.Authorization; using Microsoft.Extensions.Logging; @@ -24,8 +25,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading { public EnergySystemScheduledMeterReadingService(ILogger logger, - ICapPublisher capBus, - IRepository meterReadingIssuedRepository) :base(logger, capBus, meterReadingIssuedRepository) + ICapPublisher capBus, + IRepository fifteenMinuteReadingRecordRepository, IRepository fiveMinuteReadingRecordRepository, IRepository oneMinuteReadingRecordRepository) :base(logger, capBus, fifteenMinuteReadingRecordRepository, fiveMinuteReadingRecordRepository, oneMinuteReadingRecordRepository) { } diff --git a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs index 7a98a09..62748be 100644 --- a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs @@ -67,10 +67,10 @@ namespace JiShe.CollectBus.Subscribers } else { - var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo); + var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress); if (device != null) { - await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.IssuedMessageHexString)); + await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); } } @@ -94,10 +94,10 @@ namespace JiShe.CollectBus.Subscribers } else { - var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo); + var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress); if (device != null) { - await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.IssuedMessageHexString)); + await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); } } @@ -121,10 +121,10 @@ namespace JiShe.CollectBus.Subscribers } else { - var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo); + var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress); if (device != null) { - await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.IssuedMessageHexString)); + await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); } } @@ -150,10 +150,10 @@ namespace JiShe.CollectBus.Subscribers } else { - var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo); + var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress); if (device != null) { - await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.IssuedMessageHexString)); + await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); } } @@ -177,10 +177,10 @@ namespace JiShe.CollectBus.Subscribers } else { - var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo); + var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress); if (device != null) { - await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.IssuedMessageHexString)); + await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); } } @@ -204,10 +204,10 @@ namespace JiShe.CollectBus.Subscribers } else { - var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo); + var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress); if (device != null) { - await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.IssuedMessageHexString)); + await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); } } } diff --git a/src/JiShe.CollectBus.Common/Enums/RecordsDataMigrationStatusEnum.cs b/src/JiShe.CollectBus.Common/Enums/RecordsDataMigrationStatusEnum.cs new file mode 100644 index 0000000..544be42 --- /dev/null +++ b/src/JiShe.CollectBus.Common/Enums/RecordsDataMigrationStatusEnum.cs @@ -0,0 +1,39 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Common.Enums +{ + /// + /// 数据迁移状态 + /// + public enum RecordsDataMigrationStatusEnum + { + /// + /// 未开始 + /// + NotStarted = 0, + + /// + /// 进行中 + /// + InProgress = 1, + + /// + /// 已完成 + /// + Completed = 2, + + /// + /// 已取消 + /// + Cancelled = 3, + + /// + /// 失败 + /// + Failed = 4, + } +} diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/ScheduledMeterReadingIssuedEventMessage.cs b/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/ScheduledMeterReadingIssuedEventMessage.cs index 9bf229e..49fce87 100644 --- a/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/ScheduledMeterReadingIssuedEventMessage.cs +++ b/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/ScheduledMeterReadingIssuedEventMessage.cs @@ -12,12 +12,12 @@ namespace JiShe.CollectBus.IotSystems.MessageIssueds /// /// 下发消息内容 /// - public string IssuedMessageHexString { get; set; } + public string MessageHexString { get; set; } /// /// 集中器编号 /// - public string DeviceNo { get; set; } + public string FocusAddress { get; set; } /// /// 采集时间间隔,通过Kafka主题区分(分钟,如15) @@ -29,25 +29,5 @@ namespace JiShe.CollectBus.IotSystems.MessageIssueds /// public string MessageId { get; set; } - /// - /// 是否下发成功 - /// - public bool WasSuccessful { get; set; } - - /// - /// 创建时间 - /// - public DateTime CreationTime { get; set; } - - /// - /// 消息上报内容 - /// - public string? ReceivedMessageHexString { get; set; } - - /// - /// 消息上报时间 - /// - public DateTime? ReceivedTime { get; set; } - } } diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords.cs b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords.cs deleted file mode 100644 index 4e8c48c..0000000 --- a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Volo.Abp.Domain.Entities; - -namespace JiShe.CollectBus.IotSystems -{ - /// - /// 抄读记录表,包含下发报文和回复报文,以及是否迁移 - /// - public class MeterReadingRecords : AggregateRoot - { - } -} diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/BasicMeterReadingRecords.cs b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/BasicMeterReadingRecords.cs new file mode 100644 index 0000000..315bbda --- /dev/null +++ b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/BasicMeterReadingRecords.cs @@ -0,0 +1,114 @@ +using JiShe.CollectBus.Common.Enums; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Volo.Abp.Domain.Entities; + +namespace JiShe.CollectBus.IotSystems.MeterReadingRecords +{ + /// + /// 抄读记录基类 + /// + public class BasicMeterReadingRecords : AggregateRoot + { + + /// + /// 是否手动操作 + /// + public bool ManualOrNot { get; set; } + + /// + /// 下发消息内容 + /// + public string IssuedMessageHexString { get; set; } + + /// + /// 下发消息Id + /// + public string IssuedMessageId { get; set; } + + /// + /// 集中器ID + /// + public int FocusID { get; set; } + + /// + /// 集中器地址 + /// + public string FocusAddress { get; set; } + + /// + /// 表Id + /// + public int MeterId { get; set; } + + /// + /// 表地址 + /// + public string MeterAddress { get; set; } + + + /// + /// 表类型 + /// + public MeterTypeEnum MeterType { get; set; } + + /// + /// AFN功能码 + /// + public AFN AFN { get; set; } + + /// + /// 抄读功能码 + /// + public int Fn { get; set; } + + /// + /// 抄读计量点 + /// + public int Pn { get; set; } + + + /// + /// 是否下发成功 + /// + public bool WasSuccessful { get; set; } + + /// + /// 创建时间 + /// + public DateTime CreationTime { get; set; } + + /// + /// 消息上报内容 + /// + public string? ReceivedMessageHexString { get; set; } + + /// + /// 消息上报时间 + /// + public DateTime? ReceivedTime { get; set; } + + /// + /// 上报消息Id + /// + public string ReceivedMessageId { get; set; } + + /// + /// 数据迁移状态 + /// + public RecordsDataMigrationStatusEnum MigrationStatus { get; set; } + + /// + /// 数据迁移时间 + /// + public DateTime? MigrationTime { get; set; } + + public void CreateDataId(Guid Id) + { + this.Id = Id; + } + } +} diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterFifteenMinuteReadingRecords.cs b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterFifteenMinuteReadingRecords.cs new file mode 100644 index 0000000..acc7163 --- /dev/null +++ b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterFifteenMinuteReadingRecords.cs @@ -0,0 +1,18 @@ +using JiShe.CollectBus.Common.Enums; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Volo.Abp.Domain.Entities; + +namespace JiShe.CollectBus.IotSystems.MeterReadingRecords +{ + /// + /// 15分钟抄读记录表,包含下发报文和回复报文,以及是否迁移 + /// + public class MeterFifteenMinuteReadingRecords : BasicMeterReadingRecords + { + + } +} diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterFiveMinuteReadingRecords.cs b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterFiveMinuteReadingRecords.cs new file mode 100644 index 0000000..eeb69a2 --- /dev/null +++ b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterFiveMinuteReadingRecords.cs @@ -0,0 +1,18 @@ +using JiShe.CollectBus.Common.Enums; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Volo.Abp.Domain.Entities; + +namespace JiShe.CollectBus.IotSystems.MeterReadingRecords +{ + /// + /// 5分钟抄读记录表,包含下发报文和回复报文,以及是否迁移 + /// + public class MeterFiveMinuteReadingRecords : BasicMeterReadingRecords + { + + } +} diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterOneMinuteReadingRecords.cs b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterOneMinuteReadingRecords.cs new file mode 100644 index 0000000..35b7630 --- /dev/null +++ b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterOneMinuteReadingRecords.cs @@ -0,0 +1,18 @@ +using JiShe.CollectBus.Common.Enums; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Volo.Abp.Domain.Entities; + +namespace JiShe.CollectBus.IotSystems.MeterReadingRecords +{ + /// + /// 1分钟抄读记录表,包含下发报文和回复报文,以及是否迁移 + /// + public class MeterOneMinuteReadingRecords : BasicMeterReadingRecords + { + + } +} diff --git a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs b/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs index 06dfd1a..5c73029 100644 --- a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs +++ b/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs @@ -1,6 +1,7 @@ using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageReceiveds; +using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Protocols; using MongoDB.Driver; using Volo.Abp.Data; @@ -23,7 +24,9 @@ public class CollectBusMongoDbContext : AbpMongoDbContext, ICollectBusMongoDbCon public IMongoCollection Devices => Collection(); public IMongoCollection ProtocolInfos => Collection(); - public IMongoCollection MeterReadingIssued => Collection(); + public IMongoCollection FifteenMinuteReadingRecords => Collection(); + public IMongoCollection MeterFiveMinuteReadingRecords => Collection(); + public IMongoCollection MeterOneMinuteReadingRecords => Collection(); protected override void CreateModel(IMongoModelBuilder modelBuilder) {