diff --git a/protocols/JiShe.CollectBus.Protocol.T37612012/SendData/Telemetry3761PacketBuilder.cs b/protocols/JiShe.CollectBus.Protocol.T37612012/SendData/Telemetry3761PacketBuilder.cs index ab43cc8..f8bfa78 100644 --- a/protocols/JiShe.CollectBus.Protocol.T37612012/SendData/Telemetry3761PacketBuilder.cs +++ b/protocols/JiShe.CollectBus.Protocol.T37612012/SendData/Telemetry3761PacketBuilder.cs @@ -250,7 +250,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.SendData Pn = request.Pn, Fn = request.Fn }; - var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); + + var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter,request.DataUnit); return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, }; } #endregion diff --git a/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs index b7b3446..a469147 100644 --- a/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs @@ -706,6 +706,36 @@ namespace JiShe.CollectBus.Protocol.T37612012 /// public static int CalculateFn(string dt1, string dt2) => dt2.HexToDec() * 8 + (8 - dt1.HexTo4BinZero().IndexOf("1")); + #region 下行 + /// + /// 生成二类项采集项时间数据单元 + /// + /// + /// + /// + public virtual List Generate_DataUnit(DataTimeMark timeMark) + { + List values = new List + { + SplitDataTime(timeMark.DataTime)//数据时间 + }; + if (timeMark.Density > 0) + values.Add(timeMark.Density.HexToDecStr().PadLeft(2, '0'));//密度 + if (timeMark.Point > 0) + values.Add(timeMark.Point.HexToDecStr().PadLeft(2, '0'));//数据点数 + return values; + } + + + private string SplitDataTime(DateTime dataTime) + { + //2101060815 + List values = new List() { $"{dataTime}:YY", $"{dataTime}:MM", $"{dataTime}:dd", $"{dataTime}:HH", $"{dataTime}:mm", }; + + values.Reverse(); + return string.Join("", values); + } + #endregion #region 上行命令 diff --git a/protocols/JiShe.CollectBus.Protocol.T6452007/T6452007ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.T6452007/T6452007ProtocolPlugin.cs index 4dc789e..a4f820b 100644 --- a/protocols/JiShe.CollectBus.Protocol.T6452007/T6452007ProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol.T6452007/T6452007ProtocolPlugin.cs @@ -113,6 +113,11 @@ namespace JiShe.CollectBus.Protocol.T6452007 } } + if (aFNStr == "0D")//二类数据 + { + dataUnit = Generate_DataUnit(request.DataTimeMark); + } + string afnMethonCode = $"AFN{aFNStr}_Fn_Send"; if (base.T3761AFNHandlers != null && base.T3761AFNHandlers.TryGetValue(afnMethonCode , out var handler)) diff --git a/protocols/JiShe.CollectBus.Protocol/Models/DataTimeMark.cs b/protocols/JiShe.CollectBus.Protocol/Models/DataTimeMark.cs new file mode 100644 index 0000000..2efbaad --- /dev/null +++ b/protocols/JiShe.CollectBus.Protocol/Models/DataTimeMark.cs @@ -0,0 +1,29 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Protocol +{ + /// + /// 数据时间标记 + /// + public class DataTimeMark + { + /// + /// 数据时间 + /// + public DateTime DataTime { get; set;} + + /// + /// 数据点数 + /// + public int Point { get; set; } + + /// + /// 冻结密度(-1、采集项本身无密度位,0、无,1、15分钟,2、30分钟,3、60分钟,245、5分钟,255、1分钟) + /// + public int Density { get; set; } + } +} diff --git a/protocols/JiShe.CollectBus.Protocol/Models/ProtocolBuildRequest.cs b/protocols/JiShe.CollectBus.Protocol/Models/ProtocolBuildRequest.cs index 1e0136a..e287547 100644 --- a/protocols/JiShe.CollectBus.Protocol/Models/ProtocolBuildRequest.cs +++ b/protocols/JiShe.CollectBus.Protocol/Models/ProtocolBuildRequest.cs @@ -1,4 +1,6 @@ -namespace JiShe.CollectBus.Protocol.Models +using JiShe.CollectBus.Common.BuildSendDatas; + +namespace JiShe.CollectBus.Protocol.Models { /// /// 报文构建参数 @@ -21,9 +23,9 @@ public required string ItemCode { get; set; } /// - /// 任务时间戳 + /// 任务时间 /// - public long TimeStamp { get; set; } + public DataTimeMark DataTimeMark { get; set; } /// /// 集中器转发协议构建构建参数 diff --git a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs index c335597..757af18 100644 --- a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs +++ b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs @@ -72,51 +72,77 @@ namespace JiShe.CollectBus.DataChannels /// public async Task ScheduledMeterTaskReadding(ChannelReader>> _telemetryPacketInfoReader) { - var metadata = await _dbProvider.GetMetadata(); - var batchSize = 10000; - var timeout = TimeSpan.FromSeconds(5); // 默认超时时间为5秒 - - List taskInfoList = new List(); - var startTime = DateTime.Now; - var timer = new Stopwatch(); - while (true) + try { - var canRead = await _telemetryPacketInfoReader.WaitToReadAsync(); - if (!canRead) - { - continue; - } + var metadata = await _dbProvider.GetMetadata(); + var batchSize = 200_00; + var timeout = TimeSpan.FromSeconds(5); // 默认超时时间为5秒 + var timer = Stopwatch.StartNew(); + long timeoutMilliseconds = 0; - while (taskInfoList != null && taskInfoList.Count < batchSize && (DateTime.Now - startTime) < timeout) + List taskInfoList = new List(); + while (true) { - if (_telemetryPacketInfoReader.TryRead(out var dataItem)) + var canRead = _telemetryPacketInfoReader.Count; + if (canRead <= 0) { - taskInfoList.AddRange(dataItem.Item2); - } - else - { - //无消息时短暂等待 - await Task.Delay(5); - } - } - - if (taskInfoList != null && taskInfoList.Count > 0) - { - await _dbProvider.BatchInsertAsync(metadata, taskInfoList); - - await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - items: taskInfoList.ToList(), - deviceIdSelector: data => data.DeviceId, - processor: (data, groupIndex) => + if (timeoutMilliseconds > 0) { - // _ = KafkaProducerIssuedMessageAction(dateItem.Item1, data, groupIndex); + _logger.LogError($"{nameof(ScheduledMeterTaskReadding)} 通道处理数据耗时{timeoutMilliseconds}毫秒"); } - ); + timeoutMilliseconds = 0; + //无消息时短等待1秒 + await Task.Delay(100_0); + continue; + } + timer.Restart(); - taskInfoList.Clear(); + var startTime = DateTime.Now; + string topicName = string.Empty; + + while (taskInfoList != null && taskInfoList.Count < batchSize && (DateTime.Now - startTime) < timeout) + { + try + { + if (_telemetryPacketInfoReader.TryRead(out var dataItem)) + { + topicName = dataItem.Item1; + taskInfoList.AddRange(dataItem.Item2); + } + } + catch (Exception ee) + { + + throw; + } + } + + if (taskInfoList != null && taskInfoList.Count > 0) + { + await _dbProvider.BatchInsertAsync(metadata, taskInfoList); + + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + items: taskInfoList.ToList(), + deviceIdSelector: data => data.DeviceId, + processor: (data, groupIndex) => + { + //_ = KafkaProducerIssuedMessageAction(dateItem.Item1, data, groupIndex); + } + ); + + taskInfoList.Clear(); + } + timer.Stop(); + + timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds; + + startTime = DateTime.Now; } - - startTime = DateTime.Now; + } + catch (Exception ex) + { + + throw; } } diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 6d5e01a..e707f8a 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -32,6 +32,7 @@ using static IdentityModel.ClaimComparer; using JiShe.CollectBus.DataChannels; using JiShe.CollectBus.DataMigration.Options; using static System.Runtime.InteropServices.JavaScript.JSType; +using static System.Formats.Asn1.AsnWriter; namespace JiShe.CollectBus.ScheduledMeterReading { @@ -47,7 +48,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading private readonly IProtocolService _protocolService; private readonly DataMigrationOptions _dataMigrationOptions; private readonly KafkaOptionConfig _kafkaOptions; - private readonly ServerApplicationOptions _applicationOptions; + private readonly ServerApplicationOptions _applicationOptions; int pageSize = 10000; @@ -69,7 +70,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _dataMigrationOptions = dataMigrationOptions.Value; _kafkaOptions = kafkaOptions.Value; - _applicationOptions = applicationOptions.Value; + _applicationOptions = applicationOptions.Value; } @@ -116,7 +117,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading return; } var currentTime = DateTime.Now; - + //定时抄读 foreach (var item in taskInfos) @@ -169,7 +170,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading meterType: MeterTypeEnum.Ammeter, taskCreateAction: async (timeDensity, data, groupIndex, timestamps) => { - var tempTask = await ConcentratorScheduledAutomaticGetTerminalVersion(timeDensity, data, groupIndex, timestamps); + var tempTask = await ConcentratorScheduledAutomaticGetTerminalVersion(timeDensity, data, groupIndex, timestamps); if (tempTask == null || tempTask.Count <= 0) { _logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}"); @@ -257,7 +258,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading var tempTask = await AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps); if (tempTask == null || tempTask.Count <= 0) { - _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}"); + //_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}"); return; } _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, tempTask)); @@ -298,14 +299,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //电表定时阀控任务处理。 - var autoValveControlTask = await AmmeterScheduledAutoValveControl(); + var autoValveControlTask = await AmmeterScheduledAutoValveControl(); if (autoValveControlTask == null || autoValveControlTask.Count <= 0) { _logger.LogWarning($"{nameof(AmmeterScheduledAutoValveControl)}电表定时阀控没有可操作的任务"); return; } - _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask)); + _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask)); } #region 电表采集处理 @@ -332,9 +333,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading //此处代码不要删除 #if DEBUG var timeDensity = "15"; - var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; + var serverTagName = "JiSheCollectBus2"; + var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}"; List meterInfos = new List(); List focusAddressDataLista = new List(); @@ -734,6 +736,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammeterInfo.FocusAddress, Pn = ammeterInfo.MeteringCode, ItemCode = tempItem, + DataTimeMark = new Protocol.DataTimeMark() + { + Density = ammeterInfo.TimeDensity,//todo 转换成协议的值 + Point = 1, + DataTime = timestamps, + } }); if (builderResponse == null || builderResponse.Data.Length <= 0) { @@ -1499,8 +1507,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading List meterInfos = new List(); decimal? cursor = null; string member = null; - bool hasNext; - do + + while (true) { var page = await _redisDataCacheService.GetAllPagedData( redisCacheMeterInfoHashKeyTemp, @@ -1510,11 +1518,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading lastMember: member); meterInfos.AddRange(page.Items); - cursor = page.HasNext ? page.NextScore : null; - member = page.HasNext ? page.NextMember : null; - hasNext = page.HasNext; - } while (hasNext); + if (!page.HasNext) + { + break; + } + cursor = page.NextScore; + member = page.NextMember; + } //var page = await _redisDataCacheService.GetAllPagedData( // redisCacheMeterInfoHashKeyTemp, @@ -1530,8 +1541,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); return; } + timer.Stop(); + _logger.LogError($"{nameof(CreateMeterPublishTask)} 构建采集待下发任务,缓存获取信息共花费{timer.ElapsedMilliseconds}毫秒"); + timer.Restart(); + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( items: meterInfos, deviceIdSelector: data => data.MeterId.ToString(), diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 2a96f64..d652722 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -166,7 +166,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading INNER JOIN TB_AmmeterInfo(nolock) AS C ON B.AmmeterID=C.ID INNER JOIN TB_FocusInfo(nolock) AS D ON D.ID=C.FocusID INNER JOIN TB_GatherInfo(NOLOCK) AS E ON E.ID=D.GatherInfoID - WHERE A.IsForbid=0 and A.State<>-1 and E.GatherCode LIKE '%V4%' and A.TripTime"; + WHERE A.IsForbid=0 and A.State<>-1 and E.GatherCode LIKE '%V4%' "; if (!string.IsNullOrWhiteSpace(currentTime)) { diff --git a/shared/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs b/shared/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs index 4149afd..470efe7 100644 --- a/shared/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs +++ b/shared/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs @@ -204,6 +204,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl int actualThreads = maxConcurrency ?? recommendedThreads; + // 创建节流器 using var throttler = new SemaphoreSlim(initialCount: actualThreads); diff --git a/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs b/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs index 6936ad3..d7e12e3 100644 --- a/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs +++ b/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs @@ -245,7 +245,7 @@ namespace JiShe.CollectBus.Common.Extensions /// /// /// - public static DateTime ParseIntToDate(long dateLong) + public static DateTime ParseIntToDate(this long dateLong) { if (dateLong < 10000101 || dateLong > 99991231) {