@ -1,4 +1,5 @@
using JiShe.CollectBus.Ammeters ;
using DnsClient.Protocol ;
using JiShe.CollectBus.Ammeters ;
using JiShe.CollectBus.Application.Contracts ;
using JiShe.CollectBus.Common.BuildSendDatas ;
using JiShe.CollectBus.Common.Consts ;
@ -27,6 +28,7 @@ using System.Linq;
using System.Threading ;
using System.Threading.Tasks ;
using static FreeSql . Internal . GlobalFilter ;
using static System . Runtime . InteropServices . JavaScript . JSType ;
namespace JiShe.CollectBus.ScheduledMeterReading
{
@ -101,6 +103,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return ;
}
var currentTime = DateTime . Now ;
foreach ( var item in taskInfos )
{
var tasksToBeIssueModel = await FreeRedisProvider . Instance . GetAsync < TasksToBeIssueModel > ( item ) ;
@ -130,70 +134,31 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if ( meteryType = = MeterTypeEnum . Ammeter . ToString ( ) )
{
var timer = Stopwatch . StartNew ( ) ;
//_ = AmmerterCreatePublishTask(timeDensity, $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}") ;
//获取对应频率中的所有电表信息
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}" ;
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}" ;
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}" ;
List < AmmeterInfo > meterInfos = new List < AmmeterInfo > ( ) ;
decimal? cursor = null ;
string member = null ;
bool hasNext ;
do
_ = CreateMeterPublishTask < AmmeterInfo > (
timeDensity : timeDensity ,
taskBatch : $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}" ,
meterType : MeterTypeEnum . Ammeter ,
taskCreateAction : ( timeDensity , data , groupIndex , taskBatch ) = >
{
var page = await _redisDataCacheService . GetAllPagedData < AmmeterInfo > (
redisCacheMeterInfoHashKeyTemp ,
redisCacheMeterInfoZSetScoresIndexKeyTemp ,
pageSize : 1000 ,
lastScore : cursor ,
lastMember : member ) ;
meterInfos . AddRange ( page . Items ) ;
cursor = page . HasNext ? page . NextScore : null ;
member = page . HasNext ? page . NextMember : null ;
hasNext = page . HasNext ;
} while ( hasNext ) ;
if ( meterInfos = = null | | meterInfos . Count < = 0 )
{
timer . Stop ( ) ;
_logger . LogError ( $"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105" ) ;
return ;
}
//await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos);
//处理数据
//await DeviceGroupBalanceControl.ProcessGenericListAsync(
// items: meterInfos,
// deviceIdSelector: data => data.FocusAddress,
// processor: (data, groupIndex) =>
// {
// _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
// }
//);
await DeviceGroupBalanceControl . ProcessWithThrottleAsync (
items : meterInfos ,
deviceIdSelector : data = > data . FocusAddress ,
processor : ( data , groupIndex ) = >
{
AmmerterCreatePublishTask ( timeDensity , data , groupIndex , tasksToBeIssueModel . NextTaskTime . ToString ( "yyyyMMddHHmmss" ) ) ;
}
) ;
timer . Stop ( ) ;
_logger . LogInformation ( $"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息" ) ;
AmmerterCreatePublishTaskAction ( timeDensity , data , groupIndex , taskBatch ) ;
} ) ;
}
else if ( meteryType = = MeterTypeEnum . WaterMeter . ToString ( ) )
{
//todo 水表任务创建待处理
//await WatermeterScheduledMeterReadingIssued(timeDensity, meterInfos);
_ = CreateMeterPublishTask < WatermeterInfo > (
timeDensity : timeDensity ,
taskBatch : $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}" ,
meterType : MeterTypeEnum . Ammeter ,
taskCreateAction : ( timeDensity , data , groupIndex , taskBatch ) = >
{
//AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch);
} ) ;
}
else
{
@ -205,7 +170,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。
tasksToBeIssueModel . NextTaskTime = tasksToBeIssueModel . NextTaskTime . AddMinutes ( timeDensity ) ;
tasksToBeIssueModel . NextTaskTime = tasksToBeIssueModel . NextTaskTime . CalculateNextCollectionTime ( timeDensity ) ;
await FreeRedisProvider . Instance . SetAsync ( item , tasksToBeIssueModel ) ;
}
}
@ -230,23 +195,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public virtual async Task InitAmmeterCacheData ( string gatherCode = "" )
{
#if DEBUG
//var timeDensity = "15";
//string tempCacheMeterInfoKey = $"CollectBus:{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}";
////获取缓存中的电表信息
//var redisKeyList = $"{string.Format(tempCacheMeterInfoKey, SystemType, "JiSheCollectBus", MeterTypeEnum.Ammeter, timeDensity)}*";
//var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
//var tempMeterInfos = await GetMeterRedisCacheListData<AmmeterInfoTemp>(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter);
////List<string> focusAddressDataLista = new List<string>();
//List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
//foreach (var item in tempMeterInfos)
//{
// var tempData = item.Adapt<AmmeterInfo>();
// tempData.FocusId = item.FocusID;
// tempData.MeterId = item.Id;
// meterInfos.Add(tempData);
// //focusAddressDataLista.Add(item.FocusAddress);
//}
return ;
@ -258,23 +207,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
List < AmmeterInfo > meterInfos = new List < AmmeterInfo > ( ) ;
List < string > focusAddressDataLista = new List < string > ( ) ;
var timer1 = Stopwatch . StartNew ( ) ;
//decimal? cursor = null;
//string member = null;
//bool hasNext;
//do
//{
// var page = await _redisDataCacheService.GetAllPagedDataOptimized<AmmeterInfo>(
// redisCacheMeterInfoHashKeyTemp,
// redisCacheMeterInfoZSetScoresIndexKeyTemp,
// pageSize: 1000,
// lastScore: cursor,
// lastMember: member);
// meterInfos.AddRange(page.Items);
// cursor = page.HasNext ? page.NextScore : null;
// member = page.HasNext ? page.NextMember : null;
// hasNext = page.HasNext;
//} while (hasNext);
var allIds = new HashSet < string > ( ) ;
decimal? score = null ;
@ -329,6 +261,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos . GroupBy ( d = > d . TimeDensity ) ;
//先处理采集频率任务缓存
foreach ( var item in meterInfoGroupByTimeDensity )
{
TasksToBeIssueModel nextTask = new TasksToBeIssueModel ( )
{
TimeDensity = item . Key ,
NextTaskTime = _kafkaOptions . FirstCollectionTime . CalculateNextCollectionTime ( item . Key ) , //使用首次采集时间作为下一次采集时间
} ;
//todo 首次采集时间节点到目前运行时间中漏采的时间点, 可以考虑使用IoTDB的存储, 利用时间序列处理。
var taskRedisCacheKey = string . Format ( RedisConst . CacheTasksToBeIssuedKey , SystemType , ServerTagName , MeterTypeEnum . Ammeter , item . Key ) ;
await FreeRedisProvider . Instance . SetAsync ( taskRedisCacheKey , nextTask ) ;
}
foreach ( var itemTimeDensity in meterInfoGroupByTimeDensity )
{
var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}" ;
@ -403,25 +351,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
ammeterInfos . Add ( ammeter ) ;
//keyValuePairs.TryAdd($"{ammeter.MeterId}", ammeter);
}
//await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
}
await _redisDataCacheService . BatchInsertDataAsync < AmmeterInfo > (
redisCacheMeterInfoHashKey ,
redisCacheMeterInfoSetIndexKey ,
redisCacheMeterInfoZSetScoresIndexKey , ammeterInfos ) ;
//在缓存表信息数据的时候, 新增下一个时间的自动处理任务, 1分钟后执行所有的采集频率任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel ( )
{
TimeDensity = itemTimeDensity . Key ,
NextTaskTime = DateTime . Now . AddMinutes ( 1 )
} ;
var taskRedisCacheKey = string . Format ( RedisConst . CacheTasksToBeIssuedKey , SystemType , ServerTagName , MeterTypeEnum . Ammeter , itemTimeDensity . Key ) ;
await FreeRedisProvider . Instance . SetAsync ( taskRedisCacheKey , nextTask ) ;
}
//初始化设备组负载控制
@ -446,63 +382,29 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns>
public virtual async Task AmmeterScheduledMeterOneMinuteReading ( )
{
//获取缓存中的电表信息
int timeDensity = 1 ;
int timeDensity = 5 ;
var currentTime = DateTime . Now ;
var redisKeyList = GetTelemetryPacketCacheKeyPrefix ( timeDensity , MeterTypeEnum . Ammeter ) ;
var oneMinutekeyList = await FreeRedisProvider . Instance . KeysAsync ( redisKeyList ) ;
if ( oneMinutekeyList = = null | | oneMinutekeyList . Length < = 0 )
{
_logger . LogError ( $"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101" ) ;
return ;
}
// 自动计算最佳并发度
int recommendedThreads = DeviceGroupBalanceControl . CalculateOptimalThreadCount ( ) ;
//获取下发任务缓存数据
Dictionary < string , Dictionary < string , MeterReadingRecords > > meterTaskInfos = await GetMeterRedisCacheDictionaryData < MeterReadingRecords > ( oneMinutekeyList , SystemType , ServerTagName , timeDensity . ToString ( ) , MeterTypeEnum . Ammeter ) ;
if ( meterTaskInfos = = null | | meterTaskInfos . Count < = 0 )
var options = new ParallelOptions
{
_logger . LogError ( $"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102" ) ;
return ;
}
List < MeterReadingRecords > meterTaskInfosList = new List < MeterReadingRecords > ( ) ;
//将取出的缓存任务数据发送到Kafka消息队列中
foreach ( var focusItem in meterTaskInfos )
{
foreach ( var ammerterItem in focusItem . Value )
{
var tempMsg = new ScheduledMeterReadingIssuedEventMessage ( )
{
MessageHexString = ammerterItem . Value . IssuedMessageHexString ,
MessageId = ammerterItem . Value . IssuedMessageId ,
FocusAddress = ammerterItem . Value . FocusAddress ,
TimeDensity = timeDensity . ToString ( ) ,
MaxDegreeOfParallelism = recommendedThreads ,
} ;
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg) ;
var taskBatch = $"{currentTime:yyyyMMddHHmm00}" ;
_ = _producerService . ProduceAsync ( ProtocolConst . AmmeterSubscriberWorkerOneMinuteIssuedEventName , tempMsg ) ;
//_= _producerBus.Publish(tempMsg);
meterTaskInfosList . Add ( ammerterItem . Value ) ;
}
}
if ( meterTaskInfosList ! = null & & meterTaskInfosList . Count > 0 )
Parallel . For ( 0 , _kafkaOptions . NumPartitions , options , async groupIndex = >
{
//_dbProvider.SwitchSessionPool(true) ;
//await _dbProvider.InsertAsync(meterTaskInfosList) ;
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}" ;
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}" ;
await _meterReadingRecordRepository . InsertManyAsync ( meterTaskInfosList , currentTime ) ;
}
_ = CreateMeterKafkaTaskMessage ( redisCacheTelemetryPacketInfoHashKey , redisCacheTelemetryPacketInfoZSetScoresIndexKey ) ;
} ) ;
////删除任务数据
//await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList);
//await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter);
_logger . LogInformation ( $"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理完成" ) ;
await Task . CompletedTask ;
}
@ -516,57 +418,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
int timeDensity = 5 ;
var currentTime = DateTime . Now ;
var redisKeyList = GetTelemetryPacketCacheKeyPrefix ( timeDensity , MeterTypeEnum . Ammeter ) ;
var fiveMinutekeyList = await FreeRedisProvider . Instance . KeysAsync ( redisKeyList ) ;
if ( fiveMinutekeyList = = null | | fiveMinutekeyList . Length < = 0 )
{
_logger . LogError ( $"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101" ) ;
return ;
}
// 自动计算最佳并发度
int recommendedThreads = DeviceGroupBalanceControl . CalculateOptimalThreadCount ( ) ;
//获取下发任务缓存数据
Dictionary < string , Dictionary < string , MeterReadingRecords > > meterTaskInfos = await GetMeterRedisCacheDictionaryData < MeterReadingRecords > ( fiveMinutekeyList , SystemType , ServerTagName , timeDensity . ToString ( ) , MeterTypeEnum . Ammeter ) ;
if ( meterTaskInfos = = null | | meterTaskInfos . Count < = 0 )
var options = new ParallelOptions
{
_logger . LogError ( $"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102" ) ;
return ;
}
List < MeterReadingRecords > meterTaskInfosList = new List < MeterReadingRecords > ( ) ;
//将取出的缓存任务数据发送到Kafka消息队列中
foreach ( var focusItem in meterTaskInfos )
{
foreach ( var ammerterItem in focusItem . Value )
{
var tempMsg = new ScheduledMeterReadingIssuedEventMessage ( )
{
MessageHexString = ammerterItem . Value . IssuedMessageHexString ,
MessageId = ammerterItem . Value . IssuedMessageId ,
FocusAddress = ammerterItem . Value . FocusAddress ,
TimeDensity = timeDensity . ToString ( ) ,
MaxDegreeOfParallelism = recommendedThreads ,
} ;
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg) ;
var taskBatch = $"{currentTime:yyyyMMddHHmm00}" ;
_ = _producerService . ProduceAsync ( ProtocolConst . AmmeterSubscriberWorkerFiveMinuteIssuedEventName , tempMsg ) ;
//_ = _producerBus.Publish(tempMsg);
meterTaskInfosList . Add ( ammerterItem . Value ) ;
}
}
if ( meterTaskInfosList ! = null & & meterTaskInfosList . Count > 0 )
Parallel . For ( 0 , _kafkaOptions . NumPartitions , options , async groupIndex = >
{
await _meterReadingRecordRepository . InsertManyAsync ( meterTaskInfosList , currentTime ) ;
}
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}" ;
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}" ;
////删除任务数据
//await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList);
////缓存下一个时间的任务
//await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter);
_logger . LogInformation ( $"{nameof(AmmeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集电表数据处理完成" ) ;
_ = CreateMeterKafkaTaskMessage ( redisCacheTelemetryPacketInfoHashKey , redisCacheTelemetryPacketInfoZSetScoresIndexKey ) ;
} ) ;
}
/// <summary>
@ -575,11 +442,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns>
public virtual async Task AmmeterScheduledMeterFifteenMinuteReading ( )
{
var stopwatch = Stopwatch . StartNew ( ) ;
//获取缓存中的电表信息
int timeDensity = 15 ;
var current Date Time = DateTime . Now ;
var current Time = DateTime . Now ;
// 自动计算最佳并发度
int recommendedThreads = DeviceGroupBalanceControl . CalculateOptimalThreadCount ( ) ;
@ -588,33 +453,84 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
MaxDegreeOfParallelism = recommendedThreads ,
} ;
string taskBatch = "20250417171649" ;
var taskBatch = $"{currentTime:yyyyMMddHHmm00}" ;
Parallel . For ( 0 , _kafkaOptions . NumPartitions , options , async groupIndex = >
{
Console . WriteLine ( $"15分钟采集电表数据:{groupIndex}" ) ;
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}" ;
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}" ;
_ = GetTaskInfoListToKafka ( redisCacheTelemetryPacketInfoHashKey , redisCacheTelemetryPacketInfoZSetScoresIndexKey ) ;
_ = CreateMeterKafkaTaskMessage ( redisCacheTelemetryPacketInfoHashKey , redisCacheTelemetryPacketInfoZSetScoresIndexKey ) ;
} ) ;
stopwatch . Stop ( ) ;
_logger . LogError ( $"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。" ) ;
await Task . CompletedTask ;
}
/// <summary>
/// 电表创建发布任务
/// 创建电表待发送的任务数据
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="ammeterInfo">集中器号hash分组的集中器集合数据</param>
/// <param name="taskBatch">时间格式的任务批次名称</param>
/// <returns></returns>
private async Task AmmerterCreatePublishTask ( int timeDensity , string taskBatch )
{
var timer = Stopwatch . StartNew ( ) ;
//获取对应频率中的所有电表信息
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}" ;
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}" ;
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}" ;
List < AmmeterInfo > meterInfos = new List < AmmeterInfo > ( ) ;
decimal? cursor = null ;
string member = null ;
bool hasNext ;
do
{
var page = await _redisDataCacheService . GetAllPagedData < AmmeterInfo > (
redisCacheMeterInfoHashKeyTemp ,
redisCacheMeterInfoZSetScoresIndexKeyTemp ,
pageSize : 1000 ,
lastScore : cursor ,
lastMember : member ) ;
meterInfos . AddRange ( page . Items ) ;
cursor = page . HasNext ? page . NextScore : null ;
member = page . HasNext ? page . NextMember : null ;
hasNext = page . HasNext ;
} while ( hasNext ) ;
if ( meterInfos = = null | | meterInfos . Count < = 0 )
{
timer . Stop ( ) ;
_logger . LogError ( $"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105" ) ;
return ;
}
await DeviceGroupBalanceControl . ProcessWithThrottleAsync (
items : meterInfos ,
deviceIdSelector : data = > data . FocusAddress ,
processor : ( data , groupIndex ) = >
{
AmmerterCreatePublishTaskAction ( timeDensity , data , groupIndex , taskBatch ) ;
}
) ;
timer . Stop ( ) ;
_logger . LogInformation ( $"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息" ) ;
}
/// <summary>
/// 创建电表待发送的任务数据
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="ammeterInfo">电表信息</param>
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="taskBatch">时间格式的任务批次名称</param>
/// <returns></returns>
private void AmmerterCreatePublishTask ( int timeDensity
private void AmmerterCreatePublishTask Action ( int timeDensity
, AmmeterInfo ammeterInfo , int groupIndex , string taskBatch )
{
var handlerPacketBuilder = TelemetryPacketBuilder . AFNHandlersDictionary ;
@ -800,7 +716,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
| | string . IsNullOrWhiteSpace ( redisCacheTelemetryPacketInfoSetIndexKey )
| | string . IsNullOrWhiteSpace ( redisCacheTelemetryPacketInfoZSetScoresIndexKey ) )
{
_logger . LogError ( $"{nameof(AmmerterCreatePublishTask )} {ammeterInfo.Name}的写入参数异常,{redisCacheTelemetryPacketInfoHashKey}: {redisCacheTelemetryPacketInfoSetIndexKey}: {redisCacheTelemetryPacketInfoZSetScoresIndexKey}, -101") ;
_logger . LogError ( $"{nameof(AmmerterCreatePublishTask Action )} {ammeterInfo.Name}的写入参数异常,{redisCacheTelemetryPacketInfoHashKey}: {redisCacheTelemetryPacketInfoSetIndexKey}: {redisCacheTelemetryPacketInfoZSetScoresIndexKey}, -101") ;
return ;
}
@ -827,56 +743,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// taskList);
}
/// <summary>
/// Kafka 推送消息
/// </summary>
/// <param name="topicName">主题名称</param>
/// <param name="taskRecord">任务记录</param>
/// <param name="partition">对应分区,也就是集中器号所在的分组序号</param>
/// <returns></returns>
private async Task KafkaProducerIssuedMessage ( string topicName ,
MeterReadingTelemetryPacketInfo taskRecord , int partition )
{
if ( string . IsNullOrWhiteSpace ( topicName ) | | taskRecord = = null )
{
throw new Exception ( $"{nameof(KafkaProducerIssuedMessage)} 推送消息失败,参数异常,-101" ) ;
}
await _producerService . ProduceAsync ( topicName , partition , taskRecord ) ;
}
private async Task GetTaskInfoListToKafka (
string redisCacheTelemetryPacketInfoHashKey ,
string redisCacheTelemetryPacketInfoZSetScoresIndexKey )
{
decimal? cursor = null ;
string member = null ;
bool hasNext ;
do
{
var page = await _redisDataCacheService . GetAllPagedData < MeterReadingTelemetryPacketInfo > (
redisCacheTelemetryPacketInfoHashKey ,
redisCacheTelemetryPacketInfoZSetScoresIndexKey ,
pageSize : 1000 ,
lastScore : cursor ,
lastMember : member ) ;
cursor = page . HasNext ? page . NextScore : null ;
member = page . HasNext ? page . NextMember : null ;
hasNext = page . HasNext ;
await DeviceGroupBalanceControl . ProcessWithThrottleAsync (
items : page . Items ,
deviceIdSelector : data = > data . FocusAddress ,
processor : ( data , groupIndex ) = >
{
_ = KafkaProducerIssuedMessage ( ProtocolConst . AmmeterSubscriberWorkerFifteenMinuteIssuedEventName , data , groupIndex ) ;
}
) ;
} while ( hasNext ) ;
}
#endregion
@ -945,6 +811,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var taskRedisCacheKey = string . Format ( RedisConst . CacheTasksToBeIssuedKey , SystemType , ServerTagName , MeterTypeEnum . WaterMeter , itemTimeDensity . Key ) ;
await FreeRedisProvider . Instance . SetAsync ( taskRedisCacheKey , nextTask ) ;
}
_logger . LogInformation ( $"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据完成" ) ;
}
@ -1010,12 +878,58 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger . LogInformation ( $"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成" ) ;
}
/// <summary>
/// 创建水表待发送的任务数据
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="meterInfo">水表信息</param>
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="taskBatch">时间格式的任务批次名称</param>
/// <returns></returns>
private void WatermeterCreatePublishTaskAction ( int timeDensity
, WatermeterInfo meterInfo , int groupIndex , string taskBatch )
{
var handlerPacketBuilder = TelemetryPacketBuilder . AFNHandlersDictionary ;
var currentTime = DateTime . Now ;
var pendingCopyReadTime = currentTime . AddMinutes ( timeDensity ) ;
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}" ;
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}" ;
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}" ;
var taskInfo = new MeterReadingTelemetryPacketInfo ( )
{
Seq = null ,
} ;
//
Build188SendData . Build188WaterMeterReadingSendDataUnit ( meterInfo . Address ) ;
using ( var pipe = FreeRedisProvider . Instance . StartPipe ( ) )
{
// 主数据存储Hash
pipe . HSet ( redisCacheTelemetryPacketInfoHashKey , taskInfo . MemberId , taskInfo . Serialize ( ) ) ;
// Set索引缓存
pipe . SAdd ( redisCacheTelemetryPacketInfoSetIndexKey , taskInfo . MemberId ) ;
// ZSET索引缓存Key
pipe . ZAdd ( redisCacheTelemetryPacketInfoZSetScoresIndexKey , taskInfo . ScoreValue , taskInfo . MemberId ) ;
pipe . EndPipe ( ) ;
}
}
#endregion
#region 公 共 处 理 方 法
/// <summary>
/// 判断是否需要生成采集指令
/// </summary>
@ -1032,39 +946,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return false ;
}
///// <summary>
///// 指定时间对比当前时间
///// </summary>
///// <param name="lastTime"></param>
///// <param name="subtrahend"></param>
///// <returns></returns>
//private bool IsGennerateCmd(DateTime lastTime, int subtrahend = 0)
//{
// if (DateTime.Now.AddDays(subtrahend) >= lastTime)//当前时间减去一天,大于等于最后在线时间,不再生成该集中器下表生成采集指令
// return false;
// return true;
//}
///// <summary>
///// 缓存下一个时间的任务
///// </summary>
///// <param name="timeDensity">采集频率</param>
///// <param name="meterType">表类型</param>
///// <returns></returns>
//private async Task CacheNextTaskData(int timeDensity, MeterTypeEnum meterType)
//{
// //缓存下一个时间的任务
// TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
// {
// TimeDensity = timeDensity,
// NextTask = DateTime.Now.AddMinutes(timeDensity)
// };
// var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, meterType, timeDensity);
// await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
//}
/// <summary>
/// 获取缓存表计下发指令缓存key前缀
/// </summary>
@ -1076,6 +957,130 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}*" ;
}
/// <summary>
/// 创建表的待发送的任务数据
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="taskBatch">时间格式的任务批次名称</param>
/// <param name="meterType">表类型</param>
/// <param name="taskCreateAction">具体的创建任务的委托</param>
/// <returns></returns>
private async Task CreateMeterPublishTask < T > ( int timeDensity , string taskBatch , MeterTypeEnum meterType , Action < int , T , int , string > taskCreateAction ) where T : DeviceCacheBasicModel
{
var timer = Stopwatch . StartNew ( ) ;
//获取对应频率中的所有电表信息
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}" ;
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, meterType, timeDensity)}" ;
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, meterType, timeDensity)}" ;
List < T > meterInfos = new List < T > ( ) ;
decimal? cursor = null ;
string member = null ;
bool hasNext ;
do
{
var page = await _redisDataCacheService . GetAllPagedData < T > (
redisCacheMeterInfoHashKeyTemp ,
redisCacheMeterInfoZSetScoresIndexKeyTemp ,
pageSize : 1000 ,
lastScore : cursor ,
lastMember : member ) ;
meterInfos . AddRange ( page . Items ) ;
cursor = page . HasNext ? page . NextScore : null ;
member = page . HasNext ? page . NextMember : null ;
hasNext = page . HasNext ;
} while ( hasNext ) ;
if ( meterInfos = = null | | meterInfos . Count < = 0 )
{
timer . Stop ( ) ;
_logger . LogError ( $"{nameof(CreateMeterPublishTask)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105" ) ;
return ;
}
await DeviceGroupBalanceControl . ProcessWithThrottleAsync (
items : meterInfos ,
deviceIdSelector : data = > data . FocusAddress ,
processor : ( data , groupIndex ) = >
{
taskCreateAction ( timeDensity , data , groupIndex , taskBatch ) ;
}
) ;
timer . Stop ( ) ;
_logger . LogInformation ( $"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息" ) ;
}
/// <summary>
/// 创建Kafka消息
/// </summary>
/// <param name="redisCacheTelemetryPacketInfoHashKey"></param>
/// <param name="redisCacheTelemetryPacketInfoZSetScoresIndexKey"></param>
/// <returns></returns>
private async Task CreateMeterKafkaTaskMessage (
string redisCacheTelemetryPacketInfoHashKey ,
string redisCacheTelemetryPacketInfoZSetScoresIndexKey )
{
if ( string . IsNullOrWhiteSpace ( redisCacheTelemetryPacketInfoHashKey ) | | string . IsNullOrWhiteSpace ( redisCacheTelemetryPacketInfoHashKey ) )
{
throw new Exception ( $"{nameof(CreateMeterKafkaTaskMessage)} 创建Kafka消息失败, 参数异常,-101" ) ;
}
decimal? cursor = null ;
string member = null ;
bool hasNext ;
var stopwatch = Stopwatch . StartNew ( ) ;
do
{
var page = await _redisDataCacheService . GetAllPagedData < MeterReadingTelemetryPacketInfo > (
redisCacheTelemetryPacketInfoHashKey ,
redisCacheTelemetryPacketInfoZSetScoresIndexKey ,
pageSize : 1000 ,
lastScore : cursor ,
lastMember : member ) ;
cursor = page . HasNext ? page . NextScore : null ;
member = page . HasNext ? page . NextMember : null ;
hasNext = page . HasNext ;
await DeviceGroupBalanceControl . ProcessWithThrottleAsync (
items : page . Items ,
deviceIdSelector : data = > data . FocusAddress ,
processor : ( data , groupIndex ) = >
{
_ = KafkaProducerIssuedMessageAction ( ProtocolConst . AmmeterSubscriberWorkerFifteenMinuteIssuedEventName , data , groupIndex ) ;
}
) ;
} while ( hasNext ) ;
stopwatch . Stop ( ) ;
_logger . LogError ( $"{nameof(CreateMeterKafkaTaskMessage)} {redisCacheTelemetryPacketInfoHashKey}采集推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。" ) ;
}
/// <summary>
/// Kafka 推送消息
/// </summary>
/// <param name="topicName">主题名称</param>
/// <param name="taskRecord">任务记录</param>
/// <param name="partition">对应分区,也就是集中器号所在的分组序号</param>
/// <returns></returns>
private async Task KafkaProducerIssuedMessageAction ( string topicName ,
MeterReadingTelemetryPacketInfo taskRecord , int partition )
{
if ( string . IsNullOrWhiteSpace ( topicName ) | | taskRecord = = null )
{
throw new Exception ( $"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101" ) ;
}
await _producerService . ProduceAsync ( topicName , partition , taskRecord ) ;
}
#endregion
}