JiShe.CollectBus/services/JiShe.CollectBus.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs

276 lines
12 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using FreeSql.Internal.CommonProvider;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Samples;
using JiShe.ServicePro.Core;
using JiShe.ServicePro.Dto;
using JiShe.ServicePro.Encrypt;
using JiShe.ServicePro.FreeRedisProvider;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using TouchSocket.Sockets;
namespace JiShe.CollectBus.Subscribers
{
public class ServiceCommunicationChannelSubscriberService : CollectBusAppService, IServiceCommunicationChannelSubscriberService, IKafkaSubscribe
{
// NOTE(review): the logger category is SubscriberAnalysisAppService rather than this class,
// and this field is never read in the class body (logging goes through Logger) - confirm intent.
private readonly ILogger<SubscriberAnalysisAppService> _logger;

// TCP service used to check whether a client is connected and to send frames to it.
private readonly ITcpService _tcpService;

// NOTE(review): stored but never read in the class body.
private readonly IServiceProvider _serviceProvider;

// Provides the IoTDB session pool that request log records are inserted through.
private readonly IoTDBSessionPoolProvider _dbProvider;

// NOTE(review): stored but never read in the class body.
private readonly IProtocolService _protocolService;

// Redis access used to look up cached device information.
private readonly IFreeRedisProvider _freeRedisProvider;

// Application options; SystemType is copied into every IoTDB log record.
private readonly ServerApplicationOptions _applicationOptions;

/// <summary>
/// Creates the subscriber service and stores the injected dependencies.
/// </summary>
/// <param name="logger">Logger instance (typed for <c>SubscriberAnalysisAppService</c>).</param>
/// <param name="tcpService">TCP service used to reach connected clients.</param>
/// <param name="serviceProvider">Service provider.</param>
/// <param name="dbProvider">IoTDB session pool provider.</param>
/// <param name="protocolService">Protocol service.</param>
/// <param name="freeRedisProvider">Redis provider.</param>
/// <param name="applicationOptions">Options wrapper whose <c>Value</c> is stored.</param>
public ServiceCommunicationChannelSubscriberService(
    ILogger<SubscriberAnalysisAppService> logger,
    ITcpService tcpService,
    IServiceProvider serviceProvider,
    IoTDBSessionPoolProvider dbProvider,
    IProtocolService protocolService,
    IFreeRedisProvider freeRedisProvider,
    IOptions<ServerApplicationOptions> applicationOptions)
{
    // Configuration.
    _applicationOptions = applicationOptions.Value;

    // Infrastructure.
    _freeRedisProvider = freeRedisProvider;
    _dbProvider = dbProvider;
    _tcpService = tcpService;

    // Remaining dependencies.
    _protocolService = protocolService;
    _serviceProvider = serviceProvider;
    _logger = logger;
}
/// <summary>
/// Kafka subscription handler for the service communication channel topic.
/// Only the first entry of <paramref name="issuedMessage"/> is processed: its key selects
/// the operation and its value is the serialized input for that operation.
/// </summary>
/// <param name="issuedMessage">Communication type mapped to its serialized payload.</param>
/// <returns>
/// A success acknowledgement when the message is null, has no first value, or the selected
/// operation returned <c>true</c>; a failure acknowledgement otherwise (including a blank
/// payload, a payload that deserializes to null, or an unhandled meter type).
/// </returns>
/// <exception cref="Exception">
/// Thrown when the communication type is not supported. Exceptions raised by the
/// selected operation are not caught here.
/// </exception>
[KafkaSubscribe(KafkaTopicConsts.ServiceCommunicationChannelTopic)]
public async Task<ISubscribeAck> ServiceCommunicationChannelIssuedEvent(Dictionary<ServiceCommunicationTypeEnum, string> issuedMessage)
{
    // A null message previously crashed in FirstOrDefault(); treat it like an empty
    // dictionary, which is acknowledged without doing any work.
    if (issuedMessage == null)
    {
        Logger.LogWarning("数据通讯通道消息消费订阅解析:消息为空");
        return SubscribeAck.Success();
    }

    Logger.LogWarning($"数据通讯通道消息消费订阅解析:{issuedMessage.Serialize()}");

    var firstEntry = issuedMessage.FirstOrDefault();
    if (firstEntry.Value == null)
    {
        // Empty dictionary or a null payload: nothing to do, acknowledge.
        return SubscribeAck.Success();
    }

    string payload = firstEntry.Value;
    bool handled = false;

    switch (firstEntry.Key)
    {
        case ServiceCommunicationTypeEnum.ArchivalDataIssued:
        {
            if (string.IsNullOrWhiteSpace(payload))
            {
                break;
            }

            ArchivalDataIssuedInput? archivalInput = payload.Deserialize<ArchivalDataIssuedInput>();
            if (archivalInput == null)
            {
                break;
            }

            // Ammeters and water meters share the same issuing path; any other
            // meter type is left unhandled and yields a failure acknowledgement.
            switch (archivalInput.MeterType)
            {
                case MeterTypeEnum.Ammeter:
                case MeterTypeEnum.WaterMeter:
                    handled = await SendArchivalDataIssued(archivalInput);
                    break;
            }

            break;
        }

        case ServiceCommunicationTypeEnum.SetItemCodeTask:
        {
            if (string.IsNullOrWhiteSpace(payload))
            {
                break;
            }

            SetItemCodeTaskInput? itemCodeInput = payload.Deserialize<SetItemCodeTaskInput>();
            if (itemCodeInput != null)
            {
                handled = await AutoCollectionItemsSetAsync(itemCodeInput);
            }

            break;
        }

        default:
            throw new Exception("暂不支持该数据通讯通道消息消费订阅解析");
    }

    return handled ? SubscribeAck.Success() : SubscribeAck.Fail();
}
/// <summary>
/// Sends archive (meter parameter) data for one meter: looks the meter up in the Redis
/// device cache, builds the parameter-set command with
/// <c>Build3761SendData.BuildAmmeterParameterSetSendCmd</c>, writes a log record to IoTDB
/// and sends the command bytes to the client identified by the focus address.
/// </summary>
/// <param name="input">Supplies the focus address and meter address of the target meter.</param>
/// <returns><c>true</c> when the client exists and the bytes were sent; otherwise <c>false</c>.</returns>
/// <exception cref="Exception">
/// Thrown when no cached devices exist for the focus address, or when the meter address
/// is not among them.
/// </exception>
protected async Task<bool> SendArchivalDataIssued(ArchivalDataIssuedInput input)
{
    // The former try/catch only rethrew, so exceptions now simply propagate as before.
    var key = $"{RedisConst.CacheAllDeviceInfoHashKey}";
    List<DeviceCacheInfo> deviceCacheInfos = await _freeRedisProvider.Instance.HGetAsync<List<DeviceCacheInfo>>(key, input.FocusAddress);
    if (deviceCacheInfos == null || deviceCacheInfos.Count <= 0)
    {
        throw new Exception($"未能在缓存中找到设备{input.FocusAddress}缓存key:{key}");
    }

    DeviceCacheInfo? deviceCacheInfo = deviceCacheInfos.FirstOrDefault(d => d.MeterAddress == input.MeterAddress);
    if (deviceCacheInfo == null)
    {
        throw new Exception($"未能在缓存中找到设备{input.FocusAddress}下的表{input.MeterAddress}缓存key:{key}");
    }

    var meterParameters = new List<AmmeterParameter>
    {
        new AmmeterParameter
        {
            // Both the serial number and Pn are taken from the cached metering code.
            SerialNumber = deviceCacheInfo.MeteringCode,
            Pn = deviceCacheInfo.MeteringCode,
            BaudRate = Build3761SendData.GetBaudreate($"{deviceCacheInfo.Baudrate}"),
            Port = deviceCacheInfo.MeteringPort,
            // Falls back to 30 when the cache holds no protocol value.
            ProtocolType = deviceCacheInfo.Protocol ?? 30,
            Address = deviceCacheInfo.MeterAddress,
            Password = deviceCacheInfo.Password,
            // One rate for single-rate meters, four otherwise.
            RateNumber = deviceCacheInfo.SingleRate == true ? 1 : 4,
            IntegerBitNumber = 7,
            DecimalBitNumber = 4,
            CollectorAddress = deviceCacheInfo.FocusAddress
        }
    };

    // The builder reports the request parameters it used through the callback;
    // they are needed for the log record below.
    ReqParameter2 reqParameter = new ReqParameter2();
    var bytes = Build3761SendData.BuildAmmeterParameterSetSendCmd(deviceCacheInfo.FocusAddress, meterParameters, (returnParameter) =>
    {
        reqParameter = returnParameter;
    });

    // Write the request log record to IoTDB (the original comment marked this as a TODO).
    await SaveLogTodIotDbAsync(deviceCacheInfo, reqParameter);

    // Send the message.
    return await SendMessageAsync(input.FocusAddress, bytes);
}
/// <summary>
/// Sets the automatic collection items for one meter: looks the meter up in the Redis
/// device cache, turns its cached item codes that start with <c>0D_</c> into
/// <c>PnFn</c> entries, builds the command with
/// <c>Build3761SendData.BuildAmmeterReportCollectionItemsSetSendCmd</c>, writes a log
/// record to IoTDB and sends the command bytes to the client identified by the focus address.
/// </summary>
/// <param name="input">Supplies the focus address, meter address and cycle.</param>
/// <returns><c>true</c> when the client exists and the bytes were sent; otherwise <c>false</c>.</returns>
/// <exception cref="Exception">
/// Thrown when no cached devices exist for the focus address, or when the meter address
/// is not among them.
/// </exception>
protected async Task<bool> AutoCollectionItemsSetAsync(SetItemCodeTaskInput input)
{
    // The former try/catch only rethrew, so exceptions now simply propagate as before.
    var key = $"{RedisConst.CacheAllDeviceInfoHashKey}";
    List<DeviceCacheInfo> deviceCacheInfos = await _freeRedisProvider.Instance.HGetAsync<List<DeviceCacheInfo>>(key, input.FocusAddress);
    if (deviceCacheInfos == null || deviceCacheInfos.Count <= 0)
    {
        throw new Exception($"未能在缓存中找到设备{input.FocusAddress}缓存key:{key}");
    }

    DeviceCacheInfo? deviceCacheInfo = deviceCacheInfos.FirstOrDefault(d => d.MeterAddress == input.MeterAddress);
    if (deviceCacheInfo == null)
    {
        throw new Exception($"未能在缓存中找到设备{input.FocusAddress}下的表{input.MeterAddress}缓存key:{key}");
    }

    // Keep only item codes of the form "0D_<number>". StartsWith is used instead of
    // Substring(0, 3) so that codes shorter than three characters are skipped rather
    // than throwing ArgumentOutOfRangeException.
    // NOTE(review): PnFn(1, n) presumably means Pn = 1 and Fn = n - confirm against PnFn.
    var details = deviceCacheInfo.ItemCodes
        .Select(code => code.Trim())
        .Where(code => code.StartsWith("0D_", StringComparison.Ordinal))
        .Select(code => new PnFn(1, Convert.ToInt32(code.Split('_')[1])))
        .ToList();

    // The builder reports the request parameters it used through the callback;
    // they are needed for the log record below.
    ReqParameter2 reqParameter = new ReqParameter2();
    var bytes = Build3761SendData.BuildAmmeterReportCollectionItemsSetSendCmd(input.FocusAddress, 1, 0, input.Cycle, DateTime.Now, 1, details, (returnParameter) =>
    {
        reqParameter = returnParameter;
    });

    // Write the request log record to IoTDB (the original comment marked this as a TODO).
    await SaveLogTodIotDbAsync(deviceCacheInfo, reqParameter);

    // Send the message.
    return await SendMessageAsync(input.FocusAddress, bytes);
}
/// <summary>
/// Builds a log record describing an issued request and inserts it into IoTDB.
/// </summary>
/// <param name="deviceCacheInfo">Cached device the request was built for.</param>
/// <param name="reqParameter">Request parameters reported by the command builder.</param>
/// <returns>A task that completes when the record has been inserted.</returns>
private async Task SaveLogTodIotDbAsync(DeviceCacheInfo deviceCacheInfo, ReqParameter2 reqParameter)
{
    var now = DateTime.Now;
    var afnCode = (int)reqParameter.AFN;

    // Task mark: MD5 of "<focus address>.<Pn>.<Fn>".
    var mark = $"{deviceCacheInfo.FocusAddress}.{reqParameter.Pn}.{reqParameter.Fn}".Md5Fun();

    // Score value: MD5 of "<focus address>.<task mark>".
    var score = $"{deviceCacheInfo.FocusAddress}.{mark}".Md5Fun();

    var logRecord = new MeterReadingTelemetryPacketInfo()
    {
        SystemName = _applicationOptions.SystemType,
        ProjectId = $"{deviceCacheInfo.ProjectId}",
        DeviceType = $"{deviceCacheInfo.DeviceType}",
        DeviceId = $"{deviceCacheInfo.DeviceId}",
        IoTDataType = IOTDBDataTypeConst.Log,
        FocusId = deviceCacheInfo.FocusId,
        FocusAddress = deviceCacheInfo.FocusAddress,
        Timestamps = now.GetDateTimeOffset().ToUnixTimeNanoseconds(),
        DatabaseBusiID = deviceCacheInfo.DatabaseBusiID,
        PendingCopyReadTime = now,
        CreationTime = now,
        // Original note: check whether the meter address is actually available here.
        MeterAddress = deviceCacheInfo.MeterAddress,
        AFN = afnCode,
        Fn = reqParameter.Fn,
        Seq = reqParameter.Seq.PRSEQ,
        MSA = reqParameter.MSA,
        ItemCode = $"{afnCode}_{reqParameter.Fn}",
        SubItemCode = null,
        TaskMark = mark,
        IsSend = false,
        ManualOrNot = true,
        Pn = reqParameter.Pn,
        IsReceived = false,
        ScoreValue = score,
    };

    // Insert the IoT log record.
    // NOTE(review): the meaning of the 'true' argument to GetSessionPool is not visible here.
    var sessionPool = _dbProvider.GetSessionPool(true);
    await sessionPool.InsertAsync(logRecord);
}
/// <summary>
/// Sends raw bytes to the TCP client registered under the given focus address.
/// </summary>
/// <param name="focusAddress">Identifier of the target client on the TCP service.</param>
/// <param name="bytes">Bytes to send.</param>
/// <returns><c>true</c> when the client exists and the send was awaited; <c>false</c> when no such client exists.</returns>
private async Task<bool> SendMessageAsync(string focusAddress, byte[] bytes)
{
    // Nothing to send to when the client is not known to the TCP service.
    if (!_tcpService.ClientExists(focusAddress))
    {
        return false;
    }

    await _tcpService.SendAsync(focusAddress, bytes);
    return true;
}
}
}