调整结构

This commit is contained in:
zenghongyao 2025-04-25 09:28:56 +08:00
parent 264eec540a
commit 16dab4ba29
9 changed files with 176 additions and 150 deletions

View File

@ -12,6 +12,7 @@ using Volo.Abp.DependencyInjection;
using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using System.Text.RegularExpressions; using System.Text.RegularExpressions;
using Volo.Abp; using Volo.Abp;
using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.Protocol.Contracts.Services namespace JiShe.CollectBus.Protocol.Contracts.Services
{ {
@ -19,11 +20,13 @@ namespace JiShe.CollectBus.Protocol.Contracts.Services
{ {
private readonly IFreeRedisProvider _freeRedisProvider; private readonly IFreeRedisProvider _freeRedisProvider;
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
private readonly ILogger<ProtocolService> _logger;
public ProtocolService(IFreeRedisProvider freeRedisProvider, IServiceProvider serviceProvider) public ProtocolService(IFreeRedisProvider freeRedisProvider, IServiceProvider serviceProvider, ILogger<ProtocolService> logger)
{ {
_freeRedisProvider = freeRedisProvider; _freeRedisProvider = freeRedisProvider;
_serviceProvider = serviceProvider; _serviceProvider = serviceProvider;
_logger= logger;
} }
/// <summary> /// <summary>
@ -61,6 +64,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Services
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "获取协议失败");
return null; return null;
} }
} }

View File

@ -1,15 +1,7 @@
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models; using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Protocol.Dto; using JiShe.CollectBus.Protocol.Dto;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.IdentityModel.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.Protocol.AnalysisData.AFN_00H namespace JiShe.CollectBus.Protocol.AnalysisData.AFN_00H
{ {
@ -32,16 +24,17 @@ namespace JiShe.CollectBus.Protocol.AnalysisData.AFN_00H
ArgumentNullException.ThrowIfNull(nameof(tB3761)); ArgumentNullException.ThrowIfNull(nameof(tB3761));
UnitDataAnalysis<bool> dto = new UnitDataAnalysis<bool> UnitDataAnalysis<bool> dto = new UnitDataAnalysis<bool>
{ {
Code = tB3761.A?.Code, Code = tB3761.A.Code,
AFN = tB3761.AFN_FC?.AFN ?? 0, AFN = tB3761.AFN_FC.AFN,
Fn = tB3761.DT?.Fn ?? 0, Fn = tB3761.DT.Fn,
Pn = tB3761.DA?.Pn ?? 0 Pn = tB3761.DA.Pn,
Data= true
}; };
return Task.FromResult(dto); return Task.FromResult(dto);
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, $"00_1解析失败:{tB3761.A?.Code}-{tB3761.DT?.Fn ?? 0}-{tB3761?.BaseHexMessage?.HexMessageString},{ex.Message}"); _logger.LogError(ex, $"00_1解析失败:{tB3761.A.Code}-{tB3761.DT.Fn}-{tB3761.BaseHexMessage.HexMessageString},{ex.Message}");
return null; return null;
} }
} }

View File

@ -79,8 +79,6 @@ namespace JiShe.CollectBus.Protocol
// 注册策略实现 // 注册策略实现
services.AddTransient(analysisStrategyType); services.AddTransient(analysisStrategyType);
strategyMetadata[(strategyType, inputType, resultType)] = analysisStrategyType; strategyMetadata[(strategyType, inputType, resultType)] = analysisStrategyType;
} }
} }

View File

@ -73,13 +73,68 @@ namespace JiShe.CollectBus.Protocol
await HeartbeatAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ); await HeartbeatAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ);
} }
} }
}
}
await OnTcpNormalReceived(client, tB3761);
} }
return (tB3761 as T)!; return (tB3761 as T)!;
} }
/// <summary>
/// 正常帧处理将不同的AFN进行分发
/// </summary>
/// <param name="tcpSessionClient"></param>
/// <param name="messageHexString"></param>
/// <param name="tB3761"></param>
/// <returns></returns>
private async Task OnTcpNormalReceived(ITcpSessionClient tcpSessionClient, TB3761 tB3761)
{
//string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn);
//todo 如何确定时标?目前集中器的采集频率,都是固定,数据上报的时候,根据当前时间,往后推测出应当采集的时间点作为时标。但是如果由于网络问题,数据一直没上报的情况改怎么计算?
//await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
//{
// ClientId = client.Id,
// ClientIp = client.IP,
// ClientPort = client.Port,
// MessageHexString = messageHexString,
// DeviceNo = deviceNo,
// MessageId = NewId.NextGuid().ToString()
//});
if (tB3761.AFN_FC.BaseHexMessage == null || tB3761.DT.BaseHexMessage == null || tB3761.BaseHexMessage.HexMessageString==null)
{
_logger.LogError("376.1协议解析AFN失败");
return;
}
// 登录心跳已做了处理,故需要忽略登录和心跳帧
if (tB3761.DT.Fn == (int)FN. || tB3761.DT.Fn == (int)FN.)
return;
//TODO根据AFN进行分流推送到kafka
string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, tB3761.AFN_FC.AFN.ToString().PadLeft(2, '0'));
MessageProtocolAnalysis<TB3761> messageReceivedAnalysis = new MessageProtocolAnalysis<TB3761>()
{
ClientId = tcpSessionClient.Id,
ClientIp = tcpSessionClient.IP,
ClientPort = tcpSessionClient.Port,
MessageHexString = tB3761.BaseHexMessage.HexMessageString!,
DeviceNo = tB3761.A.Code!,
MessageId = Guid.NewGuid().ToString(),
ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"),
Data = tB3761
};
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByReceived();
if (topics.Contains(topicName))
await _producerService.ProduceAsync(topicName, messageReceivedAnalysis);
else
{
_logger.LogError($"不支持的上报kafka主题{topicName}");
await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, messageReceivedAnalysis);
}
}
/// <summary> /// <summary>
/// 登录回复 /// 登录回复
@ -119,18 +174,10 @@ namespace JiShe.CollectBus.Protocol
ClientPort = client.Port, ClientPort = client.Port,
MessageHexString = messageReceived, MessageHexString = messageReceived,
DeviceNo = code, DeviceNo = code,
MessageId = Guid.NewGuid().ToString() MessageId = Guid.NewGuid().ToString(),
ReceivedTime=DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
}; };
//await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent); await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
//await _producerBus.Publish( messageReceivedLoginEvent);
//var aTuple = (Tuple<string, int>)messageReceived.StringToPairs().GetAnalyzeValue(CommandChunkEnum.A);
//var seq = (Seq)messageReceived.StringToPairs().GetAnalyzeValue(CommandChunkEnum.SEQ);
var reqParam = new ReqParameter2 var reqParam = new ReqParameter2
{ {
AFN = AFN., AFN = AFN.,
@ -156,15 +203,12 @@ namespace JiShe.CollectBus.Protocol
Message = bytes, Type = IssuedEventType.Login, Message = bytes, Type = IssuedEventType.Login,
MessageId = messageReceivedLoginEvent.MessageId MessageId = messageReceivedLoginEvent.MessageId
}; };
//await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
if (_tcpService.ClientExists(issuedEventMessage.ClientId)) if (_tcpService.ClientExists(issuedEventMessage.ClientId))
{ {
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
_logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{Convert.ToHexString(bytes)}"); _logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{Convert.ToHexString(bytes)}");
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, issuedEventMessage); await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, issuedEventMessage);
} }
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
} }
/// <summary> /// <summary>
@ -215,7 +259,8 @@ namespace JiShe.CollectBus.Protocol
ClientPort = client.Port, ClientPort = client.Port,
MessageHexString = messageReceived, MessageHexString = messageReceived,
DeviceNo = code, DeviceNo = code,
MessageId = Guid.NewGuid().ToString() MessageId = Guid.NewGuid().ToString(),
ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
}; };
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent); await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
@ -254,11 +299,6 @@ namespace JiShe.CollectBus.Protocol
} }
} }
#region #region
//68 //68

View File

@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Subscribers
{
public interface ISubscriberAnalysisAppService
{
}
}

View File

@ -77,13 +77,10 @@ namespace JiShe.CollectBus.Plugins
TB3761? tB3761 = await protocolPlugin!.AnalyzeAsync<TB3761>(tcpSessionClient, messageHexString); TB3761? tB3761 = await protocolPlugin!.AnalyzeAsync<TB3761>(tcpSessionClient, messageHexString);
if (tB3761 == null) if (tB3761 == null)
{ {
// TODO: 暂时不处理,后续再处理
_logger.LogError($"指令初步解析失败,指令内容:{messageHexString}"); _logger.LogError($"指令初步解析失败,指令内容:{messageHexString}");
} }
else await e.InvokeNext();
{
await OnTcpNormalReceived(tcpSessionClient, messageHexString, tB3761);
}
await e.InvokeNext();
} }
//[GeneratorPlugin(typeof(ITcpConnectingPlugin))] //[GeneratorPlugin(typeof(ITcpConnectingPlugin))]
@ -124,76 +121,5 @@ namespace JiShe.CollectBus.Plugins
await e.InvokeNext(); await e.InvokeNext();
} }
/// <summary>
/// 正常帧处理将不同的AFN进行分发
/// </summary>
/// <param name="tcpSessionClient"></param>
/// <param name="messageHexString"></param>
/// <param name="tB3761"></param>
/// <returns></returns>
private async Task OnTcpNormalReceived(ITcpSessionClient tcpSessionClient,string messageHexString, TB3761? tB3761)
{
//await _producerBus.Publish(new MessageReceived
//{
// ClientId = client.Id,
// ClientIp = client.IP,
// ClientPort = client.Port,
// MessageHexString = messageHexString,
// DeviceNo = deviceNo,
// MessageId = NewId.NextGuid().ToString()
//});
//string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn);
//todo 如何确定时标?目前集中器的采集频率,都是固定,数据上报的时候,根据当前时间,往后推测出应当采集的时间点作为时标。但是如果由于网络问题,数据一直没上报的情况改怎么计算?
//await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
//{
// ClientId = client.Id,
// ClientIp = client.IP,
// ClientPort = client.Port,
// MessageHexString = messageHexString,
// DeviceNo = deviceNo,
// MessageId = NewId.NextGuid().ToString()
//});
if(tB3761?.AFN_FC.BaseHexMessage==null || tB3761.DT.BaseHexMessage == null)
{
_logger.LogError("376.1协议解析AFN失败");
return;
}
// 登录心跳已做了处理,故需要忽略登录和心跳帧
//if(tB3761.DT?.Fn == (int)FN.登录 || tB3761.DT?.Fn == (int)FN.心跳)
// return;
//TODO根据AFN进行分流推送到kafka
string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, tB3761?.AFN_FC.AFN.ToString().PadLeft(2,'0'));
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByReceived();
if(topics.Contains(topicName))
await _producerService.ProduceAsync(topicName, new MessageReceived
{
ClientId = tcpSessionClient.Id,
ClientIp = tcpSessionClient.IP,
ClientPort = tcpSessionClient.Port,
MessageHexString = messageHexString,
DeviceNo = tB3761?.A?.Code!,
MessageId = Guid.NewGuid().ToString()
});
else
{
_logger.LogError($"不支持的上报kafka主题{topicName}");
await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
{
ClientId = tcpSessionClient.Id,
ClientIp = tcpSessionClient.IP,
ClientPort = tcpSessionClient.Port,
MessageHexString = messageHexString,
DeviceNo = tB3761?.A?.Code!,
MessageId = Guid.NewGuid().ToString()
});
}
}
} }
} }

View File

@ -0,0 +1,83 @@
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using TouchSocket.Sockets;
using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Subscribers
{
public class SubscriberAnalysisAppService : CollectBusAppService, ISubscriberAnalysisAppService, IKafkaSubscribe
{
private readonly ILogger<SubscriberAnalysisAppService> _logger;
private readonly ITcpService _tcpService;
private readonly IServiceProvider _serviceProvider;
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
private readonly IIoTDbProvider _dbProvider;
private readonly IProtocolService _protocolService;
public SubscriberAnalysisAppService(ILogger<SubscriberAnalysisAppService> logger,
ITcpService tcpService,
IServiceProvider serviceProvider,
IIoTDbProvider dbProvider,
IMeterReadingRecordRepository meterReadingRecordsRepository, IProtocolService protocolService)
{
_logger = logger;
_tcpService = tcpService;
_serviceProvider = serviceProvider;
_meterReadingRecordsRepository = meterReadingRecordsRepository;
_dbProvider = dbProvider;
_protocolService = protocolService;
}
/// <summary>
/// 解析AFN00H
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(ProtocolConst.SubscriberAFN00HReceivedEventNameTemp)]
public async Task<ISubscribeAck> ReceivedAFN00Event(MessageProtocolAnalysis<TB3761> receivedMessage)
{
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
if (protocolPlugin == null)
{
_logger.LogError("协议不存在!");
}
else
{
if (receivedMessage.Data==null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
await executor.ExecuteAsync<TB3761, object>(serverName, receivedMessage.Data);
}
return SubscribeAck.Fail();
}
}
}

View File

@ -169,7 +169,6 @@ namespace JiShe.CollectBus.Subscribers
public async Task<ISubscribeAck> ReceivedHeartbeatEvent(List<MessageReceivedHeartbeat> receivedHeartbeatMessages) public async Task<ISubscribeAck> ReceivedHeartbeatEvent(List<MessageReceivedHeartbeat> receivedHeartbeatMessages)
{ {
await _messageReceivedHeartbeatEventRepository.InsertManyAsync(receivedHeartbeatMessages); await _messageReceivedHeartbeatEventRepository.InsertManyAsync(receivedHeartbeatMessages);
return SubscribeAck.Success(); return SubscribeAck.Success();
} }
@ -180,40 +179,5 @@ namespace JiShe.CollectBus.Subscribers
return SubscribeAck.Success(); return SubscribeAck.Success();
} }
//[KafkaSubscribe(ProtocolConst.SubscriberAFN02HReceivedEventNameTemp)]
public async Task<ISubscribeAck> ReceivedAFN00Event(MessageReceived receivedMessage)
{
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null)
{
_logger.LogError("协议不存在!");
}
else
{
TB3761? tB3761 = protocolPlugin.Analysis3761(receivedMessage.MessageHexString);
if (tB3761 == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
if (tB3761.DT == null || tB3761.AFN_FC == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
string serverName = $"AFN{tB3761.AFN_FC.AFN}_F{tB3761.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<bool>>(tB3761);
var executor = _serviceProvider.GetRequiredService<AnalysisStrategyContext>();
bool isSucces= await executor.ExecuteAsync<TB3761, bool>("AFN0_F1_Analysis", tB3761);
}
return SubscribeAck.Fail();
}
} }
} }

View File

@ -50,4 +50,9 @@ namespace JiShe.CollectBus.IotSystems.MessageReceiveds
public DateTime? AckTime { get; set; } public DateTime? AckTime { get; set; }
} }
public class MessageProtocolAnalysis<T> : MessageReceived
{
public T Data { get; set; } = default!;
}
} }