diff --git a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs index 82672f6..4428fe4 100644 --- a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs @@ -148,7 +148,7 @@ namespace JiShe.CollectBus.Subscribers #region 水表消息采集 /// - /// 一分钟定时抄读任务消息消费订阅 + /// 1分钟采集水表数据下行消息消费订阅 /// /// /// @@ -157,11 +157,11 @@ namespace JiShe.CollectBus.Subscribers [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName)] public async Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) { - _logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理"); + _logger.LogInformation("1分钟采集水表数据下行消息消费队列开始处理"); var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); if (protocolPlugin == null) { - _logger.LogError("【1分钟采集电表数据下行消息消费队列开始处理】协议不存在!"); + _logger.LogError("【1分钟采集水表数据下行消息消费队列开始处理】协议不存在!"); } else { @@ -175,20 +175,20 @@ namespace JiShe.CollectBus.Subscribers } /// - /// 5分钟采集电表数据下行消息消费订阅 + /// 5分钟采集水表数据下行消息消费订阅 /// /// /// [HttpPost] [Route("watermeter/fiveminute/issued-event")] - [CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)] + [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerFiveMinuteIssuedEventName)] public async Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) { - _logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理"); + _logger.LogInformation("5分钟采集水表数据下行消息消费队列开始处理"); var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); if (protocolPlugin == null) { - _logger.LogError("【5分钟采集电表数据下行消息消费队列开始处理】协议不存在!"); + _logger.LogError("【5分钟采集水表数据下行消息消费队列开始处理】协议不存在!"); } else { @@ -202,20 +202,20 @@ namespace JiShe.CollectBus.Subscribers } /// - /// 15分钟采集电表数据下行消息消费订阅 + /// 15分钟采集水表数据下行消息消费订阅 /// /// /// [HttpPost] [Route("watermeter/fifteenminute/issued-event")] - [CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)] + [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerFifteenMinuteIssuedEventName)] public async Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) { - _logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理"); + _logger.LogInformation("15分钟采集水表数据下行消息消费队列开始处理"); var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); if (protocolPlugin == null) { - _logger.LogError("【15分钟采集电表数据下行消息消费队列开始处理】协议不存在!"); + _logger.LogError("【15分钟采集水表数据下行消息消费队列开始处理】协议不存在!"); } else { diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs index 32c39b5..96e1e18 100644 --- a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs +++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs @@ -256,10 +256,7 @@ namespace JiShe.CollectBus.Host /// The context. /// The configuration. public void ConfigureCap(ServiceConfigurationContext context, IConfiguration configuration) - { - List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); - topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); - + { context.Services.AddCap(x => { x.DefaultGroupName = ProtocolConst.SubscriberGroup; @@ -287,40 +284,7 @@ namespace JiShe.CollectBus.Host /// Configures the mass transit. /// public void ConfigureMassTransit(ServiceConfigurationContext context, IConfiguration configuration) - { - - - var adminClient = new AdminClientBuilder(new AdminClientConfig - { - BootstrapServers = configuration.GetConnectionString("Kafka") - }).Build(); - - try - { - List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); - topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); - - List topicSpecifications = new List(); - foreach (var item in topics) - { - topicSpecifications.Add(new TopicSpecification - { - Name = item, - NumPartitions = 3, - ReplicationFactor = 1 - }); - } - adminClient.CreateTopicsAsync(topicSpecifications).ConfigureAwait(false).GetAwaiter().GetResult(); - } - catch (CreateTopicsException e) - { - if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists) - { - throw; - } - } - - + { var consumerConfig = new ConsumerConfig { GroupId = ProtocolConst.SubscriberGroup }; var producerConfig = new ProducerConfig(); @@ -391,5 +355,46 @@ namespace JiShe.CollectBus.Host }); }); } + + /// + /// 配置Kafka主题 + /// + /// + /// + public void ConfigureKafkaTopic(ServiceConfigurationContext context, IConfiguration configuration) + { + var adminClient = new AdminClientBuilder(new AdminClientConfig + { + BootstrapServers = configuration.GetConnectionString("Kafka") + }).Build(); + + try + { + string serverTagName = configuration.GetSection("ServerTagName").Value!; + + List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(serverTagName); + topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived(serverTagName)); + + List topicSpecifications = new List(); + foreach (var item in topics) + { + topicSpecifications.Add(new TopicSpecification + { + Name = item, + NumPartitions = 3, + ReplicationFactor = 1 + }); + } + adminClient.CreateTopicsAsync(topicSpecifications).ConfigureAwait(false).GetAwaiter().GetResult(); + } + catch (CreateTopicsException e) + { + if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists) + { + throw; + } + } + + } } } \ No newline at end of file diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs index 6f24b1e..f18fcdb 100644 --- a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs +++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs @@ -45,6 +45,7 @@ namespace JiShe.CollectBus.Host ConfigureHangfire(context); ConfigureCap(context, configuration); //ConfigureMassTransit(context, configuration); + ConfigureKafkaTopic(context, configuration); ConfigureAuditLog(context); ConfigureCustom(context, configuration); } diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index 9694e44..2ef7ded 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -81,5 +81,6 @@ "DataBaseName": "energy", "OpenDebugMode": true, "UseTableSessionPoolByDefault": false - } + }, + "ServerTagName": "JiSheCollectBus" } \ No newline at end of file diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs b/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs index 8c097e2..67abc75 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs @@ -15,24 +15,27 @@ namespace JiShe.CollectBus.Protocol.Contracts /// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称 /// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量) /// - public static List GetAllTopicNamesByIssued() + public static List GetAllTopicNamesByIssued(string serverTagName) { - return typeof(ProtocolConst) + List topics = typeof(ProtocolConst) .GetFields(BindingFlags.Public | BindingFlags.Static) .Where(f => f.IsLiteral && !f.IsInitOnly && f.FieldType == typeof(string) && f.Name.EndsWith("IssuedEventName")) // 通过命名规则过滤主题字段 + //.Select(f => $"{serverTagName}.{(string)f.GetRawConstantValue()!}") .Select(f => (string)f.GetRawConstantValue()!) .ToList(); + + return topics; } /// /// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称 /// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量) /// - public static List GetAllTopicNamesByReceived() + public static List GetAllTopicNamesByReceived(string serverTagName) { //固定的上报主题 var topicList = typeof(ProtocolConst) @@ -52,6 +55,8 @@ namespace JiShe.CollectBus.Protocol.Contracts topicList.Add(string.Format(ProtocolConst.AFNTopicNameFormat, item.Value.ToString().PadLeft(2, '0'))); } + //return topicList.Select(f => $"{serverTagName}.{f}").ToList(); + return topicList; } }