diff --git a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs
index 4814c6a..a3fb126 100644
--- a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs
+++ b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs
@@ -59,5 +59,10 @@ namespace JiShe.CollectBus.Kafka.Internal
///
public string? SaslPassword { get; set; }
+ ///
+ /// 首次采集时间
+ ///
+ public DateTime FirstCollectionTime { get; set; }
+
}
}
diff --git a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs
index b61514e..37632e7 100644
--- a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs
+++ b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs
@@ -65,7 +65,7 @@ public class CollectBusApplicationModule : AbpModule
//默认初始化表计信息
var dbContext = context.ServiceProvider.GetRequiredService();
- //await dbContext.InitAmmeterCacheData();
+ await dbContext.InitAmmeterCacheData();
//await dbContext.InitWatermeterCacheData();
//初始化主题信息
diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs
index 53db99a..e5a7b81 100644
--- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs
+++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs
@@ -26,7 +26,11 @@ using System.Diagnostics;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Options;
+<<<<<<< HEAD
using JiShe.CollectBus.Kafka.Internal;
+=======
+using JiShe.CollectBus.Common.Extensions;
+>>>>>>> fe51402b1f105dc536f08ec2249d4ab13fa8f172
namespace JiShe.CollectBus.Samples;
@@ -243,6 +247,19 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
}
+ ///
+ /// 下一个采集时间点验证
+ ///
+ ///
+ [HttpGet]
+ public async Task TestCalculateNextCollectionTime(string time, int timeDensity)
+ {
+ DateTime nextTaskTime = Convert.ToDateTime(time);
+
+ return await Task.FromResult(nextTaskTime.CalculateNextCollectionTime(timeDensity));
+ }
+
+
public Task GetAsync()
{
return Task.FromResult(
diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index 4285bf5..1ab07bc 100644
--- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -1,4 +1,5 @@
-using JiShe.CollectBus.Ammeters;
+using DnsClient.Protocol;
+using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
@@ -8,6 +9,7 @@ using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.GatherItem;
+using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter;
@@ -24,9 +26,12 @@ using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
-using JiShe.CollectBus.IoTDB.Interface;
using static FreeSql.Internal.GlobalFilter;
+<<<<<<< HEAD
using JiShe.CollectBus.Kafka.Internal;
+=======
+using static System.Runtime.InteropServices.JavaScript.JSType;
+>>>>>>> fe51402b1f105dc536f08ec2249d4ab13fa8f172
namespace JiShe.CollectBus.ScheduledMeterReading
{
@@ -101,6 +106,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return;
}
+ var currentTime = DateTime.Now;
+
foreach (var item in taskInfos)
{
var tasksToBeIssueModel = await FreeRedisProvider.Instance.GetAsync(item);
@@ -130,70 +137,31 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (meteryType == MeterTypeEnum.Ammeter.ToString())
{
- var timer = Stopwatch.StartNew();
+ //_ = AmmerterCreatePublishTask(timeDensity, $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}");
- //获取对应频率中的所有电表信息
- var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
- var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
- var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
-
- List meterInfos = new List();
- decimal? cursor = null;
- string member = null;
- bool hasNext;
- do
- {
- var page = await _redisDataCacheService.GetAllPagedData(
- redisCacheMeterInfoHashKeyTemp,
- redisCacheMeterInfoZSetScoresIndexKeyTemp,
- pageSize: 1000,
- lastScore: cursor,
- lastMember: member);
-
- meterInfos.AddRange(page.Items);
- cursor = page.HasNext ? page.NextScore : null;
- member = page.HasNext ? page.NextMember : null;
- hasNext = page.HasNext;
- } while (hasNext);
-
- if (meterInfos == null || meterInfos.Count <= 0)
- {
- timer.Stop();
- _logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
- return;
- }
- //await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos);
-
-
- //处理数据
- //await DeviceGroupBalanceControl.ProcessGenericListAsync(
- // items: meterInfos,
- // deviceIdSelector: data => data.FocusAddress,
- // processor: (data, groupIndex) =>
- // {
- // _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
- // }
- //);
-
-
-
- await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
- items: meterInfos,
- deviceIdSelector: data => data.FocusAddress,
- processor: (data, groupIndex) =>
+ _ = CreateMeterPublishTask(
+ timeDensity: timeDensity,
+ taskBatch: $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}",
+ meterType: MeterTypeEnum.Ammeter,
+ taskCreateAction: (timeDensity, data, groupIndex, taskBatch) =>
{
- AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
- }
- );
-
- timer.Stop();
- _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息");
+ AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch);
+ });
}
else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
{
//todo 水表任务创建待处理
//await WatermeterScheduledMeterReadingIssued(timeDensity, meterInfos);
+
+ _ = CreateMeterPublishTask(
+ timeDensity: timeDensity,
+ taskBatch: $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}",
+ meterType: MeterTypeEnum.Ammeter,
+ taskCreateAction: (timeDensity, data, groupIndex, taskBatch) =>
+ {
+ //AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch);
+ });
}
else
{
@@ -205,7 +173,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。
- tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.AddMinutes(timeDensity);
+ tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity);
await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
}
}
@@ -229,24 +197,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{
-#if DEBUG
- //var timeDensity = "15";
- //string tempCacheMeterInfoKey = $"CollectBus:{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}";
- ////获取缓存中的电表信息
- //var redisKeyList = $"{string.Format(tempCacheMeterInfoKey, SystemType, "JiSheCollectBus", MeterTypeEnum.Ammeter, timeDensity)}*";
-
- //var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
- //var tempMeterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter);
- ////List focusAddressDataLista = new List();
- //List meterInfos = new List();
- //foreach (var item in tempMeterInfos)
- //{
- // var tempData = item.Adapt();
- // tempData.FocusId = item.FocusID;
- // tempData.MeterId = item.Id;
- // meterInfos.Add(tempData);
- // //focusAddressDataLista.Add(item.FocusAddress);
- //}
+#if DEBUG
+ return;
@@ -258,23 +210,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
List meterInfos = new List();
List focusAddressDataLista = new List();
var timer1 = Stopwatch.StartNew();
- //decimal? cursor = null;
- //string member = null;
- //bool hasNext;
- //do
- //{
- // var page = await _redisDataCacheService.GetAllPagedDataOptimized(
- // redisCacheMeterInfoHashKeyTemp,
- // redisCacheMeterInfoZSetScoresIndexKeyTemp,
- // pageSize: 1000,
- // lastScore: cursor,
- // lastMember: member);
-
- // meterInfos.AddRange(page.Items);
- // cursor = page.HasNext ? page.NextScore : null;
- // member = page.HasNext ? page.NextMember : null;
- // hasNext = page.HasNext;
- //} while (hasNext);
var allIds = new HashSet();
decimal? score = null;
@@ -306,8 +241,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
timer1.Stop();
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
- DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
- return;
+ //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
+ //return;
#else
var meterInfos = await GetAmmeterInfoList(gatherCode);
#endif
@@ -329,6 +264,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
+
+ //先处理采集频率任务缓存
+ foreach (var item in meterInfoGroupByTimeDensity)
+ {
+ TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
+ {
+ TimeDensity = item.Key,
+ NextTaskTime = _kafkaOptions.FirstCollectionTime.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间
+ };
+
+ //todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。
+
+ var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key);
+ await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
+ }
+
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
{
var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
@@ -403,25 +354,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
ammeterInfos.Add(ammeter);
- //keyValuePairs.TryAdd($"{ammeter.MeterId}", ammeter);
}
- //await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
}
await _redisDataCacheService.BatchInsertDataAsync(
redisCacheMeterInfoHashKey,
redisCacheMeterInfoSetIndexKey,
redisCacheMeterInfoZSetScoresIndexKey, ammeterInfos);
-
- //在缓存表信息数据的时候,新增下一个时间的自动处理任务,1分钟后执行所有的采集频率任务
- TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
- {
- TimeDensity = itemTimeDensity.Key,
- NextTaskTime = DateTime.Now.AddMinutes(1)
- };
-
- var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key);
- await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
}
//初始化设备组负载控制
@@ -446,63 +385,29 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
public virtual async Task AmmeterScheduledMeterOneMinuteReading()
{
+
//获取缓存中的电表信息
- int timeDensity = 1;
+ int timeDensity = 5;
var currentTime = DateTime.Now;
- var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
- var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
- if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
+ // 自动计算最佳并发度
+ int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount();
+
+ var options = new ParallelOptions
{
- _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
- return;
- }
+ MaxDegreeOfParallelism = recommendedThreads,
+ };
+ var taskBatch = $"{currentTime:yyyyMMddHHmm00}";
- //获取下发任务缓存数据
- Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(oneMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
- if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
+ Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex =>
{
- _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
- return;
- }
+ var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
+ var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
- List meterTaskInfosList = new List();
+ _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey);
+ });
- //将取出的缓存任务数据发送到Kafka消息队列中
- foreach (var focusItem in meterTaskInfos)
- {
- foreach (var ammerterItem in focusItem.Value)
- {
- var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
- {
- MessageHexString = ammerterItem.Value.IssuedMessageHexString,
- MessageId = ammerterItem.Value.IssuedMessageId,
- FocusAddress = ammerterItem.Value.FocusAddress,
- TimeDensity = timeDensity.ToString(),
- };
- //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
-
- _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
- //_= _producerBus.Publish(tempMsg);
-
-
- meterTaskInfosList.Add(ammerterItem.Value);
- }
- }
- if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
- {
- //_dbProvider.SwitchSessionPool(true);
- //await _dbProvider.InsertAsync(meterTaskInfosList);
-
- await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentTime);
- }
-
- ////删除任务数据
- //await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList);
- //await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter);
-
-
- _logger.LogInformation($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理完成");
+ await Task.CompletedTask;
}
@@ -516,57 +421,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
int timeDensity = 5;
var currentTime = DateTime.Now;
- var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
- var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
- if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0)
+ // 自动计算最佳并发度
+ int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount();
+
+ var options = new ParallelOptions
{
- _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
- return;
- }
+ MaxDegreeOfParallelism = recommendedThreads,
+ };
+ var taskBatch = $"{currentTime:yyyyMMddHHmm00}";
- //获取下发任务缓存数据
- Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(fiveMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
- if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
+ Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex =>
{
- _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
- return;
- }
+ var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
+ var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
- List meterTaskInfosList = new List();
-
- //将取出的缓存任务数据发送到Kafka消息队列中
- foreach (var focusItem in meterTaskInfos)
- {
- foreach (var ammerterItem in focusItem.Value)
- {
- var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
- {
- MessageHexString = ammerterItem.Value.IssuedMessageHexString,
- MessageId = ammerterItem.Value.IssuedMessageId,
- FocusAddress = ammerterItem.Value.FocusAddress,
- TimeDensity = timeDensity.ToString(),
- };
- //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
-
- _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
-
- //_ = _producerBus.Publish(tempMsg);
-
- meterTaskInfosList.Add(ammerterItem.Value);
- }
- }
- if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
- {
- await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentTime);
- }
-
- ////删除任务数据
- //await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList);
-
- ////缓存下一个时间的任务
- //await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter);
-
- _logger.LogInformation($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集电表数据处理完成");
+ _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey);
+ });
}
///
@@ -575,12 +445,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
public virtual async Task AmmeterScheduledMeterFifteenMinuteReading()
{
- Stopwatch stopwatch = new Stopwatch();
- stopwatch.Start();
-
//获取缓存中的电表信息
int timeDensity = 15;
- var currentDateTime = DateTime.Now;
+ var currentTime = DateTime.Now;
// 自动计算最佳并发度
int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount();
@@ -589,107 +456,84 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
MaxDegreeOfParallelism = recommendedThreads,
};
- string taskBatch = "20250417155016";
+ var taskBatch = $"{currentTime:yyyyMMddHHmm00}";
+
Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex =>
{
- Console.WriteLine($"15分钟采集电表数据:{groupIndex}");
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
- var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
- List meterInfos = new List();
- decimal? cursor = null;
- string member = null;
- bool hasNext;
- do
- {
- var page = await _redisDataCacheService.GetAllPagedData(
- redisCacheTelemetryPacketInfoHashKey,
- redisCacheTelemetryPacketInfoZSetScoresIndexKey,
- pageSize: 1000,
- lastScore: cursor,
- lastMember: member);
-
- meterInfos.AddRange(page.Items);
- cursor = page.HasNext ? page.NextScore : null;
- member = page.HasNext ? page.NextMember : null;
- hasNext = page.HasNext;
-
- await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
- items: meterInfos,
- deviceIdSelector: data => data.FocusAddress,
- processor: (data, groupIndex) =>
- {
- _= KafkaProducerIssuedMessage(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName,data, groupIndex);
- }
- );
-
- } while (hasNext);
+ _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey);
});
-
-
- //var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
- //var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
- //if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
- //{
- // _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
- // return;
- //}
-
- ////获取下发任务缓存数据
- //Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
- //if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
- //{
- // _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
- // return;
- //}
-
- //List meterTaskInfosList = new List();
-
- ////将取出的缓存任务数据发送到Kafka消息队列中
- //foreach (var focusItem in meterTaskInfos)
- //{
- // foreach (var ammerterItem in focusItem.Value)
- // {
- // var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
- // {
- // MessageHexString = ammerterItem.Value.IssuedMessageHexString,
- // MessageId = ammerterItem.Value.IssuedMessageId,
- // FocusAddress = ammerterItem.Value.FocusAddress,
- // TimeDensity = timeDensity.ToString(),
- // };
- // //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
-
- // _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
-
- // //_ = _producerBus.Publish(tempMsg);
-
- // meterTaskInfosList.Add(ammerterItem.Value);
- // }
- //}
- //if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
- //{
- // await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
- //}
-
-
- //stopwatch.Stop();
-
- //_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
}
-
///
- /// 电表创建发布任务
+ /// 创建电表待发送的任务数据
///
/// 采集频率
- /// 集中器号hash分组的集中器集合数据
+ /// 时间格式的任务批次名称
+ ///
+ private async Task AmmerterCreatePublishTask(int timeDensity, string taskBatch)
+ {
+ var timer = Stopwatch.StartNew();
+
+ //获取对应频率中的所有电表信息
+ var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
+ var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
+ var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
+
+ List meterInfos = new List();
+ decimal? cursor = null;
+ string member = null;
+ bool hasNext;
+ do
+ {
+ var page = await _redisDataCacheService.GetAllPagedData(
+ redisCacheMeterInfoHashKeyTemp,
+ redisCacheMeterInfoZSetScoresIndexKeyTemp,
+ pageSize: 1000,
+ lastScore: cursor,
+ lastMember: member);
+
+ meterInfos.AddRange(page.Items);
+ cursor = page.HasNext ? page.NextScore : null;
+ member = page.HasNext ? page.NextMember : null;
+ hasNext = page.HasNext;
+ } while (hasNext);
+
+ if (meterInfos == null || meterInfos.Count <= 0)
+ {
+ timer.Stop();
+ _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
+ return;
+ }
+
+
+ await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
+ items: meterInfos,
+ deviceIdSelector: data => data.FocusAddress,
+ processor: (data, groupIndex) =>
+ {
+ AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch);
+ }
+ );
+
+ timer.Stop();
+ _logger.LogInformation($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息");
+ }
+
+
+ ///
+ /// 创建电表待发送的任务数据
+ ///
+ /// 采集频率
+ /// 电表信息
/// 集中器所在分组
/// 时间格式的任务批次名称
///
- private void AmmerterCreatePublishTask(int timeDensity
+ private void AmmerterCreatePublishTaskAction(int timeDensity
, AmmeterInfo ammeterInfo, int groupIndex, string taskBatch)
{
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
@@ -697,7 +541,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var currentTime = DateTime.Now;
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
-
+
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
@@ -781,7 +625,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
}
- //Dictionary keyValuePairs = new Dictionary();
List taskList = new List();
foreach (var tempItem in tempCodes)
@@ -801,11 +644,16 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var aFNStr = itemCodeArr[0];
var aFN = (AFN)aFNStr.HexToDec();
var fn = int.Parse(itemCodeArr[1]);
- byte[] dataInfos = null;
+ TelemetryPacketResponse builderResponse = null;
if (ammeterInfo.AutomaticReport.Equals(1) && aFN == AFN.请求实时数据)
{
//实时数据
- dataInfos = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(ammeterInfo.FocusAddress, ammeterInfo.MeteringCode, (ATypeOfDataItems)fn);
+ builderResponse = TelemetryPacketBuilder.AFN0C_Fn_Send(new TelemetryPacketRequest()
+ {
+ FocusAddress = ammeterInfo.FocusAddress,
+ Fn = fn,
+ Pn = ammeterInfo.MeteringCode
+ });
}
else
{
@@ -814,7 +662,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
, out var handler))
{
- dataInfos = handler(new TelemetryPacketRequest()
+ builderResponse = handler(new TelemetryPacketRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Fn = fn,
@@ -829,7 +677,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
//TODO:特殊表
- if (dataInfos == null || dataInfos.Length <= 0)
+ if (builderResponse == null || builderResponse.Data.Length <= 0)
{
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。");
continue;
@@ -850,36 +698,28 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusId = ammeterInfo.FocusId,
AFN = aFN,
Fn = fn,
+ Seq = builderResponse.Seq,
+ MSA = builderResponse.MSA,
ItemCode = tempItem,
- TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode),
+ TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA),
+ IsSend = false,
ManualOrNot = false,
Pn = ammeterInfo.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(),
- IssuedMessageHexString = Convert.ToHexString(dataInfos),
+ IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
+ IsReceived = false,
};
- //meterReadingRecords.CreateDataId(GuidGenerator.Create());
-
taskList.Add(meterReadingRecords);
}
- //TimeSpan timeSpan = TimeSpan.FromMicroseconds(5);
- //await Task.Delay(timeSpan);
- //return keyValuePairs;
- // await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
-
- //using (var pipe = FreeRedisProvider.Instance.StartPipe())
- //{
- // pipe.HSet(redisCacheKey, keyValuePairs);
- // object[] ret = pipe.EndPipe();
- //}
if (taskList == null
|| taskList.Count() <= 0
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey)
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey)
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey))
{
- _logger.LogError($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name}的写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101");
+ _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {ammeterInfo.Name}的写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101");
return;
}
@@ -906,76 +746,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// taskList);
}
- ///
- /// Kafka 推送消息
- ///
- /// 主题名称
- /// 任务记录
- /// 对应分区,也就是集中器号所在的分组序号
- ///
- private async Task KafkaProducerIssuedMessage(string topicName,
- MeterReadingTelemetryPacketInfo taskRecord,int partition)
- {
- if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
- {
- throw new Exception($"{nameof(KafkaProducerIssuedMessage)} 推送消息失败,参数异常,-101");
- }
-
- await _producerService.ProduceAsync(topicName, partition, taskRecord);
- }
-
- private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType)
- {
- var currentDateTime = DateTime.Now;
-
- var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, meterType);
-
- //FreeRedisProvider.Instance.key()
-
- var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
- if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
- {
- _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
- return;
- }
-
- //获取下发任务缓存数据
- Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), meterType);
- if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
- {
- _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
- return;
- }
-
- List meterTaskInfosList = new List();
-
- //将取出的缓存任务数据发送到Kafka消息队列中
- foreach (var focusItem in meterTaskInfos)
- {
- foreach (var ammerterItem in focusItem.Value)
- {
- var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
- {
- MessageHexString = ammerterItem.Value.IssuedMessageHexString,
- MessageId = ammerterItem.Value.IssuedMessageId,
- FocusAddress = ammerterItem.Value.FocusAddress,
- TimeDensity = timeDensity.ToString(),
- };
- //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
-
- _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
-
- //_ = _producerBus.Publish(tempMsg);
-
- meterTaskInfosList.Add(ammerterItem.Value);
- }
- }
- if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
- {
- await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
- }
- }
-
#endregion
@@ -1044,6 +814,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key);
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
}
+
+
_logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据完成");
}
@@ -1109,12 +881,58 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成");
}
+ ///
+ /// 创建水表待发送的任务数据
+ ///
+ /// 采集频率
+ /// 水表信息
+ /// 集中器所在分组
+ /// 时间格式的任务批次名称
+ ///
+ private void WatermeterCreatePublishTaskAction(int timeDensity
+ , WatermeterInfo meterInfo, int groupIndex, string taskBatch)
+ {
+ var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
+
+
+ var currentTime = DateTime.Now;
+ var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
+
+ var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
+ var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
+ var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
+
+
+ var taskInfo = new MeterReadingTelemetryPacketInfo()
+ {
+ Seq= null,
+
+ };
+ //
+
+ Build188SendData.Build188WaterMeterReadingSendDataUnit(meterInfo.Address);
+
+ using (var pipe = FreeRedisProvider.Instance.StartPipe())
+ {
+ // 主数据存储Hash
+ pipe.HSet(redisCacheTelemetryPacketInfoHashKey, taskInfo.MemberId, taskInfo.Serialize());
+
+ // Set索引缓存
+ pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, taskInfo.MemberId);
+
+ // ZSET索引缓存Key
+ pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, taskInfo.ScoreValue, taskInfo.MemberId);
+
+ pipe.EndPipe();
+ }
+
+ }
+
#endregion
#region 公共处理方法
-
///
/// 判断是否需要生成采集指令
///
@@ -1131,39 +949,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return false;
}
- /////
- ///// 指定时间对比当前时间
- /////
- /////
- /////
- /////
- //private bool IsGennerateCmd(DateTime lastTime, int subtrahend = 0)
- //{
- // if (DateTime.Now.AddDays(subtrahend) >= lastTime)//当前时间减去一天,大于等于最后在线时间,不再生成该集中器下表生成采集指令
- // return false;
- // return true;
- //}
-
- /////
- ///// 缓存下一个时间的任务
- /////
- ///// 采集频率
- ///// 表类型
- /////
- //private async Task CacheNextTaskData(int timeDensity, MeterTypeEnum meterType)
- //{
- // //缓存下一个时间的任务
- // TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
- // {
- // TimeDensity = timeDensity,
- // NextTask = DateTime.Now.AddMinutes(timeDensity)
- // };
-
- // var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, meterType, timeDensity);
- // await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
- //}
-
-
///
/// 获取缓存表计下发指令缓存key前缀
///
@@ -1175,6 +960,130 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}*";
}
+
+ ///
+ /// 创建表的待发送的任务数据
+ ///
+ /// 采集频率
+ /// 时间格式的任务批次名称
+ /// 表类型
+ /// 具体的创建任务的委托
+ ///
+ private async Task CreateMeterPublishTask(int timeDensity, string taskBatch, MeterTypeEnum meterType, Action taskCreateAction) where T : DeviceCacheBasicModel
+ {
+ var timer = Stopwatch.StartNew();
+
+ //获取对应频率中的所有电表信息
+ var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}";
+ var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, meterType, timeDensity)}";
+ var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, meterType, timeDensity)}";
+
+ List meterInfos = new List();
+ decimal? cursor = null;
+ string member = null;
+ bool hasNext;
+ do
+ {
+ var page = await _redisDataCacheService.GetAllPagedData(
+ redisCacheMeterInfoHashKeyTemp,
+ redisCacheMeterInfoZSetScoresIndexKeyTemp,
+ pageSize: 1000,
+ lastScore: cursor,
+ lastMember: member);
+
+ meterInfos.AddRange(page.Items);
+ cursor = page.HasNext ? page.NextScore : null;
+ member = page.HasNext ? page.NextMember : null;
+ hasNext = page.HasNext;
+ } while (hasNext);
+
+ if (meterInfos == null || meterInfos.Count <= 0)
+ {
+ timer.Stop();
+ _logger.LogError($"{nameof(CreateMeterPublishTask)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
+ return;
+ }
+
+
+ await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
+ items: meterInfos,
+ deviceIdSelector: data => data.FocusAddress,
+ processor: (data, groupIndex) =>
+ {
+ taskCreateAction(timeDensity, data, groupIndex, taskBatch);
+ }
+ );
+
+ timer.Stop();
+ _logger.LogInformation($"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息");
+ }
+
+
+ ///
+ /// 创建Kafka消息
+ ///
+ ///
+ ///
+ ///
+ private async Task CreateMeterKafkaTaskMessage(
+ string redisCacheTelemetryPacketInfoHashKey,
+ string redisCacheTelemetryPacketInfoZSetScoresIndexKey)
+ {
+ if (string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey))
+ {
+ throw new Exception($"{nameof(CreateMeterKafkaTaskMessage)} 创建Kafka消息失败,参数异常,-101");
+ }
+
+ decimal? cursor = null;
+ string member = null;
+ bool hasNext;
+ var stopwatch = Stopwatch.StartNew();
+ do
+ {
+ var page = await _redisDataCacheService.GetAllPagedData(
+ redisCacheTelemetryPacketInfoHashKey,
+ redisCacheTelemetryPacketInfoZSetScoresIndexKey,
+ pageSize: 1000,
+ lastScore: cursor,
+ lastMember: member);
+
+ cursor = page.HasNext ? page.NextScore : null;
+ member = page.HasNext ? page.NextMember : null;
+ hasNext = page.HasNext;
+
+ await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
+ items: page.Items,
+ deviceIdSelector: data => data.FocusAddress,
+ processor: (data, groupIndex) =>
+ {
+ _ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex);
+ }
+ );
+
+ } while (hasNext);
+
+ stopwatch.Stop();
+ _logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {redisCacheTelemetryPacketInfoHashKey}采集推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
+ }
+
+ ///
+ /// Kafka 推送消息
+ ///
+ /// 主题名称
+ /// 任务记录
+ /// 对应分区,也就是集中器号所在的分组序号
+ ///
+ private async Task KafkaProducerIssuedMessageAction(string topicName,
+ MeterReadingTelemetryPacketInfo taskRecord, int partition)
+ {
+ if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
+ {
+ throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101");
+ }
+
+ await _producerService.ProduceAsync(topicName, partition, taskRecord);
+ }
+
#endregion
}
diff --git a/services/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs b/services/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs
index b03193a..654abc4 100644
--- a/services/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs
+++ b/services/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs
@@ -1,4 +1,5 @@
-using System.Threading;
+using System;
+using System.Threading;
using System.Threading.Tasks;
using Hangfire;
using JiShe.CollectBus.Common.Consts;
@@ -27,13 +28,15 @@ namespace JiShe.CollectBus.Workers
{
_logger = logger;
RecurringJobId = nameof(CreateToBeIssueTaskWorker);
- CronExpression = "* 10 * * * *";
+ CronExpression = "0 0/1 * * * *";
+ TimeZone = TimeZoneInfo.Local;
this._scheduledMeterReadingService = scheduledMeterReadingService;
}
public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken())
{
+ _logger.LogError($"{DateTime.Now}");
// await _scheduledMeterReadingService.CreateToBeIssueTasks();
}
}
diff --git a/services/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs b/services/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs
index 441b22a..005d46b 100644
--- a/services/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs
+++ b/services/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs
@@ -1,4 +1,5 @@
-using System.Threading;
+using System;
+using System.Threading;
using System.Threading.Tasks;
using Hangfire;
using JiShe.CollectBus.ScheduledMeterReading;
@@ -26,7 +27,8 @@ namespace JiShe.CollectBus.Workers
{
_logger = logger;
RecurringJobId = nameof(SubscriberFifteenMinuteWorker);
- CronExpression = "* 15 * * * *";
+ CronExpression = "0 0/15 * * * *";
+ TimeZone = TimeZoneInfo.Local;
this._scheduledMeterReadingService = scheduledMeterReadingService;
}
diff --git a/services/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs b/services/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs
index 0a61c63..fbd3668 100644
--- a/services/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs
+++ b/services/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs
@@ -1,4 +1,5 @@
-using System.Threading;
+using System;
+using System.Threading;
using System.Threading.Tasks;
using Hangfire;
using JiShe.CollectBus.ScheduledMeterReading;
@@ -26,7 +27,8 @@ namespace JiShe.CollectBus.Workers
{
_logger = logger;
RecurringJobId = nameof(SubscriberFiveMinuteWorker);
- CronExpression = "* 5 * * * *";
+ CronExpression = "0 0/5 * * * *";
+ TimeZone = TimeZoneInfo.Local;
this._scheduledMeterReadingService = scheduledMeterReadingService;
}
diff --git a/services/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs b/services/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs
index 8b7cbfd..e9e0240 100644
--- a/services/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs
+++ b/services/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs
@@ -1,4 +1,5 @@
-using System.Threading;
+using System;
+using System.Threading;
using System.Threading.Tasks;
using Hangfire;
using JiShe.CollectBus.ScheduledMeterReading;
@@ -26,7 +27,8 @@ namespace JiShe.CollectBus.Workers
{
_logger = logger;
RecurringJobId = nameof(SubscriberOneMinuteWorker);
- CronExpression = "* 1 * * * *";
+ CronExpression = "0 0/1 * * * *";
+ TimeZone = TimeZoneInfo.Local;
this._scheduledMeterReadingService = scheduledMeterReadingService;
}
diff --git a/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs b/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs
index 8b082bb..c07950f 100644
--- a/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs
+++ b/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs
@@ -26,12 +26,7 @@ namespace JiShe.CollectBus.Ammeters
/// 电表名称
///
public string Name { get; set; }
-
- ///
- /// 集中器地址
- ///
- public string FocusAddress { get; set; }
-
+
///
/// 集中器地址
///
diff --git a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs
index c3f75d3..3aafa41 100644
--- a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs
+++ b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs
@@ -34,7 +34,7 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
///
/// 任务数据唯一标记
///
- public string TaskMark { get; set; }
+ public decimal TaskMark { get; set; }
///
/// 时间戳标记,IoTDB时间列处理,上报通过构建标记获取唯一标记匹配时间戳。
@@ -96,7 +96,21 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// 采集项编码
///
public string ItemCode { get; set;}
-
+
+ ///
+ /// 帧序列域SEQ
+ ///
+ public required Seq Seq { get; set; }
+
+ ///
+ /// 地址域A3的主站地址MSA
+ ///
+ public int MSA { get; set; }
+
+ ///
+ /// 是否发送
+ ///
+ public bool IsSend { get; set; }
///
/// 创建时间
@@ -132,6 +146,11 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// 上报报文解析备注,异常情况下才有
///
public string ReceivedRemark { get; set; }
+
+ ///
+ /// 是否已上报
+ ///
+ public bool IsReceived { get; set; }
//public void CreateDataId(Guid Id)
//{
diff --git a/services/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs
index 966192b..be97769 100644
--- a/services/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs
+++ b/services/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs
@@ -1,4 +1,5 @@
-using JiShe.CollectBus.Common.Enums;
+using FreeSql.DataAnnotations;
+using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models;
using System;
using System.Collections.Generic;
@@ -12,7 +13,19 @@ namespace JiShe.CollectBus.IotSystems.Watermeter
/// 水表信息
///
public class WatermeterInfo: DeviceCacheBasicModel
- {
+ {
+ ///
+ /// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义
+ ///
+ [Column(IsIgnore = true)]
+ public override string MemberId => $"{FocusId}:{MeterId}";
+
+ ///
+ /// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳
+ ///
+ [Column(IsIgnore = true)]
+ public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks;
+
///
/// 水表名称
///
@@ -21,11 +34,6 @@ namespace JiShe.CollectBus.IotSystems.Watermeter
/// 表密码
///
public string Password { get; set; }
-
- ///
- /// 集中器地址
- ///
- public string FocusAddress { get; set; }
///
/// 一个集中器下的[MeteringCode]必须唯一。 PN
diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketBuilder.cs b/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketBuilder.cs
index 8ad2a39..5db0dd7 100644
--- a/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketBuilder.cs
+++ b/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketBuilder.cs
@@ -22,7 +22,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
///
///
///
- public delegate byte[] AFNDelegate(TelemetryPacketRequest request);
+ public delegate TelemetryPacketResponse AFNDelegate(TelemetryPacketRequest request);
///
/// 编码与方法的映射表
@@ -45,7 +45,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
}
#region AFN_00H 确认∕否认
- public static byte[] AFN00_Fn_Send(TelemetryPacketRequest request)
+ public static TelemetryPacketResponse AFN00_Fn_Send(TelemetryPacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@@ -64,13 +64,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
- return bytes;
+ return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_01H 复位命令
- public static byte[] AFN01_Fn_Send(TelemetryPacketRequest request)
+ public static TelemetryPacketResponse AFN01_Fn_Send(TelemetryPacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@@ -89,13 +89,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
- return bytes;
+ return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_02H 链路接口检测
- public static byte[] AFN02_Fn_Send(TelemetryPacketRequest request)
+ public static TelemetryPacketResponse AFN02_Fn_Send(TelemetryPacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@@ -114,12 +114,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
- return bytes;
+ return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_04H 设置参数
- public static byte[] AFN04_Fn_Send(TelemetryPacketRequest request)
+ public static TelemetryPacketResponse AFN04_Fn_Send(TelemetryPacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@@ -138,13 +138,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
- return bytes;
+ return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_05H 控制命令
- public static byte[] AFN05_Fn_Send(TelemetryPacketRequest request)
+ public static TelemetryPacketResponse AFN05_Fn_Send(TelemetryPacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@@ -163,12 +163,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
- return bytes;
+ return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_09H 请求终端配置及信息
- public static byte[] AFN09_Fn_Send(TelemetryPacketRequest request)
+ public static TelemetryPacketResponse AFN09_Fn_Send(TelemetryPacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@@ -187,13 +187,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
- return bytes;
+ return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_0AH 查询参数
- public static byte[] AFN0A_Fn_Send(TelemetryPacketRequest request)
+ public static TelemetryPacketResponse AFN0A_Fn_Send(TelemetryPacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@@ -212,12 +212,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
- return bytes;
+ return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_0CH 请求一类数据
- public static byte[] AFN0C_Fn_Send(TelemetryPacketRequest request)
+ public static TelemetryPacketResponse AFN0C_Fn_Send(TelemetryPacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@@ -236,12 +236,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
- return bytes;
+ return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_0DH 请求二类数据
- public static byte[] AFN0D_Fn_Send(TelemetryPacketRequest request)
+ public static TelemetryPacketResponse AFN0D_Fn_Send(TelemetryPacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@@ -260,12 +260,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
- return bytes;
+ return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN10H 数据转发
- public static byte[] AFN10_Fn_Send(TelemetryPacketRequest request)
+ public static TelemetryPacketResponse AFN10_Fn_Send(TelemetryPacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@@ -283,8 +283,8 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Pn = request.Pn,
Fn = request.Fn
};
- var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter,request.DataUnit);
- return bytes;
+ var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter, request.DataUnit);
+ return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
}
#region SpecialAmmeter 特殊电表转发
diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketResponse.cs b/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketResponse.cs
new file mode 100644
index 0000000..8cd964a
--- /dev/null
+++ b/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketResponse.cs
@@ -0,0 +1,30 @@
+using JiShe.CollectBus.Common.Models;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Common.BuildSendDatas
+{
+ ///
+ /// 报文构建返回结果
+ ///
+ public class TelemetryPacketResponse
+ {
+ ///
+ /// 帧序列域SEQ
+ ///
+ public required Seq Seq { get; set; }
+
+ ///
+ /// 地址域A3的主站地址MSA
+ ///
+ public int MSA { get; set; }
+
+ ///
+ /// 报文体
+ ///
+ public required byte[] Data { get; set; }
+ }
+}
diff --git a/shared/JiShe.CollectBus.Common/Consts/CommonConst.cs b/shared/JiShe.CollectBus.Common/Consts/CommonConst.cs
index 471c897..c3f1e9a 100644
--- a/shared/JiShe.CollectBus.Common/Consts/CommonConst.cs
+++ b/shared/JiShe.CollectBus.Common/Consts/CommonConst.cs
@@ -11,25 +11,31 @@ namespace JiShe.CollectBus.Common.Consts
///
public class CommonConst
{
- ///
- /// 服务器标识
- ///
- public const string ServerTagName = "ServerTagName";
///
/// Kafka
///
public const string Kafka = "Kafka";
+ ///
+ /// 服务器标识
+ ///
+ public const string ServerTagName = $"{Kafka}:ServerTagName";
+
///
/// Kafka副本数量
///
- public const string KafkaReplicationFactor = "KafkaReplicationFactor";
+ public const string KafkaReplicationFactor = $"{Kafka}:KafkaReplicationFactor";
///
/// Kafka主题分区数量
///
- public const string NumPartitions = "NumPartitions";
+ public const string NumPartitions = $"{Kafka}:NumPartitions";
+
+ ///
+ /// 首次采集时间
+ ///
+ public const string FirstCollectionTime = "FirstCollectionTime";
}
}
diff --git a/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs b/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs
index 4e3fef9..e6136df 100644
--- a/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs
+++ b/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs
@@ -200,5 +200,38 @@ namespace JiShe.CollectBus.Common.Extensions
{
return DateTimeOffset.FromUnixTimeMilliseconds(millis).DateTime;
}
+
+ ///
+ /// 采集时间节点计算
+ ///
+ /// 待采集时间
+ ///
+ ///
+ public static DateTime CalculateNextCollectionTime(this DateTime referenceTime, int interval)
+ {
+ // 计算精确到分钟的基准时间
+ var baseTime = new DateTime(
+ referenceTime.Year,
+ referenceTime.Month,
+ referenceTime.Day,
+ referenceTime.Hour,
+ referenceTime.Minute,
+ 0);
+
+ // 计算总分钟数和下一个间隔点
+ int totalMinutes = baseTime.Hour * 60 + baseTime.Minute;
+ int nextTotalMinutes = ((totalMinutes / interval) + 1) * interval;
+
+ // 处理跨天情况
+ int daysToAdd = nextTotalMinutes / (24 * 60);
+ int remainingMinutes = nextTotalMinutes % (24 * 60);
+ int hours = remainingMinutes / 60;
+ int minutes = remainingMinutes % 60;
+
+ return baseTime.Date
+ .AddDays(daysToAdd)
+ .AddHours(hours)
+ .AddMinutes(minutes);
+ }
}
}
diff --git a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs
index 22ebef5..6871a9e 100644
--- a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs
+++ b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs
@@ -669,7 +669,7 @@ namespace JiShe.CollectBus.Common.Helpers
return att == null ? field.Name : ((DescriptionAttribute)att).Description;
}
-
+
///
/// 将传入的字符串中间部分字符替换成特殊字符
@@ -759,7 +759,7 @@ namespace JiShe.CollectBus.Common.Helpers
}
return fontValue;
- }
+ }
///
/// 获取任务标识
@@ -767,10 +767,13 @@ namespace JiShe.CollectBus.Common.Helpers
///
///
///
+ ///
///
- public static string GetTaskMark(int afn,int fn,int pn)
+ public static decimal GetTaskMark(int afn, int fn, int pn, int msa)
{
- return $"{afn.ToString().PadLeft(2,'0')}{fn}{pn}";
+ var makstr = $"{afn.ToString().PadLeft(2, '0')}{fn.ToString().PadLeft(2, '0')}{pn.ToString().PadLeft(2, '0')}";
+
+ return Convert.ToInt32(makstr) << 32 | msa;
}
}
}
diff --git a/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs b/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs
index d397151..1edc46a 100644
--- a/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs
+++ b/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs
@@ -30,5 +30,15 @@ namespace JiShe.CollectBus.Common.Models
/// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳
///
public virtual long ScoreValue=> ((long)FocusId << 32) | (uint)MeterId;
+
+ ///
+ /// 是否已处理
+ ///
+ public virtual bool IsHandle { get; set; } = false;
+
+ ///
+ /// 集中器地址
+ ///
+ public string FocusAddress { get; set;}
}
}
diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml
index aaadf3f..afe25da 100644
--- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml
+++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml
@@ -17,7 +17,7 @@
后端服务
-
+