diff --git a/services/JiShe.CollectBus.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs b/services/JiShe.CollectBus.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs index 22d6820..d753ddc 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs @@ -1,13 +1,21 @@ -using JiShe.CollectBus.Common.BuildSendDatas; +using FreeSql.Internal.CommonProvider; +using JiShe.CollectBus.Common.BuildSendDatas; +using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Models; +using JiShe.CollectBus.IotSystems.Ammeters; using JiShe.CollectBus.Protocol.Interfaces; using JiShe.CollectBus.Samples; +using JiShe.ServicePro.Core; using JiShe.ServicePro.Dto; +using JiShe.ServicePro.Encrypt; using JiShe.ServicePro.FreeRedisProvider; +using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using System; using System.Collections.Generic; using System.Linq; +using System.Net; using System.Text; using System.Threading.Tasks; using TouchSocket.Sockets; @@ -22,13 +30,15 @@ namespace JiShe.CollectBus.Subscribers private readonly IoTDBSessionPoolProvider _dbProvider; private readonly IProtocolService _protocolService; private readonly IFreeRedisProvider _freeRedisProvider; + private readonly ServerApplicationOptions _applicationOptions; public ServiceCommunicationChannelSubscriberService(ILogger logger, ITcpService tcpService, IServiceProvider serviceProvider, IoTDBSessionPoolProvider dbProvider, IProtocolService protocolService, - IFreeRedisProvider freeRedisProvider) + IFreeRedisProvider freeRedisProvider, + IOptions applicationOptions) { _logger = logger; _tcpService = tcpService; @@ -36,6 +46,7 @@ namespace JiShe.CollectBus.Subscribers _dbProvider = dbProvider; _protocolService = protocolService; _freeRedisProvider = freeRedisProvider; + _applicationOptions = applicationOptions.Value; } @@ -69,10 +80,11 @@ namespace JiShe.CollectBus.Subscribers switch (input.MeterType) { case MeterTypeEnum.Ammeter: - + await SendArchivalDataIssued(input); break; case MeterTypeEnum.WaterMeter: + await SendArchivalDataIssued(input); break; } } @@ -95,22 +107,55 @@ namespace JiShe.CollectBus.Subscribers return tempResult == true ? SubscribeAck.Success() : SubscribeAck.Fail(); } - protected async Task SendArchivalDataIssued(FocusCacheInfos focusCacheInfo) + /// + /// 发送档案数据 + /// + /// + /// + protected async Task SendArchivalDataIssued(ArchivalDataIssuedInput input) { try { - var checkResult = _tcpService.ClientExists(focusCacheInfo.FocusAddress); - if (checkResult) + var key = $"{RedisConst.CacheAllDeviceInfoHashKey}"; + List deviceCacheInfos = await _freeRedisProvider.Instance.HGetAsync>(key, input.FocusAddress); + if (deviceCacheInfos == null || deviceCacheInfos.Count <= 0) { - string issuedMessageHexString = ""; - await _tcpService.SendAsync(focusCacheInfo.FocusAddress, Convert.FromHexString(issuedMessageHexString)); + throw new Exception($"未能在缓存中找到设备{input.FocusAddress},缓存key:{key}"); + } + DeviceCacheInfo? deviceCacheInfo = deviceCacheInfos.FirstOrDefault(d => d.MeterAddress == input.MeterAddress); + if (deviceCacheInfo == null) + { + throw new Exception($"未能在缓存中找到设备{input.FocusAddress}下的表{input.MeterAddress},缓存key:{key}"); + } - return true; - } - else + var meterParameters = new List() { + new AmmeterParameter() + { + SerialNumber =deviceCacheInfo.MeteringCode, + Pn =deviceCacheInfo.MeteringCode, + BaudRate = Build3761SendData.GetBaudreate($"{deviceCacheInfo.Baudrate}"), + Port = deviceCacheInfo.MeteringPort, + ProtocolType = deviceCacheInfo.Protocol.Value, + Address = deviceCacheInfo.MeterAddress, + Password = deviceCacheInfo.Password, + RateNumber = deviceCacheInfo.SingleRate == true ? 1 : 4, + IntegerBitNumber = 7, + DecimalBitNumber = 4, + CollectorAddress = deviceCacheInfo.FocusAddress + } + }; + + ReqParameter2 reqParameter = new ReqParameter2(); + var bytes = Build3761SendData.BuildAmmeterParameterSetSendCmd(deviceCacheInfo.FocusAddress, meterParameters, (returnParameter) => { - return false; - } + reqParameter = returnParameter; + }); + + // TODO: 需要插入iotdb 日志 + await SaveLogTodIotDbAsync(deviceCacheInfo, reqParameter); + + // 发送消息 + return await SendMessageAsync(input.FocusAddress, bytes); } catch (Exception) { @@ -141,24 +186,19 @@ namespace JiShe.CollectBus.Subscribers } var details = deviceCacheInfo.ItemCodes.Where(n => n.Trim().Substring(0, 3) == "0D_").Select(it => new PnFn(1, Convert.ToInt32(it.Split('_')[1]))).ToList(); - var bytes = Build3761SendData.BuildAmmeterReportCollectionItemsSetSendCmd(input.FocusAddress, 1, 0, input.Cycle, DateTime.Now, 1, details); + ReqParameter2 reqParameter = new ReqParameter2(); + var bytes = Build3761SendData.BuildAmmeterReportCollectionItemsSetSendCmd(input.FocusAddress, 1, 0, input.Cycle, DateTime.Now, 1, details, (returnParameter) => + { + reqParameter = returnParameter; + }); string frame = bytes.ToHexString(); // TODO: 需要插入iotdb 日志 + await SaveLogTodIotDbAsync(deviceCacheInfo, reqParameter); - - var checkResult = _tcpService.ClientExists(input.FocusAddress); - if (checkResult) - { - await _tcpService.SendAsync(input.FocusAddress, bytes); - - return true; - } - else - { - return false; - } + // 发送消息 + return await SendMessageAsync(input.FocusAddress, bytes); } catch (Exception) { @@ -167,5 +207,68 @@ namespace JiShe.CollectBus.Subscribers } + + /// + /// 保存日志到iotdb + /// + /// + /// + /// + private async Task SaveLogTodIotDbAsync(DeviceCacheInfo deviceCacheInfo, ReqParameter2 reqParameter) + { + var currentTime = DateTime.Now; + var taskMark = $"{deviceCacheInfo.FocusAddress}.{reqParameter.Pn}.{reqParameter.Fn}".Md5Fun(); + var taskData = new MeterReadingTelemetryPacketInfo() + { + SystemName = _applicationOptions.SystemType, + ProjectId = $"{deviceCacheInfo.ProjectId}", + DeviceType = $"{deviceCacheInfo.DeviceType}", + DeviceId = $"{deviceCacheInfo.DeviceId}", + IoTDataType = IOTDBDataTypeConst.Log, + FocusId = deviceCacheInfo.FocusId, + FocusAddress = deviceCacheInfo.FocusAddress, + Timestamps = currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(), + DatabaseBusiID = deviceCacheInfo.DatabaseBusiID, + PendingCopyReadTime = currentTime, + CreationTime = currentTime, + MeterAddress = deviceCacheInfo.MeterAddress, // 判断是否能取到表地址 + AFN = (int)reqParameter.AFN, + Fn = reqParameter.Fn, + Seq = reqParameter.Seq.PRSEQ, + MSA = reqParameter.MSA, + ItemCode = $"{(int)reqParameter.AFN}_{reqParameter.Fn}", + SubItemCode = null, + TaskMark = taskMark, + IsSend = false, + ManualOrNot = true, + Pn = reqParameter.Pn, + IsReceived = false, + ScoreValue = $"{deviceCacheInfo.FocusAddress}.{taskMark}".Md5Fun(), + }; + + // 插入IOT日志数据 + await _dbProvider.GetSessionPool(true).InsertAsync(taskData); + } + + /// + /// 发送消息 + /// + /// + /// + /// + private async Task SendMessageAsync(string focusAddress, byte[] bytes) + { + var checkResult = _tcpService.ClientExists(focusAddress); + if (checkResult) + { + await _tcpService.SendAsync(focusAddress, bytes); + + return true; + } + else + { + return false; + } + } } } diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/Build3761SendData.cs b/shared/JiShe.CollectBus.Common/BuildSendDatas/Build3761SendData.cs index b77ea2f..cca1724 100644 --- a/shared/JiShe.CollectBus.Common/BuildSendDatas/Build3761SendData.cs +++ b/shared/JiShe.CollectBus.Common/BuildSendDatas/Build3761SendData.cs @@ -497,8 +497,9 @@ namespace JiShe.CollectBus.Common.BuildSendDatas /// /// 集中器地址 /// + /// /// - public static byte[] BuildAmmeterParameterSetSendCmd(string address, List meterParameters) + public static byte[] BuildAmmeterParameterSetSendCmd(string address, List meterParameters,Action? action = null) { var dataUnit = BuildAmmeterParameterSendDataUnit(meterParameters); var reqParameter = new ReqParameter2() @@ -518,6 +519,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = 10 }; var bytes = BuildSendCommandBytes(reqParameter, dataUnit); + action?.Invoke(reqParameter); return bytes; } @@ -715,8 +717,9 @@ namespace JiShe.CollectBus.Common.BuildSendDatas /// 发送基准时间 /// 曲线数据提取倍率 /// + /// /// - public static byte[] BuildAmmeterReportCollectionItemsSetSendCmd(string address, int pn,int unit,int cycle,DateTime baseTime,int curveRatio,List pnFns) + public static byte[] BuildAmmeterReportCollectionItemsSetSendCmd(string address, int pn,int unit,int cycle,DateTime baseTime,int curveRatio,List pnFns,Action? action=null) { List dataUnit = new List(); @@ -750,6 +753,8 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = 66 }; var bytes = BuildSendCommandBytes(reqParameter, dataUnit); + + action?.Invoke(reqParameter); return bytes; }