JiShe.CollectBus/services/JiShe.CollectBus.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs

172 lines
6.7 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Samples;
using JiShe.ServicePro.Dto;
using JiShe.ServicePro.FreeRedisProvider;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using TouchSocket.Sockets;
namespace JiShe.CollectBus.Subscribers
{
public class ServiceCommunicationChannelSubscriberService : CollectBusAppService, IServiceCommunicationChannelSubscriberService, IKafkaSubscribe
{
private readonly ILogger<SubscriberAnalysisAppService> _logger;
private readonly ITcpService _tcpService;
private readonly IServiceProvider _serviceProvider;
private readonly IoTDBSessionPoolProvider _dbProvider;
private readonly IProtocolService _protocolService;
private readonly IFreeRedisProvider _freeRedisProvider;
public ServiceCommunicationChannelSubscriberService(ILogger<SubscriberAnalysisAppService> logger,
ITcpService tcpService,
IServiceProvider serviceProvider,
IoTDBSessionPoolProvider dbProvider,
IProtocolService protocolService,
IFreeRedisProvider freeRedisProvider)
{
_logger = logger;
_tcpService = tcpService;
_serviceProvider = serviceProvider;
_dbProvider = dbProvider;
_protocolService = protocolService;
_freeRedisProvider = freeRedisProvider;
}
/// <summary>
/// 数据通讯通道消息消费订阅
/// </summary>
/// <param name="issuedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(KafkaTopicConsts.ServiceCommunicationChannelTopic)]
public async Task<ISubscribeAck> ServiceCommunicationChannelIssuedEvent(Dictionary<ServiceCommunicationTypeEnum, string> issuedMessage)
{
Logger.LogWarning($"数据通讯通道消息消费订阅解析:{issuedMessage.Serialize()}");
var tempFirstKeyInfo = issuedMessage.FirstOrDefault();
if (tempFirstKeyInfo.Value == null)
{
return SubscribeAck.Success();
}
ServiceCommunicationTypeEnum serviceCommunication = tempFirstKeyInfo.Key;
bool tempResult = false;
switch (serviceCommunication)
{
case ServiceCommunicationTypeEnum.ArchivalDataIssued:
if(!string.IsNullOrWhiteSpace(tempFirstKeyInfo.Value))
{
// 解析数据
// 判断电表,水表,气表 下发档案
ArchivalDataIssuedInput? input = tempFirstKeyInfo.Value.Deserialize<ArchivalDataIssuedInput>();
if (input != null)
{
switch (input.MeterType)
{
case MeterTypeEnum.Ammeter:
break;
case MeterTypeEnum.WaterMeter:
break;
}
}
//tempResult = await SendArchivalDataIssued(focusCacheInfo);
}
break;
case ServiceCommunicationTypeEnum.SetItemCodeTask:
if(!string.IsNullOrWhiteSpace(tempFirstKeyInfo.Value)){
SetItemCodeTaskInput? input = tempFirstKeyInfo.Value!.Deserialize<SetItemCodeTaskInput>();
if(input != null)
{
await AutoCollectionItemsSetAsync(input);
}
}
break;
default:
throw new Exception("暂不支持该数据通讯通道消息消费订阅解析");
}
return tempResult == true ? SubscribeAck.Success() : SubscribeAck.Fail();
}
protected async Task<bool> SendArchivalDataIssued(FocusCacheInfos focusCacheInfo)
{
try
{
var checkResult = _tcpService.ClientExists(focusCacheInfo.FocusAddress);
if (checkResult)
{
string issuedMessageHexString = "";
await _tcpService.SendAsync(focusCacheInfo.FocusAddress, Convert.FromHexString(issuedMessageHexString));
return true;
}
else
{
return false;
}
}
catch (Exception)
{
throw;
}
}
/// <summary>
/// 自动采集项目设置
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
protected async Task<bool> AutoCollectionItemsSetAsync(SetItemCodeTaskInput input)
{
try
{
var key = $"{RedisConst.CacheAllDeviceInfoHashKey}";
List<DeviceCacheInfo> deviceCacheInfos = await _freeRedisProvider.Instance.HGetAsync<List<DeviceCacheInfo>>(key, input.FocusAddress);
if (deviceCacheInfos == null || deviceCacheInfos.Count <= 0)
{
throw new Exception($"未能在缓存中找到设备{input.FocusAddress}缓存key:{key}");
}
DeviceCacheInfo? deviceCacheInfo = deviceCacheInfos.FirstOrDefault(d => d.MeterAddress == input.MeterAddress);
if (deviceCacheInfo == null)
{
throw new Exception($"未能在缓存中找到设备{input.FocusAddress}下的表{input.MeterAddress}缓存key:{key}");
}
var details = deviceCacheInfo.ItemCodes.Where(n => n.Trim().Substring(0, 3) == "0D_").Select(it => new PnFn(1, Convert.ToInt32(it.Split('_')[1]))).ToList();
var bytes = Build3761SendData.BuildAmmeterReportCollectionItemsSetSendCmd(input.FocusAddress, 1, 0, input.Cycle, DateTime.Now, 1, details);
string frame = bytes.ToHexString();
// TODO: 需要插入iotdb 日志
var checkResult = _tcpService.ClientExists(input.FocusAddress);
if (checkResult)
{
await _tcpService.SendAsync(input.FocusAddress, bytes);
return true;
}
else
{
return false;
}
}
catch (Exception)
{
throw;
}
}
}
}