dev #2
@ -226,6 +226,19 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
|
|||||||
|
|
||||||
timer1.Stop();
|
timer1.Stop();
|
||||||
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
|
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
|
||||||
|
|
||||||
|
List<string> focusAddressDataLista = new List<string>();
|
||||||
|
foreach (var item in meterInfos)
|
||||||
|
{
|
||||||
|
focusAddressDataLista.Add(item.FocusAddress);
|
||||||
|
}
|
||||||
|
|
||||||
|
DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
|
||||||
|
|
||||||
|
// 打印分布统计
|
||||||
|
DeviceGroupBalanceControl.PrintDistributionStats();
|
||||||
|
|
||||||
|
await Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -25,6 +25,7 @@ using System;
|
|||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using static FreeSql.Internal.GlobalFilter;
|
using static FreeSql.Internal.GlobalFilter;
|
||||||
|
|
||||||
@ -182,7 +183,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
deviceIdSelector: data => data.FocusAddress,
|
deviceIdSelector: data => data.FocusAddress,
|
||||||
processor: (data, groupIndex) =>
|
processor: (data, groupIndex) =>
|
||||||
{
|
{
|
||||||
_ = AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
|
AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -582,125 +583,103 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
int timeDensity = 15;
|
int timeDensity = 15;
|
||||||
var currentDateTime = DateTime.Now;
|
var currentDateTime = DateTime.Now;
|
||||||
|
|
||||||
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
|
// 自动计算最佳并发度
|
||||||
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount();
|
||||||
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
//获取下发任务缓存数据
|
var options = new ParallelOptions
|
||||||
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
|
|
||||||
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
|
MaxDegreeOfParallelism = recommendedThreads,
|
||||||
return;
|
};
|
||||||
}
|
string taskBatch = "20250417155016";
|
||||||
|
Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex =>
|
||||||
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
|
|
||||||
|
|
||||||
//将取出的缓存任务数据发送到Kafka消息队列中
|
|
||||||
foreach (var focusItem in meterTaskInfos)
|
|
||||||
{
|
{
|
||||||
foreach (var ammerterItem in focusItem.Value)
|
Console.WriteLine($"15分钟采集电表数据:{groupIndex}");
|
||||||
|
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
||||||
|
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
||||||
|
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
||||||
|
|
||||||
|
List<MeterReadingTelemetryPacketInfo> meterInfos = new List<MeterReadingTelemetryPacketInfo>();
|
||||||
|
decimal? cursor = null;
|
||||||
|
string member = null;
|
||||||
|
bool hasNext;
|
||||||
|
do
|
||||||
{
|
{
|
||||||
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
|
var page = await _redisDataCacheService.GetAllPagedData<MeterReadingTelemetryPacketInfo>(
|
||||||
{
|
redisCacheTelemetryPacketInfoHashKey,
|
||||||
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
|
redisCacheTelemetryPacketInfoZSetScoresIndexKey,
|
||||||
MessageId = ammerterItem.Value.IssuedMessageId,
|
pageSize: 1000,
|
||||||
FocusAddress = ammerterItem.Value.FocusAddress,
|
lastScore: cursor,
|
||||||
TimeDensity = timeDensity.ToString(),
|
lastMember: member);
|
||||||
};
|
|
||||||
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
|
|
||||||
|
|
||||||
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
|
meterInfos.AddRange(page.Items);
|
||||||
|
cursor = page.HasNext ? page.NextScore : null;
|
||||||
|
member = page.HasNext ? page.NextMember : null;
|
||||||
|
hasNext = page.HasNext;
|
||||||
|
|
||||||
//_ = _producerBus.Publish(tempMsg);
|
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
|
||||||
|
items: meterInfos,
|
||||||
|
deviceIdSelector: data => data.FocusAddress,
|
||||||
|
processor: (data, groupIndex) =>
|
||||||
|
{
|
||||||
|
_= KafkaProducerIssuedMessage(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName,data, groupIndex);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
meterTaskInfosList.Add(ammerterItem.Value);
|
} while (hasNext);
|
||||||
}
|
});
|
||||||
}
|
|
||||||
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
|
||||||
{
|
|
||||||
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
stopwatch.Stop();
|
|
||||||
|
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
|
//var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
|
||||||
|
//var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
||||||
|
//if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
|
||||||
|
//{
|
||||||
|
// _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
|
||||||
|
// return;
|
||||||
|
//}
|
||||||
|
|
||||||
|
////获取下发任务缓存数据
|
||||||
|
//Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
|
||||||
|
//if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
||||||
|
//{
|
||||||
|
// _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
|
||||||
|
// return;
|
||||||
|
//}
|
||||||
|
|
||||||
|
//List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
|
||||||
|
|
||||||
|
////将取出的缓存任务数据发送到Kafka消息队列中
|
||||||
|
//foreach (var focusItem in meterTaskInfos)
|
||||||
|
//{
|
||||||
|
// foreach (var ammerterItem in focusItem.Value)
|
||||||
|
// {
|
||||||
|
// var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
|
||||||
|
// {
|
||||||
|
// MessageHexString = ammerterItem.Value.IssuedMessageHexString,
|
||||||
|
// MessageId = ammerterItem.Value.IssuedMessageId,
|
||||||
|
// FocusAddress = ammerterItem.Value.FocusAddress,
|
||||||
|
// TimeDensity = timeDensity.ToString(),
|
||||||
|
// };
|
||||||
|
// //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
|
||||||
|
|
||||||
|
// _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
|
||||||
|
|
||||||
|
// //_ = _producerBus.Publish(tempMsg);
|
||||||
|
|
||||||
|
// meterTaskInfosList.Add(ammerterItem.Value);
|
||||||
|
// }
|
||||||
|
//}
|
||||||
|
//if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
||||||
|
//{
|
||||||
|
// await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
|
||||||
|
//}
|
||||||
|
|
||||||
|
|
||||||
|
//stopwatch.Stop();
|
||||||
|
|
||||||
|
//_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 电表采集任务指令创建
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="timeDensity">采集频率1分钟、5分钟、15分钟</param>
|
|
||||||
/// <param name="focusGroup">集中器数据分组</param>
|
|
||||||
/// <returns></returns>
|
|
||||||
private async Task AmmerterScheduledMeterReadingIssued(int timeDensity, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup)
|
|
||||||
{
|
|
||||||
if (timeDensity <= 0)
|
|
||||||
{
|
|
||||||
timeDensity = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (timeDensity > 15)
|
|
||||||
{
|
|
||||||
timeDensity = 15;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (focusGroup == null || focusGroup.Count <= 0)
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 电表数据采集指令生成失败,参数异常,-101");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
try
|
|
||||||
{
|
|
||||||
//将采集器编号的hash值取模分组
|
|
||||||
const int TotalShards = 1024;
|
|
||||||
var focusHashGroups = new Dictionary<int, Dictionary<string, Dictionary<string, AmmeterInfo>>>();
|
|
||||||
|
|
||||||
foreach (var (collectorId, ammetersDictionary) in focusGroup)
|
|
||||||
{
|
|
||||||
if (string.IsNullOrWhiteSpace(collectorId))
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 集中器信息分组取模失败,无效Key -102");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 计算哈希分组ID
|
|
||||||
int hashGroupId = Math.Abs(collectorId.GetHashCode() % TotalShards);
|
|
||||||
|
|
||||||
// 获取或创建分组(避免重复查找)
|
|
||||||
if (!focusHashGroups.TryGetValue(hashGroupId, out var group))
|
|
||||||
{
|
|
||||||
group = new Dictionary<string, Dictionary<string, AmmeterInfo>>();
|
|
||||||
focusHashGroups[hashGroupId] = group;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 将当前集中器数据加入分组
|
|
||||||
group[collectorId] = ammetersDictionary;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (focusHashGroups == null)
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 集中器信息分组取模失败 -103");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
//根据分组创建线程批处理集中器
|
|
||||||
foreach (var group in focusHashGroups)
|
|
||||||
{
|
|
||||||
await AmmerterCreatePublishTask2(timeDensity, group.Value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (Exception)
|
|
||||||
{
|
|
||||||
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -711,7 +690,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// <param name="groupIndex">集中器所在分组</param>
|
/// <param name="groupIndex">集中器所在分组</param>
|
||||||
/// <param name="taskBatch">时间格式的任务批次名称</param>
|
/// <param name="taskBatch">时间格式的任务批次名称</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private async Task AmmerterCreatePublishTask(int timeDensity
|
private void AmmerterCreatePublishTask(int timeDensity
|
||||||
, AmmeterInfo ammeterInfo, int groupIndex, string taskBatch)
|
, AmmeterInfo ammeterInfo, int groupIndex, string taskBatch)
|
||||||
{
|
{
|
||||||
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
|
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
|
||||||
@ -719,7 +698,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
|
|
||||||
var currentTime = DateTime.Now;
|
var currentTime = DateTime.Now;
|
||||||
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
|
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
|
||||||
//构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型
|
|
||||||
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
||||||
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
||||||
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
||||||
@ -933,15 +912,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="topicName">主题名称</param>
|
/// <param name="topicName">主题名称</param>
|
||||||
/// <param name="taskRecord">任务记录</param>
|
/// <param name="taskRecord">任务记录</param>
|
||||||
|
/// <param name="partition">对应分区,也就是集中器号所在的分组序号</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private async Task KafkaProducerIssuedMessage(string topicName,
|
private async Task KafkaProducerIssuedMessage(string topicName,
|
||||||
MeterReadingRecords taskRecord)
|
MeterReadingTelemetryPacketInfo taskRecord,int partition)
|
||||||
{
|
{
|
||||||
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
|
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
|
||||||
{
|
{
|
||||||
throw new Exception($"{nameof(KafkaProducerIssuedMessage)} 推送消息失败,参数异常,-101");
|
throw new Exception($"{nameof(KafkaProducerIssuedMessage)} 推送消息失败,参数异常,-101");
|
||||||
}
|
}
|
||||||
int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress);
|
|
||||||
|
|
||||||
await _producerService.ProduceAsync(topicName, partition, taskRecord);
|
await _producerService.ProduceAsync(topicName, partition, taskRecord);
|
||||||
}
|
}
|
||||||
@ -998,191 +977,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 电表创建发布任务
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="timeDensity">采集频率</param>
|
|
||||||
/// <param name="focusGroup">集中器号hash分组的集中器集合数据</param>
|
|
||||||
/// <returns></returns>
|
|
||||||
private async Task AmmerterCreatePublishTask2(int timeDensity
|
|
||||||
, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup)
|
|
||||||
{
|
|
||||||
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
|
|
||||||
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
|
|
||||||
|
|
||||||
var currentTime = DateTime.Now;
|
|
||||||
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
|
|
||||||
foreach (var focusInfo in focusGroup)
|
|
||||||
{
|
|
||||||
//构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型
|
|
||||||
var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{focusInfo.Key}";
|
|
||||||
|
|
||||||
foreach (var ammeterInfo in focusInfo.Value)
|
|
||||||
{
|
|
||||||
var ammeter = ammeterInfo.Value;
|
|
||||||
|
|
||||||
if (string.IsNullOrWhiteSpace(ammeter.ItemCodes))
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}数据采集指令生成失败,采集项为空,-101");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
//载波的不处理
|
|
||||||
if (ammeter.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}数据采集指令生成失败,载波不处理,-102");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ammeter.State.Equals(2))
|
|
||||||
{
|
|
||||||
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeter.Name} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}状态为禁用,不处理");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器
|
|
||||||
//if (!IsGennerateCmd(ammeter.LastTime, -1))
|
|
||||||
//{
|
|
||||||
// _logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name},采集时间:{ammeter.LastTime},已超过1天未在线,不生成指令");
|
|
||||||
// continue;
|
|
||||||
//}
|
|
||||||
|
|
||||||
if (string.IsNullOrWhiteSpace(ammeter.AreaCode))
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信区号为空");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (string.IsNullOrWhiteSpace(ammeter.Address))
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信地址为空");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (Convert.ToInt32(ammeter.Address) > 65535)
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信地址无效,确保大于65535");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (ammeter.MeteringCode <= 0 || ammeter.MeteringCode > 2033)
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},非有效测量点号({ammeter.MeteringCode})");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
List<string> tempCodes = ammeter.ItemCodes.Deserialize<List<string>>()!;
|
|
||||||
|
|
||||||
//TODO:自动上报数据只主动采集1类数据。
|
|
||||||
if (ammeter.AutomaticReport.Equals(1))
|
|
||||||
{
|
|
||||||
var tempSubCodes = new List<string>();
|
|
||||||
if (tempCodes.Contains("0C_49"))
|
|
||||||
{
|
|
||||||
tempSubCodes.Add("0C_49");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tempSubCodes.Contains("0C_149"))
|
|
||||||
{
|
|
||||||
tempSubCodes.Add("0C_149");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ammeter.ItemCodes.Contains("10_97"))
|
|
||||||
{
|
|
||||||
tempSubCodes.Add("10_97");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tempSubCodes == null || tempSubCodes.Count <= 0)
|
|
||||||
{
|
|
||||||
_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}自动上报数据主动采集1类数据时数据类型为空");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
tempCodes = tempSubCodes;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Dictionary<string, MeterReadingRecords> keyValuePairs = new Dictionary<string, MeterReadingRecords>();
|
|
||||||
|
|
||||||
foreach (var tempItem in tempCodes)
|
|
||||||
{
|
|
||||||
//排除已发送日冻结和月冻结采集项配置
|
|
||||||
if (DayFreezeCodes.Contains(tempItem))
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (MonthFreezeCodes.Contains(tempItem))
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
var itemCodeArr = tempItem.Split('_');
|
|
||||||
var aFNStr = itemCodeArr[0];
|
|
||||||
var aFN = (AFN)aFNStr.HexToDec();
|
|
||||||
var fn = int.Parse(itemCodeArr[1]);
|
|
||||||
byte[] dataInfos = null;
|
|
||||||
if (ammeter.AutomaticReport.Equals(1) && aFN == AFN.请求实时数据)
|
|
||||||
{
|
|
||||||
//实时数据
|
|
||||||
dataInfos = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(ammeter.FocusAddress, ammeter.MeteringCode, (ATypeOfDataItems)fn);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
string methonCode = $"AFN{aFNStr}_Fn_Send";
|
|
||||||
//特殊表暂不处理
|
|
||||||
if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
|
|
||||||
, out var handler))
|
|
||||||
{
|
|
||||||
dataInfos = handler(new TelemetryPacketRequest()
|
|
||||||
{
|
|
||||||
FocusAddress = ammeter.FocusAddress,
|
|
||||||
Fn = fn,
|
|
||||||
Pn = ammeter.MeteringCode
|
|
||||||
});
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}采集项{tempItem}无效编码。");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//TODO:特殊表
|
|
||||||
|
|
||||||
if (dataInfos == null || dataInfos.Length <= 0)
|
|
||||||
{
|
|
||||||
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}采集项{tempItem}未能正确获取报文。");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
var meterReadingRecords = new MeterReadingRecords()
|
|
||||||
{
|
|
||||||
ProjectID = ammeter.ProjectID,
|
|
||||||
DatabaseBusiID = ammeter.DatabaseBusiID,
|
|
||||||
PendingCopyReadTime = pendingCopyReadTime,
|
|
||||||
CreationTime = currentTime,
|
|
||||||
MeterAddress = ammeter.AmmerterAddress,
|
|
||||||
MeterId = ammeter.MeterId,
|
|
||||||
MeterType = MeterTypeEnum.Ammeter,
|
|
||||||
FocusAddress = ammeter.FocusAddress,
|
|
||||||
FocusID = ammeter.FocusId,
|
|
||||||
AFN = aFN,
|
|
||||||
Fn = fn,
|
|
||||||
ItemCode = tempItem,
|
|
||||||
TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeter.MeteringCode),
|
|
||||||
ManualOrNot = false,
|
|
||||||
Pn = ammeter.MeteringCode,
|
|
||||||
IssuedMessageId = GuidGenerator.Create().ToString(),
|
|
||||||
IssuedMessageHexString = Convert.ToHexString(dataInfos),
|
|
||||||
};
|
|
||||||
//meterReadingRecords.CreateDataId(GuidGenerator.Create());
|
|
||||||
|
|
||||||
keyValuePairs.TryAdd($"{ammeter.MeterId}_{tempItem}", meterReadingRecords);
|
|
||||||
}
|
|
||||||
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -230,7 +230,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// 自动计算最优线程数
|
/// 自动计算最优线程数
|
||||||
/// </summary>
|
/// </summary>
|
||||||
private static int CalculateOptimalThreadCount()
|
public static int CalculateOptimalThreadCount()
|
||||||
{
|
{
|
||||||
int coreCount = Environment.ProcessorCount;
|
int coreCount = Environment.ProcessorCount;
|
||||||
return Math.Min(
|
return Math.Min(
|
||||||
@ -418,6 +418,8 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
|
|||||||
{
|
{
|
||||||
Console.WriteLine($"Group {stat.GroupId}: {stat.Count} 条数据");
|
Console.WriteLine($"Group {stat.GroupId}: {stat.Count} 条数据");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Console.WriteLine($"总共: {stats.Sum(d=>d.Count)} 条数据");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user