Compare commits

..

No commits in common. "fd9e3e5af3036f2df50b8c0fbe59c6acd300b8fa" and "318dd2dcbefa3192530998ad3184fc39914686bf" have entirely different histories.

4 changed files with 58 additions and 93 deletions

View File

@ -72,75 +72,51 @@ namespace JiShe.CollectBus.DataChannels
/// <returns></returns> /// <returns></returns>
public async Task ScheduledMeterTaskReadding(ChannelReader<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoReader) public async Task ScheduledMeterTaskReadding(ChannelReader<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoReader)
{ {
try var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
{ var batchSize = 10000;
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>(); var timeout = TimeSpan.FromSeconds(5); // 默认超时时间为5秒
var batchSize = 200_00;
var timeout = TimeSpan.FromSeconds(5); // 默认超时时间为5秒
var timer = Stopwatch.StartNew();
long timeoutMilliseconds = 0;
List<MeterReadingTelemetryPacketInfo> taskInfoList = new List<MeterReadingTelemetryPacketInfo>(); List<MeterReadingTelemetryPacketInfo> taskInfoList = new List<MeterReadingTelemetryPacketInfo>();
while (true) var startTime = DateTime.Now;
var timer = new Stopwatch();
while (true)
{
var canRead = await _telemetryPacketInfoReader.WaitToReadAsync();
if (!canRead)
{ {
var canRead = _telemetryPacketInfoReader.Count; continue;
if (canRead <= 0)
{
if (timeoutMilliseconds > 0)
{
_logger.LogError($"{nameof(ScheduledMeterTaskReadding)} 通道处理数据耗时{timeoutMilliseconds}毫秒");
}
timeoutMilliseconds = 0;
//无消息时短等待1秒
await Task.Delay(100_0);
continue;
}
timer.Restart();
var startTime = DateTime.Now;
while (taskInfoList != null && taskInfoList.Count < batchSize && (DateTime.Now - startTime) < timeout)
{
try
{
if (_telemetryPacketInfoReader.TryRead(out var dataItem))
{
taskInfoList.AddRange(dataItem.Item2);
}
}
catch (Exception ee)
{
throw;
}
}
if (taskInfoList != null && taskInfoList.Count > 0)
{
await _dbProvider.BatchInsertAsync(metadata, taskInfoList);
await DeviceGroupBalanceControl.ProcessWithThrottleAsync<MeterReadingTelemetryPacketInfo>(
items: taskInfoList.ToList(),
deviceIdSelector: data => data.DeviceId,
processor: (data, groupIndex) =>
{
// _ = KafkaProducerIssuedMessageAction(dateItem.Item1, data, groupIndex);
}
);
taskInfoList.Clear();
}
timer.Stop();
timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds;
startTime = DateTime.Now;
} }
}
catch (Exception ex)
{
throw; while (taskInfoList != null && taskInfoList.Count < batchSize && (DateTime.Now - startTime) < timeout)
{
if (_telemetryPacketInfoReader.TryRead(out var dataItem))
{
taskInfoList.AddRange(dataItem.Item2);
}
else
{
//无消息时短暂等待
await Task.Delay(5);
}
}
if (taskInfoList != null && taskInfoList.Count > 0)
{
await _dbProvider.BatchInsertAsync(metadata, taskInfoList);
await DeviceGroupBalanceControl.ProcessWithThrottleAsync<MeterReadingTelemetryPacketInfo>(
items: taskInfoList.ToList(),
deviceIdSelector: data => data.DeviceId,
processor: (data, groupIndex) =>
{
// _ = KafkaProducerIssuedMessageAction(dateItem.Item1, data, groupIndex);
}
);
taskInfoList.Clear();
}
startTime = DateTime.Now;
} }
} }

View File

@ -32,7 +32,6 @@ using static IdentityModel.ClaimComparer;
using JiShe.CollectBus.DataChannels; using JiShe.CollectBus.DataChannels;
using JiShe.CollectBus.DataMigration.Options; using JiShe.CollectBus.DataMigration.Options;
using static System.Runtime.InteropServices.JavaScript.JSType; using static System.Runtime.InteropServices.JavaScript.JSType;
using static System.Formats.Asn1.AsnWriter;
namespace JiShe.CollectBus.ScheduledMeterReading namespace JiShe.CollectBus.ScheduledMeterReading
{ {
@ -48,7 +47,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
private readonly IProtocolService _protocolService; private readonly IProtocolService _protocolService;
private readonly DataMigrationOptions _dataMigrationOptions; private readonly DataMigrationOptions _dataMigrationOptions;
private readonly KafkaOptionConfig _kafkaOptions; private readonly KafkaOptionConfig _kafkaOptions;
private readonly ServerApplicationOptions _applicationOptions; private readonly ServerApplicationOptions _applicationOptions;
int pageSize = 10000; int pageSize = 10000;
@ -70,7 +69,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_dataMigrationOptions = dataMigrationOptions.Value; _dataMigrationOptions = dataMigrationOptions.Value;
_kafkaOptions = kafkaOptions.Value; _kafkaOptions = kafkaOptions.Value;
_applicationOptions = applicationOptions.Value; _applicationOptions = applicationOptions.Value;
} }
@ -117,7 +116,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return; return;
} }
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
//定时抄读 //定时抄读
foreach (var item in taskInfos) foreach (var item in taskInfos)
@ -170,7 +169,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
meterType: MeterTypeEnum.Ammeter, meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) => taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{ {
var tempTask = await ConcentratorScheduledAutomaticGetTerminalVersion(timeDensity, data, groupIndex, timestamps); var tempTask = await ConcentratorScheduledAutomaticGetTerminalVersion(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0) if (tempTask == null || tempTask.Count <= 0)
{ {
_logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}"); _logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
@ -258,7 +257,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var tempTask = await AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps); var tempTask = await AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0) if (tempTask == null || tempTask.Count <= 0)
{ {
//_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}"); _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return; return;
} }
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, tempTask)); _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, tempTask));
@ -299,14 +298,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
//电表定时阀控任务处理。 //电表定时阀控任务处理。
var autoValveControlTask = await AmmeterScheduledAutoValveControl(); var autoValveControlTask = await AmmeterScheduledAutoValveControl();
if (autoValveControlTask == null || autoValveControlTask.Count <= 0) if (autoValveControlTask == null || autoValveControlTask.Count <= 0)
{ {
_logger.LogWarning($"{nameof(AmmeterScheduledAutoValveControl)}电表定时阀控没有可操作的任务"); _logger.LogWarning($"{nameof(AmmeterScheduledAutoValveControl)}电表定时阀控没有可操作的任务");
return; return;
} }
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask)); _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask));
} }
#region #region
@ -333,10 +332,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//此处代码不要删除 //此处代码不要删除
#if DEBUG #if DEBUG
var timeDensity = "15"; var timeDensity = "15";
var serverTagName = "JiSheCollectBus2"; var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}";
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>(); List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
List<string> focusAddressDataLista = new List<string>(); List<string> focusAddressDataLista = new List<string>();
@ -1501,8 +1499,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
List<T> meterInfos = new List<T>(); List<T> meterInfos = new List<T>();
decimal? cursor = null; decimal? cursor = null;
string member = null; string member = null;
bool hasNext;
while (true) do
{ {
var page = await _redisDataCacheService.GetAllPagedData<T>( var page = await _redisDataCacheService.GetAllPagedData<T>(
redisCacheMeterInfoHashKeyTemp, redisCacheMeterInfoHashKeyTemp,
@ -1512,14 +1510,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
lastMember: member); lastMember: member);
meterInfos.AddRange(page.Items); meterInfos.AddRange(page.Items);
if (!page.HasNext) cursor = page.HasNext ? page.NextScore : null;
{ member = page.HasNext ? page.NextMember : null;
break; hasNext = page.HasNext;
} } while (hasNext);
cursor = page.NextScore;
member = page.NextMember;
}
//var page = await _redisDataCacheService.GetAllPagedData<T>( //var page = await _redisDataCacheService.GetAllPagedData<T>(
@ -1536,12 +1530,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); _logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
return; return;
} }
timer.Stop();
_logger.LogError($"{nameof(CreateMeterPublishTask)} 构建采集待下发任务,缓存获取信息共花费{timer.ElapsedMilliseconds}毫秒"); _logger.LogError($"{nameof(CreateMeterPublishTask)} 构建采集待下发任务,缓存获取信息共花费{timer.ElapsedMilliseconds}毫秒");
timer.Restart();
await DeviceGroupBalanceControl.ProcessWithThrottleAsync( await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos, items: meterInfos,
deviceIdSelector: data => data.MeterId.ToString(), deviceIdSelector: data => data.MeterId.ToString(),

View File

@ -166,7 +166,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
INNER JOIN TB_AmmeterInfo(nolock) AS C ON B.AmmeterID=C.ID INNER JOIN TB_AmmeterInfo(nolock) AS C ON B.AmmeterID=C.ID
INNER JOIN TB_FocusInfo(nolock) AS D ON D.ID=C.FocusID INNER JOIN TB_FocusInfo(nolock) AS D ON D.ID=C.FocusID
INNER JOIN TB_GatherInfo(NOLOCK) AS E ON E.ID=D.GatherInfoID INNER JOIN TB_GatherInfo(NOLOCK) AS E ON E.ID=D.GatherInfoID
WHERE A.IsForbid=0 and A.State<>-1 and E.GatherCode LIKE '%V4%' "; WHERE A.IsForbid=0 and A.State<>-1 and E.GatherCode LIKE '%V4%' and A.TripTime";
if (!string.IsNullOrWhiteSpace(currentTime)) if (!string.IsNullOrWhiteSpace(currentTime))
{ {

View File

@ -204,7 +204,6 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
int actualThreads = maxConcurrency ?? recommendedThreads; int actualThreads = maxConcurrency ?? recommendedThreads;
// 创建节流器 // 创建节流器
using var throttler = new SemaphoreSlim(initialCount: actualThreads); using var throttler = new SemaphoreSlim(initialCount: actualThreads);