diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs index 5313833..32df748 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs @@ -7,16 +7,20 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System.Collections.Concurrent; using System.Text; +using System.Text.RegularExpressions; namespace JiShe.CollectBus.Kafka.Consumer { public class ConsumerService : IConsumerService, IDisposable { private readonly ILogger _logger; - private readonly ConcurrentDictionary + /// + /// 消费者存储 + /// Key 格式:{groupId}_{topic}_{TKey}_{TValue} + /// + private readonly ConcurrentDictionary _consumerStore = new(); private readonly KafkaOptionConfig _kafkaOptionConfig; - private class KafkaConsumer where TKey : notnull where TValue : class { } /// /// ConsumerService @@ -108,15 +112,14 @@ namespace JiShe.CollectBus.Kafka.Consumer /// public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null) where TKey : notnull where TValue : class { - var consumerKey = typeof(KafkaConsumer); + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; var cts = new CancellationTokenSource(); - //var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - //( - // CreateConsumer(groupId), - // cts - //)).Consumer as IConsumer; - var consumer = CreateConsumer(groupId); + var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; consumer!.Subscribe(topics); await Task.Run(async () => @@ -176,19 +179,14 @@ namespace JiShe.CollectBus.Kafka.Consumer public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) where TValue : class { try { - var consumerKey = typeof(KafkaConsumer); + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; var cts = new CancellationTokenSource(); - //if (topics.Contains(ProtocolConst.SubscriberLoginReceivedEventName)) - //{ - // string ssss = ""; - //} - //var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - //( - // CreateConsumer(groupId), - // cts - //)).Consumer as IConsumer; + var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; - var consumer = CreateConsumer(groupId); consumer!.Subscribe(topics); _ = Task.Run(async () => @@ -274,15 +272,14 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 批次超时时间 public async Task SubscribeBatchAsync(string[] topics,Func, Task> messageBatchHandler, string? groupId = null,int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class { - var consumerKey = typeof(KafkaConsumer); + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; var cts = new CancellationTokenSource(); - //var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - // ( - // CreateConsumer(groupId), - // cts - // )).Consumer as IConsumer; - var consumer = CreateConsumer(groupId); + var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; consumer!.Subscribe(topics); var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 @@ -366,6 +363,7 @@ namespace JiShe.CollectBus.Kafka.Consumer catch (OperationCanceledException) { // 任务取消,正常退出 + } catch (Exception ex) { @@ -407,16 +405,15 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 消费等待时间 public async Task SubscribeBatchAsync(string[] topics,Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class { - var consumerKey = typeof(KafkaConsumer); + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; var cts = new CancellationTokenSource(); - //var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - // ( - // CreateConsumer(groupId), - // cts - // )).Consumer as IConsumer; + var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; - var consumer= CreateConsumer (groupId); consumer!.Subscribe(topics); var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 @@ -518,9 +515,9 @@ namespace JiShe.CollectBus.Kafka.Consumer /// /// /// - public void Unsubscribe() where TKey : notnull where TValue : class + public void Unsubscribe(string[] topics, string groupId) where TKey : notnull where TValue : class { - var consumerKey = typeof((TKey, TValue)); + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; if (_consumerStore.TryRemove(consumerKey, out var entry)) { entry.CTS.Cancel(); diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs index 32ade01..b4b4274 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs @@ -46,5 +46,5 @@ public interface IConsumerService string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class; - void Unsubscribe() where TKey : notnull where TValue : class; + void Unsubscribe(string[] topics, string groupId) where TKey : notnull where TValue : class; } \ No newline at end of file diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/AnalysisStrategyContext.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/AnalysisStrategyContext.cs index 8fe8f70..628934a 100644 --- a/protocols/JiShe.CollectBus.Protocol.Contracts/AnalysisStrategyContext.cs +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/AnalysisStrategyContext.cs @@ -9,11 +9,9 @@ using System.Threading.Tasks; namespace JiShe.CollectBus.Protocol.Contracts { - public class AnalysisStrategyContext + public class AnalysisStrategyContext(IServiceProvider provider) { - private readonly IServiceProvider _provider; - - public AnalysisStrategyContext(IServiceProvider provider) => _provider = provider; + private readonly IServiceProvider _provider = provider; /// /// 执行策略 diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Protocol/Dto/AFN0_F1_AnalysisDto.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Protocol/Dto/AFN0_F1_AnalysisDto.cs index d6ab85a..ede07c1 100644 --- a/protocols/JiShe.CollectBus.Protocol.Contracts/Protocol/Dto/AFN0_F1_AnalysisDto.cs +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Protocol/Dto/AFN0_F1_AnalysisDto.cs @@ -8,6 +8,6 @@ namespace JiShe.CollectBus.Protocol.Dto { public class AFN0_F1_AnalysisDto: UnitDataDto { - + public bool Verify { get; set; } = true; } } diff --git a/protocols/JiShe.CollectBus.Protocol/AnalysisData/AFN_00H/AFN0_F1_Analysis.cs b/protocols/JiShe.CollectBus.Protocol/AnalysisData/AFN_00H/AFN0_F1_Analysis.cs index f5ba2af..3ff0882 100644 --- a/protocols/JiShe.CollectBus.Protocol/AnalysisData/AFN_00H/AFN0_F1_Analysis.cs +++ b/protocols/JiShe.CollectBus.Protocol/AnalysisData/AFN_00H/AFN0_F1_Analysis.cs @@ -12,6 +12,9 @@ using System.Threading.Tasks; namespace JiShe.CollectBus.Protocol.AnalysisData.AFN_00H { + /// + /// 全部确认:对收到报文中的全部数据单元标识进行确认 + /// public class AFN0_F1_Analysis: IAnalysisStrategy { private readonly ILogger _logger; diff --git a/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs index 6a5e3a5..27f7466 100644 --- a/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs @@ -28,16 +28,18 @@ namespace JiShe.CollectBus.Protocol private readonly IProducerService _producerService; private readonly IRepository _deviceRepository; + private readonly ITcpService _tcpService; /// /// Initializes a new instance of the class. /// /// The service provider. - public StandardProtocolPlugin(IServiceProvider serviceProvider,ILogger logger) : base(serviceProvider, logger) + public StandardProtocolPlugin(IServiceProvider serviceProvider,ILogger logger, ITcpService tcpService) : base(serviceProvider, logger) { - _logger= logger; + _logger = logger; //_logger = serviceProvider.GetRequiredService>(); _producerService = serviceProvider.GetRequiredService(); _deviceRepository = serviceProvider.GetRequiredService>(); + _tcpService = tcpService; } public sealed override ProtocolInfo Info => new(nameof(StandardProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980"); @@ -146,9 +148,21 @@ namespace JiShe.CollectBus.Protocol Fn = 1 }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParam); + var issuedEventMessage = new IssuedEventMessage + { + ClientId = messageReceivedLoginEvent.ClientId, + DeviceNo = messageReceivedLoginEvent.DeviceNo, + Message = bytes, Type = IssuedEventType.Login, + MessageId = messageReceivedLoginEvent.MessageId + }; //await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); + if (_tcpService.ClientExists(issuedEventMessage.ClientId)) + { + await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); + _logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{Convert.ToHexString(bytes)}"); + await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, issuedEventMessage); + } - await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceivedLoginEvent.ClientId, DeviceNo = messageReceivedLoginEvent.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceivedLoginEvent.MessageId }); //await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); } @@ -226,11 +240,24 @@ namespace JiShe.CollectBus.Protocol }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParam); //await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }); - - await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceivedHeartbeatEvent.ClientId, DeviceNo = messageReceivedHeartbeatEvent.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceivedHeartbeatEvent.MessageId }); + + IssuedEventMessage issuedEventMessage = new IssuedEventMessage + { + ClientId = messageReceivedHeartbeatEvent.ClientId, + DeviceNo = messageReceivedHeartbeatEvent.DeviceNo, + Message = bytes, + Type = IssuedEventType.Heartbeat, + MessageId = messageReceivedHeartbeatEvent.MessageId + }; + if (_tcpService.ClientExists(issuedEventMessage.ClientId)) + { + await _tcpService.SendAsync(issuedEventMessage.ClientId, bytes); + _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{Convert.ToHexString(bytes)}"); + await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, issuedEventMessage); + } //await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }); - + } diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 86582af..d871ecd 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -168,7 +168,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS { SystemName = "energy", DeviceId = "402440506", - DeviceType = "Ammeter", + DeviceType = "1", ProjectId = "10059", Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), SingleMeasuring = new Tuple(measuring, value) diff --git a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index cebb2e7..32a9fc3 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -75,14 +75,11 @@ namespace JiShe.CollectBus.Subscribers isAck=false; break; } - _logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}"); - + loginEntity.AckTime = Clock.Now; loginEntity.IsAck = true; await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity); - if (_tcpService.ClientExists(issuedEventMessage.ClientId)) - await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); - isAck = true; + isAck = true; } // TODO:暂时ACK,等后续处理是否放到私信队列中 @@ -102,19 +99,10 @@ namespace JiShe.CollectBus.Subscribers isAck = false; break; } - _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}"); - heartbeatEntity.AckTime = Clock.Now; heartbeatEntity.IsAck = true; await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity); - //var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo); - //if (device != null) - //{ - // await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message); - //} - if(_tcpService.ClientExists(issuedEventMessage.ClientId)) - await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); - } + } // TODO:暂时ACK,等后续处理是否放到私信队列中 return isAck ? SubscribeAck.Success() : SubscribeAck.Fail(); } @@ -178,19 +166,6 @@ namespace JiShe.CollectBus.Subscribers //[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] public async Task ReceivedHeartbeatEvent(List receivedHeartbeatMessages) { - //foreach (var receivedHeartbeatMessage in receivedHeartbeatMessages) - //{ - // var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - // if (protocolPlugin == null) - // { - // _logger.LogError("协议不存在!"); - // } - // else - // { - // //await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage); - // await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage); - // } - //} await _messageReceivedHeartbeatEventRepository.InsertManyAsync(receivedHeartbeatMessages); return SubscribeAck.Success(); @@ -200,20 +175,6 @@ namespace JiShe.CollectBus.Subscribers //[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] public async Task ReceivedLoginEvent(List receivedLoginMessages) { - //foreach (var receivedLoginMessage in receivedLoginMessages) - //{ - //var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - //if (protocolPlugin == null) - //{ - // _logger.LogError("协议不存在!"); - //} - //else - //{ - // //await protocolPlugin.LoginAsync(receivedLoginMessage); - // await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage); - //} - - //} await _messageReceivedLoginEventRepository.InsertManyAsync(receivedLoginMessages); return SubscribeAck.Success(); } diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml index 0193df3..88209c2 100644 --- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -16,7 +16,6 @@ 后端服务 -