diff --git a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index e5077cc..f965978 100644 --- a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -64,7 +64,7 @@ public class CollectBusApplicationModule : AbpModule //默认初始化表计信息 var dbContext = context.ServiceProvider.GetRequiredService(); - //await dbContext.InitAmmeterCacheData(); + await dbContext.InitAmmeterCacheData(); //await dbContext.InitWatermeterCacheData(); //初始化主题信息 diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 18f6780..a760936 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -8,6 +8,7 @@ using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.GatherItem; +using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; @@ -25,7 +26,6 @@ using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; -using JiShe.CollectBus.IoTDB.Interface; using static FreeSql.Internal.GlobalFilter; namespace JiShe.CollectBus.ScheduledMeterReading @@ -182,7 +182,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading deviceIdSelector: data => data.FocusAddress, processor: (data, groupIndex) => { - AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss")); + AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss")); } ); @@ -306,8 +306,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading timer1.Stop(); _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); - DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); - return; + //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); + //return; #else var meterInfos = await GetAmmeterInfoList(gatherCode); #endif @@ -575,8 +575,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task AmmeterScheduledMeterFifteenMinuteReading() { - Stopwatch stopwatch = new Stopwatch(); - stopwatch.Start(); + var stopwatch = Stopwatch.StartNew(); //获取缓存中的电表信息 int timeDensity = 15; @@ -589,97 +588,23 @@ namespace JiShe.CollectBus.ScheduledMeterReading { MaxDegreeOfParallelism = recommendedThreads, }; - string taskBatch = "20250417155016"; + string taskBatch = "20250417171649"; Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => { Console.WriteLine($"15分钟采集电表数据:{groupIndex}"); var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - List meterInfos = new List(); - decimal? cursor = null; - string member = null; - bool hasNext; - do - { - var page = await _redisDataCacheService.GetAllPagedData( - redisCacheTelemetryPacketInfoHashKey, - redisCacheTelemetryPacketInfoZSetScoresIndexKey, - pageSize: 1000, - lastScore: cursor, - lastMember: member); - - meterInfos.AddRange(page.Items); - cursor = page.HasNext ? page.NextScore : null; - member = page.HasNext ? page.NextMember : null; - hasNext = page.HasNext; - - await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - items: meterInfos, - deviceIdSelector: data => data.FocusAddress, - processor: (data, groupIndex) => - { - _= KafkaProducerIssuedMessage(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName,data, groupIndex); - } - ); - - } while (hasNext); + _ = GetTaskInfoListToKafka(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); }); - + stopwatch.Stop(); - //var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter); - //var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - //if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0) - //{ - // _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101"); - // return; - //} - - ////获取下发任务缓存数据 - //Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter); - //if (meterTaskInfos == null || meterTaskInfos.Count <= 0) - //{ - // _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); - // return; - //} - - //List meterTaskInfosList = new List(); - - ////将取出的缓存任务数据发送到Kafka消息队列中 - //foreach (var focusItem in meterTaskInfos) - //{ - // foreach (var ammerterItem in focusItem.Value) - // { - // var tempMsg = new ScheduledMeterReadingIssuedEventMessage() - // { - // MessageHexString = ammerterItem.Value.IssuedMessageHexString, - // MessageId = ammerterItem.Value.IssuedMessageId, - // FocusAddress = ammerterItem.Value.FocusAddress, - // TimeDensity = timeDensity.ToString(), - // }; - // //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); - - // _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); - - // //_ = _producerBus.Publish(tempMsg); - - // meterTaskInfosList.Add(ammerterItem.Value); - // } - //} - //if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) - //{ - // await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime); - //} - - - //stopwatch.Stop(); - - //_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); + _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); + await Task.CompletedTask; } - + /// /// 电表创建发布任务 @@ -697,7 +622,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading var currentTime = DateTime.Now; var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); - + var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; @@ -781,7 +706,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading } } - //Dictionary keyValuePairs = new Dictionary(); List taskList = new List(); foreach (var tempItem in tempCodes) @@ -801,11 +725,16 @@ namespace JiShe.CollectBus.ScheduledMeterReading var aFNStr = itemCodeArr[0]; var aFN = (AFN)aFNStr.HexToDec(); var fn = int.Parse(itemCodeArr[1]); - byte[] dataInfos = null; + TelemetryPacketResponse builderResponse = null; if (ammeterInfo.AutomaticReport.Equals(1) && aFN == AFN.请求实时数据) { //实时数据 - dataInfos = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(ammeterInfo.FocusAddress, ammeterInfo.MeteringCode, (ATypeOfDataItems)fn); + builderResponse = TelemetryPacketBuilder.AFN0C_Fn_Send(new TelemetryPacketRequest() + { + FocusAddress = ammeterInfo.FocusAddress, + Fn = fn, + Pn = ammeterInfo.MeteringCode + }); } else { @@ -814,7 +743,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode , out var handler)) { - dataInfos = handler(new TelemetryPacketRequest() + builderResponse = handler(new TelemetryPacketRequest() { FocusAddress = ammeterInfo.FocusAddress, Fn = fn, @@ -829,7 +758,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //TODO:特殊表 - if (dataInfos == null || dataInfos.Length <= 0) + if (builderResponse == null || builderResponse.Data.Length <= 0) { //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。"); continue; @@ -850,29 +779,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusId = ammeterInfo.FocusId, AFN = aFN, Fn = fn, + Seq = builderResponse.Seq, + MSA = builderResponse.MSA, ItemCode = tempItem, - TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode), + TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA), + IsSend = false, ManualOrNot = false, Pn = ammeterInfo.MeteringCode, IssuedMessageId = GuidGenerator.Create().ToString(), - IssuedMessageHexString = Convert.ToHexString(dataInfos), + IssuedMessageHexString = Convert.ToHexString(builderResponse.Data), + IsReceived = false, }; - //meterReadingRecords.CreateDataId(GuidGenerator.Create()); - taskList.Add(meterReadingRecords); } - //TimeSpan timeSpan = TimeSpan.FromMicroseconds(5); - //await Task.Delay(timeSpan); - - //return keyValuePairs; - // await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); - - //using (var pipe = FreeRedisProvider.Instance.StartPipe()) - //{ - // pipe.HSet(redisCacheKey, keyValuePairs); - // object[] ret = pipe.EndPipe(); - //} + if (taskList == null || taskList.Count() <= 0 || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) @@ -914,7 +835,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 对应分区,也就是集中器号所在的分组序号 /// private async Task KafkaProducerIssuedMessage(string topicName, - MeterReadingTelemetryPacketInfo taskRecord,int partition) + MeterReadingTelemetryPacketInfo taskRecord, int partition) { if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null) { @@ -924,58 +845,38 @@ namespace JiShe.CollectBus.ScheduledMeterReading await _producerService.ProduceAsync(topicName, partition, taskRecord); } - private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType) + private async Task GetTaskInfoListToKafka( + string redisCacheTelemetryPacketInfoHashKey, + string redisCacheTelemetryPacketInfoZSetScoresIndexKey) { - var currentDateTime = DateTime.Now; - - var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, meterType); - - //FreeRedisProvider.Instance.key() - - var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0) + decimal? cursor = null; + string member = null; + bool hasNext; + do { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101"); - return; - } + var page = await _redisDataCacheService.GetAllPagedData( + redisCacheTelemetryPacketInfoHashKey, + redisCacheTelemetryPacketInfoZSetScoresIndexKey, + pageSize: 1000, + lastScore: cursor, + lastMember: member); - //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), meterType); - if (meterTaskInfos == null || meterTaskInfos.Count <= 0) - { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); - return; - } + cursor = page.HasNext ? page.NextScore : null; + member = page.HasNext ? page.NextMember : null; + hasNext = page.HasNext; - List meterTaskInfosList = new List(); - - //将取出的缓存任务数据发送到Kafka消息队列中 - foreach (var focusItem in meterTaskInfos) - { - foreach (var ammerterItem in focusItem.Value) - { - var tempMsg = new ScheduledMeterReadingIssuedEventMessage() + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + items: page.Items, + deviceIdSelector: data => data.FocusAddress, + processor: (data, groupIndex) => { - MessageHexString = ammerterItem.Value.IssuedMessageHexString, - MessageId = ammerterItem.Value.IssuedMessageId, - FocusAddress = ammerterItem.Value.FocusAddress, - TimeDensity = timeDensity.ToString(), - }; - //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); + _ = KafkaProducerIssuedMessage(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex); + } + ); - _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); - - //_ = _producerBus.Publish(tempMsg); - - meterTaskInfosList.Add(ammerterItem.Value); - } - } - if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) - { - await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime); - } + } while (hasNext); } - + #endregion diff --git a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs index c3f75d3..3aafa41 100644 --- a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs +++ b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs @@ -34,7 +34,7 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords /// /// 任务数据唯一标记 /// - public string TaskMark { get; set; } + public decimal TaskMark { get; set; } /// /// 时间戳标记,IoTDB时间列处理,上报通过构建标记获取唯一标记匹配时间戳。 @@ -96,7 +96,21 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords /// 采集项编码 /// public string ItemCode { get; set;} - + + /// + /// 帧序列域SEQ + /// + public required Seq Seq { get; set; } + + /// + /// 地址域A3的主站地址MSA + /// + public int MSA { get; set; } + + /// + /// 是否发送 + /// + public bool IsSend { get; set; } /// /// 创建时间 @@ -132,6 +146,11 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords /// 上报报文解析备注,异常情况下才有 /// public string ReceivedRemark { get; set; } + + /// + /// 是否已上报 + /// + public bool IsReceived { get; set; } //public void CreateDataId(Guid Id) //{ diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketBuilder.cs b/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketBuilder.cs index 8ad2a39..5db0dd7 100644 --- a/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketBuilder.cs +++ b/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketBuilder.cs @@ -22,7 +22,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas /// /// /// - public delegate byte[] AFNDelegate(TelemetryPacketRequest request); + public delegate TelemetryPacketResponse AFNDelegate(TelemetryPacketRequest request); /// /// 编码与方法的映射表 @@ -45,7 +45,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas } #region AFN_00H 确认∕否认 - public static byte[] AFN00_Fn_Send(TelemetryPacketRequest request) + public static TelemetryPacketResponse AFN00_Fn_Send(TelemetryPacketRequest request) { var reqParameter = new ReqParameter2() { @@ -64,13 +64,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return bytes; + return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_01H 复位命令 - public static byte[] AFN01_Fn_Send(TelemetryPacketRequest request) + public static TelemetryPacketResponse AFN01_Fn_Send(TelemetryPacketRequest request) { var reqParameter = new ReqParameter2() { @@ -89,13 +89,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return bytes; + return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_02H 链路接口检测 - public static byte[] AFN02_Fn_Send(TelemetryPacketRequest request) + public static TelemetryPacketResponse AFN02_Fn_Send(TelemetryPacketRequest request) { var reqParameter = new ReqParameter2() { @@ -114,12 +114,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return bytes; + return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_04H 设置参数 - public static byte[] AFN04_Fn_Send(TelemetryPacketRequest request) + public static TelemetryPacketResponse AFN04_Fn_Send(TelemetryPacketRequest request) { var reqParameter = new ReqParameter2() { @@ -138,13 +138,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return bytes; + return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_05H 控制命令 - public static byte[] AFN05_Fn_Send(TelemetryPacketRequest request) + public static TelemetryPacketResponse AFN05_Fn_Send(TelemetryPacketRequest request) { var reqParameter = new ReqParameter2() { @@ -163,12 +163,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return bytes; + return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_09H 请求终端配置及信息 - public static byte[] AFN09_Fn_Send(TelemetryPacketRequest request) + public static TelemetryPacketResponse AFN09_Fn_Send(TelemetryPacketRequest request) { var reqParameter = new ReqParameter2() { @@ -187,13 +187,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return bytes; + return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_0AH 查询参数 - public static byte[] AFN0A_Fn_Send(TelemetryPacketRequest request) + public static TelemetryPacketResponse AFN0A_Fn_Send(TelemetryPacketRequest request) { var reqParameter = new ReqParameter2() { @@ -212,12 +212,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return bytes; + return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_0CH 请求一类数据 - public static byte[] AFN0C_Fn_Send(TelemetryPacketRequest request) + public static TelemetryPacketResponse AFN0C_Fn_Send(TelemetryPacketRequest request) { var reqParameter = new ReqParameter2() { @@ -236,12 +236,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return bytes; + return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_0DH 请求二类数据 - public static byte[] AFN0D_Fn_Send(TelemetryPacketRequest request) + public static TelemetryPacketResponse AFN0D_Fn_Send(TelemetryPacketRequest request) { var reqParameter = new ReqParameter2() { @@ -260,12 +260,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return bytes; + return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN10H 数据转发 - public static byte[] AFN10_Fn_Send(TelemetryPacketRequest request) + public static TelemetryPacketResponse AFN10_Fn_Send(TelemetryPacketRequest request) { var reqParameter = new ReqParameter2() { @@ -283,8 +283,8 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Pn = request.Pn, Fn = request.Fn }; - var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter,request.DataUnit); - return bytes; + var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter, request.DataUnit); + return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; } #region SpecialAmmeter 特殊电表转发 diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketResponse.cs b/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketResponse.cs new file mode 100644 index 0000000..8cd964a --- /dev/null +++ b/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketResponse.cs @@ -0,0 +1,30 @@ +using JiShe.CollectBus.Common.Models; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Common.BuildSendDatas +{ + /// + /// 报文构建返回结果 + /// + public class TelemetryPacketResponse + { + /// + /// 帧序列域SEQ + /// + public required Seq Seq { get; set; } + + /// + /// 地址域A3的主站地址MSA + /// + public int MSA { get; set; } + + /// + /// 报文体 + /// + public required byte[] Data { get; set; } + } +} diff --git a/shared/JiShe.CollectBus.Common/Consts/CommonConst.cs b/shared/JiShe.CollectBus.Common/Consts/CommonConst.cs index 471c897..c3f1e9a 100644 --- a/shared/JiShe.CollectBus.Common/Consts/CommonConst.cs +++ b/shared/JiShe.CollectBus.Common/Consts/CommonConst.cs @@ -11,25 +11,31 @@ namespace JiShe.CollectBus.Common.Consts /// public class CommonConst { - /// - /// 服务器标识 - /// - public const string ServerTagName = "ServerTagName"; /// /// Kafka /// public const string Kafka = "Kafka"; + /// + /// 服务器标识 + /// + public const string ServerTagName = $"{Kafka}:ServerTagName"; + /// /// Kafka副本数量 /// - public const string KafkaReplicationFactor = "KafkaReplicationFactor"; + public const string KafkaReplicationFactor = $"{Kafka}:KafkaReplicationFactor"; /// /// Kafka主题分区数量 /// - public const string NumPartitions = "NumPartitions"; + public const string NumPartitions = $"{Kafka}:NumPartitions"; + + /// + /// 首次采集时间 + /// + public const string FirstCollectionTime = "FirstCollectionTime"; } } diff --git a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs index 22ebef5..6871a9e 100644 --- a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs +++ b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs @@ -669,7 +669,7 @@ namespace JiShe.CollectBus.Common.Helpers return att == null ? field.Name : ((DescriptionAttribute)att).Description; } - + /// /// 将传入的字符串中间部分字符替换成特殊字符 @@ -759,7 +759,7 @@ namespace JiShe.CollectBus.Common.Helpers } return fontValue; - } + } /// /// 获取任务标识 @@ -767,10 +767,13 @@ namespace JiShe.CollectBus.Common.Helpers /// /// /// + /// /// - public static string GetTaskMark(int afn,int fn,int pn) + public static decimal GetTaskMark(int afn, int fn, int pn, int msa) { - return $"{afn.ToString().PadLeft(2,'0')}{fn}{pn}"; + var makstr = $"{afn.ToString().PadLeft(2, '0')}{fn.ToString().PadLeft(2, '0')}{pn.ToString().PadLeft(2, '0')}"; + + return Convert.ToInt32(makstr) << 32 | msa; } } } diff --git a/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs b/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs index d397151..fd7e3c8 100644 --- a/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs +++ b/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs @@ -30,5 +30,10 @@ namespace JiShe.CollectBus.Common.Models /// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳 /// public virtual long ScoreValue=> ((long)FocusId << 32) | (uint)MeterId; + + /// + /// 是否已处理 + /// + public virtual bool IsHandle { get; set; } = false; } } diff --git a/web/JiShe.CollectBus.Host/appsettings.json b/web/JiShe.CollectBus.Host/appsettings.json index f776202..2da5580 100644 --- a/web/JiShe.CollectBus.Host/appsettings.json +++ b/web/JiShe.CollectBus.Host/appsettings.json @@ -84,7 +84,7 @@ "SaslPassword": "lixiao1980", "KafkaReplicationFactor": 3, "NumPartitions": 30, - "ServerTagName": "JiSheCollectBus2" + "ServerTagName": "JiSheCollectBus3" }, "IoTDBOptions": { "UserName": "root", @@ -95,7 +95,6 @@ "OpenDebugMode": true, "UseTableSessionPoolByDefault": false }, - "ServerTagName": "JiSheCollectBus3", "Cassandra": { "ReplicationStrategy": { "Class": "NetworkTopologyStrategy", //策略为NetworkTopologyStrategy时才会有多个数据中心,SimpleStrategy用在只有一个数据中心的情况下 @@ -144,5 +143,6 @@ "SerialConsistencyLevel": "Serial", "DefaultIdempotence": true } - } + }, + "FirstCollectionTime": "2025-04-18 00:00:00" } \ No newline at end of file