dev #2

Merged
admin merged 176 commits from dev into master 2025-04-18 01:31:49 +00:00
5 changed files with 65 additions and 53 deletions
Showing only changes of commit beb11c76f5 - Show all commits

View File

@ -148,7 +148,7 @@ namespace JiShe.CollectBus.Subscribers
#region #region
/// <summary> /// <summary>
/// 一分钟定时抄读任务消息消费订阅 /// 1分钟采集水表数据下行消息消费订阅
/// </summary> /// </summary>
/// <param name="receivedMessage"></param> /// <param name="receivedMessage"></param>
/// <returns></returns> /// <returns></returns>
@ -157,11 +157,11 @@ namespace JiShe.CollectBus.Subscribers
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName)] [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) public async Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{ {
_logger.LogInformation("1分钟采集表数据下行消息消费队列开始处理"); _logger.LogInformation("1分钟采集表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin"); var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null) if (protocolPlugin == null)
{ {
_logger.LogError("【1分钟采集表数据下行消息消费队列开始处理】协议不存在!"); _logger.LogError("【1分钟采集表数据下行消息消费队列开始处理】协议不存在!");
} }
else else
{ {
@ -175,20 +175,20 @@ namespace JiShe.CollectBus.Subscribers
} }
/// <summary> /// <summary>
/// 5分钟采集表数据下行消息消费订阅 /// 5分钟采集表数据下行消息消费订阅
/// </summary> /// </summary>
/// <param name="receivedMessage"></param> /// <param name="receivedMessage"></param>
/// <returns></returns> /// <returns></returns>
[HttpPost] [HttpPost]
[Route("watermeter/fiveminute/issued-event")] [Route("watermeter/fiveminute/issued-event")]
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)] [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerFiveMinuteIssuedEventName)]
public async Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) public async Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{ {
_logger.LogInformation("5分钟采集表数据下行消息消费队列开始处理"); _logger.LogInformation("5分钟采集表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin"); var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null) if (protocolPlugin == null)
{ {
_logger.LogError("【5分钟采集表数据下行消息消费队列开始处理】协议不存在!"); _logger.LogError("【5分钟采集表数据下行消息消费队列开始处理】协议不存在!");
} }
else else
{ {
@ -202,20 +202,20 @@ namespace JiShe.CollectBus.Subscribers
} }
/// <summary> /// <summary>
/// 15分钟采集表数据下行消息消费订阅 /// 15分钟采集表数据下行消息消费订阅
/// </summary> /// </summary>
/// <param name="receivedMessage"></param> /// <param name="receivedMessage"></param>
/// <returns></returns> /// <returns></returns>
[HttpPost] [HttpPost]
[Route("watermeter/fifteenminute/issued-event")] [Route("watermeter/fifteenminute/issued-event")]
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)] [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerFifteenMinuteIssuedEventName)]
public async Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) public async Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{ {
_logger.LogInformation("15分钟采集表数据下行消息消费队列开始处理"); _logger.LogInformation("15分钟采集表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin"); var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null) if (protocolPlugin == null)
{ {
_logger.LogError("【15分钟采集表数据下行消息消费队列开始处理】协议不存在!"); _logger.LogError("【15分钟采集表数据下行消息消费队列开始处理】协议不存在!");
} }
else else
{ {

View File

@ -257,9 +257,6 @@ namespace JiShe.CollectBus.Host
/// <param name="configuration">The configuration.</param> /// <param name="configuration">The configuration.</param>
public void ConfigureCap(ServiceConfigurationContext context, IConfiguration configuration) public void ConfigureCap(ServiceConfigurationContext context, IConfiguration configuration)
{ {
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
context.Services.AddCap(x => context.Services.AddCap(x =>
{ {
x.DefaultGroupName = ProtocolConst.SubscriberGroup; x.DefaultGroupName = ProtocolConst.SubscriberGroup;
@ -288,39 +285,6 @@ namespace JiShe.CollectBus.Host
/// </summary> /// </summary>
public void ConfigureMassTransit(ServiceConfigurationContext context, IConfiguration configuration) public void ConfigureMassTransit(ServiceConfigurationContext context, IConfiguration configuration)
{ {
var adminClient = new AdminClientBuilder(new AdminClientConfig
{
BootstrapServers = configuration.GetConnectionString("Kafka")
}).Build();
try
{
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
List<TopicSpecification> topicSpecifications = new List<TopicSpecification>();
foreach (var item in topics)
{
topicSpecifications.Add(new TopicSpecification
{
Name = item,
NumPartitions = 3,
ReplicationFactor = 1
});
}
adminClient.CreateTopicsAsync(topicSpecifications).ConfigureAwait(false).GetAwaiter().GetResult();
}
catch (CreateTopicsException e)
{
if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
{
throw;
}
}
var consumerConfig = new ConsumerConfig { GroupId = ProtocolConst.SubscriberGroup }; var consumerConfig = new ConsumerConfig { GroupId = ProtocolConst.SubscriberGroup };
var producerConfig = new ProducerConfig(); var producerConfig = new ProducerConfig();
@ -391,5 +355,46 @@ namespace JiShe.CollectBus.Host
}); });
}); });
} }
/// <summary>
/// 配置Kafka主题
/// </summary>
/// <param name="context"></param>
/// <param name="configuration"></param>
public void ConfigureKafkaTopic(ServiceConfigurationContext context, IConfiguration configuration)
{
var adminClient = new AdminClientBuilder(new AdminClientConfig
{
BootstrapServers = configuration.GetConnectionString("Kafka")
}).Build();
try
{
string serverTagName = configuration.GetSection("ServerTagName").Value!;
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(serverTagName);
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived(serverTagName));
List<TopicSpecification> topicSpecifications = new List<TopicSpecification>();
foreach (var item in topics)
{
topicSpecifications.Add(new TopicSpecification
{
Name = item,
NumPartitions = 3,
ReplicationFactor = 1
});
}
adminClient.CreateTopicsAsync(topicSpecifications).ConfigureAwait(false).GetAwaiter().GetResult();
}
catch (CreateTopicsException e)
{
if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
{
throw;
}
}
}
} }
} }

View File

@ -45,6 +45,7 @@ namespace JiShe.CollectBus.Host
ConfigureHangfire(context); ConfigureHangfire(context);
ConfigureCap(context, configuration); ConfigureCap(context, configuration);
//ConfigureMassTransit(context, configuration); //ConfigureMassTransit(context, configuration);
ConfigureKafkaTopic(context, configuration);
ConfigureAuditLog(context); ConfigureAuditLog(context);
ConfigureCustom(context, configuration); ConfigureCustom(context, configuration);
} }

View File

@ -81,5 +81,6 @@
"DataBaseName": "energy", "DataBaseName": "energy",
"OpenDebugMode": true, "OpenDebugMode": true,
"UseTableSessionPoolByDefault": false "UseTableSessionPoolByDefault": false
} },
"ServerTagName": "JiSheCollectBus"
} }

View File

@ -15,24 +15,27 @@ namespace JiShe.CollectBus.Protocol.Contracts
/// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称 /// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称
/// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量) /// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量)
/// </summary> /// </summary>
public static List<string> GetAllTopicNamesByIssued() public static List<string> GetAllTopicNamesByIssued(string serverTagName)
{ {
return typeof(ProtocolConst) List<string> topics = typeof(ProtocolConst)
.GetFields(BindingFlags.Public | BindingFlags.Static) .GetFields(BindingFlags.Public | BindingFlags.Static)
.Where(f => .Where(f =>
f.IsLiteral && f.IsLiteral &&
!f.IsInitOnly && !f.IsInitOnly &&
f.FieldType == typeof(string) && f.FieldType == typeof(string) &&
f.Name.EndsWith("IssuedEventName")) // 通过命名规则过滤主题字段 f.Name.EndsWith("IssuedEventName")) // 通过命名规则过滤主题字段
//.Select(f => $"{serverTagName}.{(string)f.GetRawConstantValue()!}")
.Select(f => (string)f.GetRawConstantValue()!) .Select(f => (string)f.GetRawConstantValue()!)
.ToList(); .ToList();
return topics;
} }
/// <summary> /// <summary>
/// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称 /// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称
/// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量) /// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量)
/// </summary> /// </summary>
public static List<string> GetAllTopicNamesByReceived() public static List<string> GetAllTopicNamesByReceived(string serverTagName)
{ {
//固定的上报主题 //固定的上报主题
var topicList = typeof(ProtocolConst) var topicList = typeof(ProtocolConst)
@ -52,6 +55,8 @@ namespace JiShe.CollectBus.Protocol.Contracts
topicList.Add(string.Format(ProtocolConst.AFNTopicNameFormat, item.Value.ToString().PadLeft(2, '0'))); topicList.Add(string.Format(ProtocolConst.AFNTopicNameFormat, item.Value.ToString().PadLeft(2, '0')));
} }
//return topicList.Select(f => $"{serverTagName}.{f}").ToList();
return topicList; return topicList;
} }
} }