实现IoTDB平均每分钟写入120万数据。
This commit is contained in:
parent
5ba1325204
commit
ede7c8f1bb
@ -71,32 +71,47 @@ namespace JiShe.CollectBus.DataChannels
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public async Task ScheduledMeterTaskReadding(ChannelReader<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoReader)
|
||||
{
|
||||
try
|
||||
{
|
||||
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
|
||||
var batchSize = 10000;
|
||||
var batchSize = 200_00;
|
||||
var timeout = TimeSpan.FromSeconds(5); // 默认超时时间为5秒
|
||||
var timer = Stopwatch.StartNew();
|
||||
long timeoutMilliseconds = 0;
|
||||
|
||||
List<MeterReadingTelemetryPacketInfo> taskInfoList = new List<MeterReadingTelemetryPacketInfo>();
|
||||
var startTime = DateTime.Now;
|
||||
var timer = new Stopwatch();
|
||||
while (true)
|
||||
{
|
||||
var canRead = await _telemetryPacketInfoReader.WaitToReadAsync();
|
||||
if (!canRead)
|
||||
var canRead = _telemetryPacketInfoReader.Count;
|
||||
if (canRead <= 0)
|
||||
{
|
||||
if (timeoutMilliseconds > 0)
|
||||
{
|
||||
_logger.LogError($"{nameof(ScheduledMeterTaskReadding)} 通道处理数据耗时{timeoutMilliseconds}毫秒");
|
||||
}
|
||||
timeoutMilliseconds = 0;
|
||||
//无消息时短等待1秒
|
||||
await Task.Delay(100_0);
|
||||
continue;
|
||||
}
|
||||
timer.Restart();
|
||||
|
||||
var startTime = DateTime.Now;
|
||||
|
||||
while (taskInfoList != null && taskInfoList.Count < batchSize && (DateTime.Now - startTime) < timeout)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (_telemetryPacketInfoReader.TryRead(out var dataItem))
|
||||
{
|
||||
taskInfoList.AddRange(dataItem.Item2);
|
||||
}
|
||||
else
|
||||
}
|
||||
catch (Exception ee)
|
||||
{
|
||||
//无消息时短暂等待
|
||||
await Task.Delay(5);
|
||||
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
@ -115,10 +130,19 @@ namespace JiShe.CollectBus.DataChannels
|
||||
|
||||
taskInfoList.Clear();
|
||||
}
|
||||
timer.Stop();
|
||||
|
||||
timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds;
|
||||
|
||||
startTime = DateTime.Now;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Kafka 推送消息
|
||||
|
||||
@ -32,6 +32,7 @@ using static IdentityModel.ClaimComparer;
|
||||
using JiShe.CollectBus.DataChannels;
|
||||
using JiShe.CollectBus.DataMigration.Options;
|
||||
using static System.Runtime.InteropServices.JavaScript.JSType;
|
||||
using static System.Formats.Asn1.AsnWriter;
|
||||
|
||||
namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
{
|
||||
@ -257,7 +258,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
var tempTask = await AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps);
|
||||
if (tempTask == null || tempTask.Count <= 0)
|
||||
{
|
||||
_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
|
||||
//_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
|
||||
return;
|
||||
}
|
||||
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, tempTask));
|
||||
@ -332,9 +333,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
//此处代码不要删除
|
||||
#if DEBUG
|
||||
var timeDensity = "15";
|
||||
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
|
||||
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
|
||||
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
|
||||
var serverTagName = "JiSheCollectBus2";
|
||||
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}";
|
||||
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}";
|
||||
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}";
|
||||
|
||||
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
|
||||
List<string> focusAddressDataLista = new List<string>();
|
||||
@ -1499,8 +1501,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
List<T> meterInfos = new List<T>();
|
||||
decimal? cursor = null;
|
||||
string member = null;
|
||||
bool hasNext;
|
||||
do
|
||||
|
||||
while (true)
|
||||
{
|
||||
var page = await _redisDataCacheService.GetAllPagedData<T>(
|
||||
redisCacheMeterInfoHashKeyTemp,
|
||||
@ -1510,10 +1512,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
lastMember: member);
|
||||
|
||||
meterInfos.AddRange(page.Items);
|
||||
cursor = page.HasNext ? page.NextScore : null;
|
||||
member = page.HasNext ? page.NextMember : null;
|
||||
hasNext = page.HasNext;
|
||||
} while (hasNext);
|
||||
if (!page.HasNext)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
cursor = page.NextScore;
|
||||
member = page.NextMember;
|
||||
}
|
||||
|
||||
|
||||
//var page = await _redisDataCacheService.GetAllPagedData<T>(
|
||||
@ -1530,8 +1536,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
_logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
|
||||
return;
|
||||
}
|
||||
timer.Stop();
|
||||
|
||||
_logger.LogError($"{nameof(CreateMeterPublishTask)} 构建采集待下发任务,缓存获取信息共花费{timer.ElapsedMilliseconds}毫秒");
|
||||
|
||||
timer.Restart();
|
||||
|
||||
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
|
||||
items: meterInfos,
|
||||
deviceIdSelector: data => data.MeterId.ToString(),
|
||||
|
||||
@ -166,7 +166,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
INNER JOIN TB_AmmeterInfo(nolock) AS C ON B.AmmeterID=C.ID
|
||||
INNER JOIN TB_FocusInfo(nolock) AS D ON D.ID=C.FocusID
|
||||
INNER JOIN TB_GatherInfo(NOLOCK) AS E ON E.ID=D.GatherInfoID
|
||||
WHERE A.IsForbid=0 and A.State<>-1 and E.GatherCode LIKE '%V4%' and A.TripTime";
|
||||
WHERE A.IsForbid=0 and A.State<>-1 and E.GatherCode LIKE '%V4%' ";
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(currentTime))
|
||||
{
|
||||
|
||||
@ -204,6 +204,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
|
||||
|
||||
int actualThreads = maxConcurrency ?? recommendedThreads;
|
||||
|
||||
|
||||
// 创建节流器
|
||||
using var throttler = new SemaphoreSlim(initialCount: actualThreads);
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user