using FreeSql.Internal.CommonProvider;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Samples;
using JiShe.ServicePro.Core;
using JiShe.ServicePro.Dto;
using JiShe.ServicePro.Encrypt;
using JiShe.ServicePro.FreeRedisProvider;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using TouchSocket.Sockets;

namespace JiShe.CollectBus.Subscribers
{
    /// <summary>
    /// Kafka subscriber for the service communication channel topic.
    /// For each supported message it looks the target meter up in the Redis device cache,
    /// builds a command frame with <see cref="Build3761SendData"/>, writes a log record to IoTDB
    /// and sends the frame to the concentrator through the TCP service.
    /// </summary>
    public class ServiceCommunicationChannelSubscriberService : CollectBusAppService, IServiceCommunicationChannelSubscriberService, IKafkaSubscribe
    {
        // NOTE(review): _logger, _serviceProvider and _protocolService are injected but not used
        // anywhere in this class; they are kept so the constructor signature stays unchanged.
        private readonly ILogger<ServiceCommunicationChannelSubscriberService> _logger;
        private readonly ITcpService _tcpService;
        private readonly IServiceProvider _serviceProvider;
        private readonly IoTDBSessionPoolProvider _dbProvider;
        private readonly IProtocolService _protocolService;
        private readonly IFreeRedisProvider _freeRedisProvider;
        private readonly ServerApplicationOptions _applicationOptions;

        /// <summary>
        /// Creates the subscriber and stores its dependencies.
        /// </summary>
        /// <param name="logger">Logger for this service.</param>
        /// <param name="tcpService">TCP service used to reach connected concentrators.</param>
        /// <param name="serviceProvider">Application service provider.</param>
        /// <param name="dbProvider">Provider of IoTDB session pools.</param>
        /// <param name="protocolService">Protocol service.</param>
        /// <param name="freeRedisProvider">Redis provider holding the device cache.</param>
        /// <param name="applicationOptions">Application options; only <c>Value</c> is retained.</param>
        public ServiceCommunicationChannelSubscriberService(
            ILogger<ServiceCommunicationChannelSubscriberService> logger,
            ITcpService tcpService,
            IServiceProvider serviceProvider,
            IoTDBSessionPoolProvider dbProvider,
            IProtocolService protocolService,
            IFreeRedisProvider freeRedisProvider,
            IOptions<ServerApplicationOptions> applicationOptions)
        {
            _logger = logger;
            _tcpService = tcpService;
            _serviceProvider = serviceProvider;
            _dbProvider = dbProvider;
            _protocolService = protocolService;
            _freeRedisProvider = freeRedisProvider;
            _applicationOptions = applicationOptions.Value;
        }

        /// <summary>
        /// Consumes a message from the service communication channel topic and dispatches it
        /// according to its communication type.
        /// </summary>
        /// <remarks>
        /// Only the first entry of <paramref name="issuedMessage"/> is processed; any further
        /// entries are ignored.
        /// </remarks>
        /// <param name="issuedMessage">Communication type mapped to its serialized payload.</param>
        /// <returns>
        /// A success acknowledgement when the message is null, has no entries, its first value is
        /// null, or the command was sent; otherwise a failure acknowledgement.
        /// </returns>
        /// <exception cref="NotSupportedException">The communication type is not handled here.</exception>
        [KafkaSubscribe(KafkaTopicConsts.ServiceCommunicationChannelTopic)]
        public async Task<ISubscribeAck> ServiceCommunicationChannelIssuedEvent(Dictionary<ServiceCommunicationTypeEnum, string> issuedMessage)
        {
            // A null message carries nothing to do; acknowledge it like the empty-message case below.
            if (issuedMessage is null)
            {
                return SubscribeAck.Success();
            }

            // NOTE(review): routine traffic is logged at Warning level - confirm this is intended.
            Logger.LogWarning($"数据通讯通道消息消费订阅解析:{issuedMessage.Serialize()}");

            // An empty dictionary yields a default pair whose Value is null.
            var firstEntry = issuedMessage.FirstOrDefault();
            if (firstEntry.Value == null)
            {
                return SubscribeAck.Success();
            }

            ServiceCommunicationTypeEnum serviceCommunication = firstEntry.Key;
            string payload = firstEntry.Value;
            bool sent = false;

            switch (serviceCommunication)
            {
                case ServiceCommunicationTypeEnum.ArchivalDataIssued:
                    if (!string.IsNullOrWhiteSpace(payload))
                    {
                        // Parse the payload and issue the archive for the supported meter types.
                        ArchivalDataIssuedInput? archivalInput = payload.Deserialize<ArchivalDataIssuedInput>();
                        if (archivalInput != null)
                        {
                            switch (archivalInput.MeterType)
                            {
                                // Both meter types are issued through the same command builder;
                                // any other meter type leaves the result as "not sent".
                                case MeterTypeEnum.Ammeter:
                                case MeterTypeEnum.WaterMeter:
                                    sent = await SendArchivalDataIssued(archivalInput);
                                    break;
                            }
                        }
                    }
                    break;

                case ServiceCommunicationTypeEnum.SetItemCodeTask:
                    if (!string.IsNullOrWhiteSpace(payload))
                    {
                        SetItemCodeTaskInput? itemCodeInput = payload.Deserialize<SetItemCodeTaskInput>();
                        if (itemCodeInput != null)
                        {
                            sent = await AutoCollectionItemsSetAsync(itemCodeInput);
                        }
                    }
                    break;

                default:
                    throw new NotSupportedException("暂不支持该数据通讯通道消息消费订阅解析");
            }

            return sent ? SubscribeAck.Success() : SubscribeAck.Fail();
        }

        /// <summary>
        /// Builds and sends the meter archive (parameter set) command for the meter named in
        /// <paramref name="input"/>.
        /// </summary>
        /// <param name="input">Identifies the concentrator and meter addresses.</param>
        /// <returns><c>true</c> when the frame was sent; <c>false</c> when the concentrator has no TCP client.</returns>
        /// <exception cref="InvalidOperationException">The concentrator or meter is missing from the device cache.</exception>
        protected async Task<bool> SendArchivalDataIssued(ArchivalDataIssuedInput input)
        {
            DeviceCacheInfo deviceCacheInfo = await GetDeviceCacheInfoAsync(input.FocusAddress, input.MeterAddress);

            var meterParameters = new List<AmmeterParameter>
            {
                new AmmeterParameter
                {
                    SerialNumber = deviceCacheInfo.MeteringCode,
                    Pn = deviceCacheInfo.MeteringCode,
                    BaudRate = Build3761SendData.GetBaudreate($"{deviceCacheInfo.Baudrate}"),
                    Port = deviceCacheInfo.MeteringPort,
                    ProtocolType = deviceCacheInfo.Protocol ?? 30,
                    Address = deviceCacheInfo.MeterAddress,
                    Password = deviceCacheInfo.Password,
                    RateNumber = deviceCacheInfo.SingleRate == true ? 1 : 4,
                    IntegerBitNumber = 7,
                    DecimalBitNumber = 4,
                    CollectorAddress = deviceCacheInfo.FocusAddress
                }
            };

            // The builder reports the request parameters it used through the callback.
            ReqParameter2 reqParameter = new ReqParameter2();
            var bytes = Build3761SendData.BuildAmmeterParameterSetSendCmd(
                deviceCacheInfo.FocusAddress,
                meterParameters,
                (returnParameter) => { reqParameter = returnParameter; });

            // Write the IoTDB log record before sending.
            await SaveLogTodIotDbAsync(deviceCacheInfo, reqParameter);

            // Send the frame.
            return await SendMessageAsync(input.FocusAddress, bytes);
        }

        /// <summary>
        /// Builds and sends the automatic collection items command for the meter named in
        /// <paramref name="input"/>, using the cached item codes that start with <c>0D_</c>.
        /// </summary>
        /// <param name="input">Identifies the concentrator and meter addresses and the cycle.</param>
        /// <returns><c>true</c> when the frame was sent; <c>false</c> when the concentrator has no TCP client.</returns>
        /// <exception cref="InvalidOperationException">The concentrator or meter is missing from the device cache.</exception>
        protected async Task<bool> AutoCollectionItemsSetAsync(SetItemCodeTaskInput input)
        {
            DeviceCacheInfo deviceCacheInfo = await GetDeviceCacheInfoAsync(input.FocusAddress, input.MeterAddress);

            // StartsWith does not throw on codes shorter than the prefix (Substring(0, 3) did).
            var details = deviceCacheInfo.ItemCodes
                .Where(code => code.Trim().StartsWith("0D_", StringComparison.Ordinal))
                .Select(code => new PnFn(1, Convert.ToInt32(code.Split('_')[1])))
                .ToList();

            // The builder reports the request parameters it used through the callback.
            ReqParameter2 reqParameter = new ReqParameter2();
            var bytes = Build3761SendData.BuildAmmeterReportCollectionItemsSetSendCmd(
                input.FocusAddress,
                1,
                0,
                input.Cycle,
                DateTime.Now,
                1,
                details,
                (returnParameter) => { reqParameter = returnParameter; });

            // Write the IoTDB log record before sending.
            await SaveLogTodIotDbAsync(deviceCacheInfo, reqParameter);

            // Send the frame.
            return await SendMessageAsync(input.FocusAddress, bytes);
        }

        /// <summary>
        /// Reads the cached device list of a concentrator from Redis and returns the entry whose
        /// meter address matches.
        /// </summary>
        /// <param name="focusAddress">Concentrator address, used as the hash field.</param>
        /// <param name="meterAddress">Meter address to find in the concentrator's device list.</param>
        /// <returns>The cached device entry.</returns>
        /// <exception cref="InvalidOperationException">
        /// The concentrator has no cached devices, or none of them has the given meter address.
        /// </exception>
        private async Task<DeviceCacheInfo> GetDeviceCacheInfoAsync(string focusAddress, string meterAddress)
        {
            var key = $"{RedisConst.CacheAllDeviceInfoHashKey}";

            var deviceCacheInfos = await _freeRedisProvider.Instance.HGetAsync<List<DeviceCacheInfo>>(key, focusAddress);
            if (deviceCacheInfos is null || deviceCacheInfos.Count == 0)
            {
                throw new InvalidOperationException($"未能在缓存中找到设备{focusAddress},缓存key:{key}");
            }

            DeviceCacheInfo? deviceCacheInfo = deviceCacheInfos.FirstOrDefault(d => d.MeterAddress == meterAddress);
            if (deviceCacheInfo is null)
            {
                throw new InvalidOperationException($"未能在缓存中找到设备{focusAddress}下的表{meterAddress},缓存key:{key}");
            }

            return deviceCacheInfo;
        }

        /// <summary>
        /// Writes a log record describing the issued request to IoTDB.
        /// </summary>
        /// <param name="deviceCacheInfo">Cached device the request targets.</param>
        /// <param name="reqParameter">Request parameters reported by the command builder.</param>
        private async Task SaveLogTodIotDbAsync(DeviceCacheInfo deviceCacheInfo, ReqParameter2 reqParameter)
        {
            var currentTime = DateTime.Now;
            var taskMark = $"{deviceCacheInfo.FocusAddress}.{reqParameter.Pn}.{reqParameter.Fn}".Md5Fun();

            var taskData = new MeterReadingTelemetryPacketInfo()
            {
                SystemName = _applicationOptions.SystemType,
                ProjectId = $"{deviceCacheInfo.ProjectId}",
                DeviceType = $"{deviceCacheInfo.DeviceType}",
                DeviceId = $"{deviceCacheInfo.DeviceId}",
                IoTDataType = IOTDBDataTypeConst.Log,
                FocusId = deviceCacheInfo.FocusId,
                FocusAddress = deviceCacheInfo.FocusAddress,
                Timestamps = currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
                DatabaseBusiID = deviceCacheInfo.DatabaseBusiID,
                PendingCopyReadTime = currentTime,
                CreationTime = currentTime,
                // NOTE(review): original comment asks to check whether the meter address is available here.
                MeterAddress = deviceCacheInfo.MeterAddress,
                AFN = (int)reqParameter.AFN,
                Fn = reqParameter.Fn,
                Seq = reqParameter.Seq.PRSEQ,
                MSA = reqParameter.MSA,
                ItemCode = $"{(int)reqParameter.AFN}_{reqParameter.Fn}",
                SubItemCode = null,
                TaskMark = taskMark,
                IsSend = false,
                ManualOrNot = true,
                Pn = reqParameter.Pn,
                IsReceived = false,
                ScoreValue = $"{deviceCacheInfo.FocusAddress}.{taskMark}".Md5Fun(),
            };

            // Insert the IoT log record.
            await _dbProvider.GetSessionPool(true).InsertAsync(taskData);
        }

        /// <summary>
        /// Sends a frame to the TCP client registered under the concentrator address.
        /// </summary>
        /// <param name="focusAddress">Concentrator address, used as the TCP client id.</param>
        /// <param name="bytes">Frame to send.</param>
        /// <returns><c>true</c> when the client exists and the frame was sent; otherwise <c>false</c>.</returns>
        private async Task<bool> SendMessageAsync(string focusAddress, byte[] bytes)
        {
            if (!_tcpService.ClientExists(focusAddress))
            {
                return false;
            }

            await _tcpService.SendAsync(focusAddress, bytes);
            return true;
        }
    }
}