2025-06-04 09:16:07 +08:00

528 lines
24 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Protocol;
using JiShe.CollectBus.Protocol.Contracts.Protocol.Dto;
using JiShe.CollectBus.Protocol.Dto;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol3761;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using TouchSocket.Sockets;
namespace JiShe.CollectBus.Subscribers
{
public class SubscriberAnalysisAppService : CollectBusAppService, ISubscriberAnalysisAppService, IKafkaSubscribe
{
private readonly ILogger<SubscriberAnalysisAppService> _logger;
private readonly ITcpService _tcpService;
private readonly IServiceProvider _serviceProvider;
private readonly IoTDBSessionPoolProvider _dbProvider;
private readonly IProtocolService _protocolService;
public SubscriberAnalysisAppService(ILogger<SubscriberAnalysisAppService> logger,
ITcpService tcpService,
IServiceProvider serviceProvider,
IoTDBSessionPoolProvider dbProvider,
IProtocolService protocolService)
{
_logger = logger;
_tcpService = tcpService;
_serviceProvider = serviceProvider;
_dbProvider = dbProvider;
_protocolService = protocolService;
}
/// <summary>
/// 解析AFN00H
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(KafkaTopicConsts.SubscriberAFN00HReceivedEventName)]
public async Task<ISubscribeAck> ReceivedAFN00Event(MessageProtocolAnalysis<TB3761> receivedMessage)
{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null)
{
_logger.LogError("协议不存在!");
}
else
{
if (receivedMessage.Data == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
}
return SubscribeAck.Fail();
}
/// <summary>
/// 解析AFN01H
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(KafkaTopicConsts.SubscriberAFN01HReceivedEventName)]
public async Task<ISubscribeAck> ReceivedAFN01Event(MessageProtocolAnalysis<TB3761> receivedMessage)
{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null)
{
_logger.LogError("协议不存在!");
}
else
{
if (receivedMessage.Data == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
}
return SubscribeAck.Fail();
}
/// <summary>
/// 解析AFN02H
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(KafkaTopicConsts.SubscriberAFN02HReceivedEventName)]
public async Task<ISubscribeAck> ReceivedAFN02Event(MessageProtocolAnalysis<TB3761> receivedMessage)
{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null)
{
_logger.LogError("协议不存在!");
}
else
{
if (receivedMessage.Data == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data, (result) =>
{
var ssss = (UnitDataAnalysis<AnalysisBaseDto<string>>)result;
_logger.LogInformation($"解析AFN02H数据{ssss.Serialize()}");
});
return SubscribeAck.Success();
}
return SubscribeAck.Fail();
}
/// <summary>
/// 解析AFN03H
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(KafkaTopicConsts.SubscriberAFN03HReceivedEventName)]
public async Task<ISubscribeAck> ReceivedAFN03Event(MessageProtocolAnalysis<TB3761> receivedMessage)
{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null)
{
_logger.LogError("协议不存在!");
}
else
{
if (receivedMessage.Data == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
return SubscribeAck.Success();
}
return SubscribeAck.Fail();
}
/// <summary>
/// 解析AFN04H
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(KafkaTopicConsts.SubscriberAFN04HReceivedEventName)]
public async Task<ISubscribeAck> ReceivedAFN04Event(MessageProtocolAnalysis<TB3761> receivedMessage)
{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null)
{
_logger.LogError("协议不存在!");
}
else
{
if (receivedMessage.Data == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
return SubscribeAck.Success();
}
return SubscribeAck.Fail();
}
/// <summary>
/// 解析AFN05H
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(KafkaTopicConsts.SubscriberAFN05HReceivedEventName)]
public async Task<ISubscribeAck> ReceivedAFN05Event(MessageProtocolAnalysis<TB3761> receivedMessage)
{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null)
{
_logger.LogError("协议不存在!");
}
else
{
if (receivedMessage.Data == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
return SubscribeAck.Success();
}
return SubscribeAck.Fail();
}
/// <summary>
/// 解析AFN09H
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(KafkaTopicConsts.SubscriberAFN09HReceivedEventName)]
public async Task<ISubscribeAck> ReceivedAFN09Event(MessageProtocolAnalysis<TB3761> receivedMessage)
{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null)
{
_logger.LogError("协议不存在!");
}
else
{
if (receivedMessage.Data == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
return SubscribeAck.Success();
}
return SubscribeAck.Fail();
}
/// <summary>
/// 解析AFN0AH
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(KafkaTopicConsts.SubscriberAFN0AHReceivedEventName)]
public async Task<ISubscribeAck> ReceivedAFN0AEvent(MessageProtocolAnalysis<TB3761> receivedMessage)
{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null)
{
_logger.LogError("协议不存在!");
}
else
{
if (receivedMessage.Data == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
return SubscribeAck.Success();
}
return SubscribeAck.Fail();
}
/// <summary>
/// 解析AFN0BH
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(KafkaTopicConsts.SubscriberAFN0BHReceivedEventName)]
public async Task<ISubscribeAck> ReceivedAFN0BEvent(MessageProtocolAnalysis<TB3761> receivedMessage)
{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null)
{
_logger.LogError("协议不存在!");
}
else
{
if (receivedMessage.Data == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
return SubscribeAck.Success();
}
return SubscribeAck.Fail();
}
/// <summary>
/// 解析AFN0CH
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(KafkaTopicConsts.SubscriberAFN0CHReceivedEventName)]
public async Task<ISubscribeAck> ReceivedAFN0CEvent(MessageProtocolAnalysis<TB3761> receivedMessage)
{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null)
{
_logger.LogError("协议不存在!");
}
else
{
if (receivedMessage.Data == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
return SubscribeAck.Success();
}
return SubscribeAck.Fail();
}
/// <summary>
/// 解析AFN0DH
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(KafkaTopicConsts.SubscriberAFN0DHReceivedEventName)]
public async Task<ISubscribeAck> ReceivedAFN0DEvent(MessageProtocolAnalysis<TB3761> receivedMessage)
{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null)
{
_logger.LogError("协议不存在!");
}
else
{
if (receivedMessage.Data == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
return SubscribeAck.Success();
}
return SubscribeAck.Fail();
}
/// <summary>
/// 解析AFN0EH
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(KafkaTopicConsts.SubscriberAFN0EHReceivedEventName)]
public async Task<ISubscribeAck> ReceivedAFN0EEvent(MessageProtocolAnalysis<TB3761> receivedMessage)
{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null)
{
_logger.LogError("协议不存在!");
}
else
{
if (receivedMessage.Data == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
return SubscribeAck.Success();
}
return SubscribeAck.Fail();
}
/// <summary>
/// 解析AFN0HH
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(KafkaTopicConsts.SubscriberAFN10HReceivedEventName)]
public async Task<ISubscribeAck> ReceivedAFN0HEvent(MessageProtocolAnalysis<TB3761> receivedMessage)
{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null)
{
_logger.LogError("协议不存在!");
}
else
{
if (receivedMessage.Data == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761>(serverName, receivedMessage.Data);
return SubscribeAck.Success();
}
return SubscribeAck.Fail();
}
}
}