diff --git a/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs b/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs index 2a60803..1ca731b 100644 --- a/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs +++ b/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs @@ -90,7 +90,7 @@ namespace JiShe.CollectBus.EnergySystem Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }.Serialize()); + }); result.Status = true; result.Msg = "操作成功"; result.Data.ValidData = true; @@ -136,7 +136,7 @@ namespace JiShe.CollectBus.EnergySystem Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }.Serialize()); + }); } @@ -186,7 +186,7 @@ namespace JiShe.CollectBus.EnergySystem Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }.Serialize()); + }); result.Status = true; result.Msg = "操作成功"; result.Data.ValidData = true; @@ -224,7 +224,7 @@ namespace JiShe.CollectBus.EnergySystem Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }.Serialize()); + }); } result.Status = true; @@ -316,7 +316,7 @@ namespace JiShe.CollectBus.EnergySystem Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }.Serialize()); + }); if (isManual) { @@ -384,7 +384,7 @@ namespace JiShe.CollectBus.EnergySystem Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }.Serialize()); + }); result.Status = true; result.Msg = "操作成功"; @@ -420,7 +420,7 @@ namespace JiShe.CollectBus.EnergySystem Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }.Serialize()); + }); result.Status = true; result.Msg = "操作成功"; @@ -456,7 +456,7 @@ namespace JiShe.CollectBus.EnergySystem Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }.Serialize()); + }); result.Status = true; result.Msg = "操作成功"; @@ -491,7 +491,7 @@ namespace JiShe.CollectBus.EnergySystem Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }.Serialize()); + }); result.Status = true; result.Msg = "操作成功"; @@ -527,7 +527,7 @@ namespace JiShe.CollectBus.EnergySystem Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }.Serialize()); + }); result.Status = true; result.Msg = "操作成功"; return result; @@ -585,7 +585,7 @@ namespace JiShe.CollectBus.EnergySystem Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }.Serialize()); + }); } result.Status = true; @@ -662,7 +662,7 @@ namespace JiShe.CollectBus.EnergySystem Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }.Serialize()); + }); } result.Status = true; result.Msg = "操作成功"; @@ -699,7 +699,7 @@ namespace JiShe.CollectBus.EnergySystem Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }.Serialize()); + }); } result.Status = true; result.Msg = "操作成功"; @@ -735,7 +735,7 @@ namespace JiShe.CollectBus.EnergySystem Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }.Serialize()); + }); } result.Status = true; result.Msg = "操作成功"; @@ -769,7 +769,7 @@ namespace JiShe.CollectBus.EnergySystem Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }.Serialize()); + }); result.Status = true; result.Msg = "操作成功"; return result; @@ -804,7 +804,7 @@ namespace JiShe.CollectBus.EnergySystem Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }.Serialize()); + }); } result.Status = true; result.Msg = "操作成功"; @@ -867,7 +867,7 @@ namespace JiShe.CollectBus.EnergySystem Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }.Serialize()); + }); } result.Status = true; diff --git a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs index 50b41b0..6cbc8c4 100644 --- a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs +++ b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs @@ -178,7 +178,7 @@ namespace JiShe.CollectBus.Plugins //await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent); - await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent.Serialize()); + await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent); //await _producerBus.Publish( messageReceivedLoginEvent); } @@ -227,7 +227,7 @@ namespace JiShe.CollectBus.Plugins }; //await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent); - await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent.Serialize()); + await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent); //await _producerBus.Publish(messageReceivedHeartbeatEvent); } @@ -271,7 +271,7 @@ namespace JiShe.CollectBus.Plugins MessageHexString = messageHexString, DeviceNo = deviceNo, MessageId = NewId.NextGuid().ToString() - }.Serialize()); + }); } } } diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 8fa4f5e..d830707 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -382,7 +382,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading }; //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); - _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg.Serialize()); + _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); //_= _producerBus.Publish(tempMsg); @@ -448,7 +448,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading }; //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg); - _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg.Serialize()); + _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg); //_ = _producerBus.Publish(tempMsg); @@ -514,7 +514,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading }; //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); - _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg.Serialize()); + _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); //_ = _producerBus.Publish(tempMsg); @@ -809,7 +809,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress); - await _producerService.ProduceAsync(topicName, partition, taskRecord.Serialize()); + await _producerService.ProduceAsync(topicName, partition, taskRecord); } private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType) @@ -851,7 +851,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading }; //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); - _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg.Serialize()); + _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); //_ = _producerBus.Publish(tempMsg); diff --git a/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs b/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs index 32f652e..df75b89 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs @@ -26,20 +26,51 @@ namespace JiShe.CollectBus.Kafka.Attributes /// /// 任务数(默认是多少个分区多少个任务) + /// 如设置订阅指定Partition则任务数始终为1 /// public int TaskCount { get; set; } = -1; - public KafkaSubscribeAttribute(string topic, string groupId = "default") + /// + /// 批量处理数量 + /// + public int BatchSize { get; set; } = 100; + + /// + /// 是否启用批量处理 + /// + public bool EnableBatch { get; set; } = false; + + /// + /// 批次超时时间 + /// + public TimeSpan? BatchTimeout { get; set; }=null; + + /// + /// 订阅主题 + /// + /// batchTimeout格式:("00:05:00") + public KafkaSubscribeAttribute(string topic, string groupId = "default", bool enableBatch = false, int batchSize = 100, string? batchTimeout = null) { this.Topic = topic; this.GroupId = groupId; + this.EnableBatch = enableBatch; + this.BatchSize = batchSize; + this.BatchTimeout = batchTimeout != null? TimeSpan.Parse(batchTimeout): null; } - public KafkaSubscribeAttribute(string topic, int partition, string groupId = "default") + /// + /// 订阅主题 + /// + /// batchTimeout格式:("00:05:00") + public KafkaSubscribeAttribute(string topic, int partition, string groupId = "default", bool enableBatch = false, int batchSize = 100, string? batchTimeout = null) { - this.Topic = topic ; + this.Topic = topic; this.Partition = partition; this.GroupId = groupId; + this.TaskCount = 1; + this.EnableBatch = enableBatch; + this.BatchSize = batchSize; + this.BatchTimeout = batchTimeout != null ? TimeSpan.Parse(batchTimeout) : null; } } } diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs index 9a2142e..5547919 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs @@ -10,6 +10,8 @@ using System.Collections.Concurrent; using System.Text.RegularExpressions; using NUglify.Html; using Serilog; +using System; +using System.Text; namespace JiShe.CollectBus.Kafka.Consumer { @@ -19,6 +21,7 @@ namespace JiShe.CollectBus.Kafka.Consumer private readonly IConfiguration _configuration; private readonly ConcurrentDictionary _consumerStore = new(); + private class KafkaConsumer where TKey : notnull where TValue : class { } public ConsumerService(IConfiguration configuration, ILogger logger) { @@ -38,6 +41,7 @@ namespace JiShe.CollectBus.Kafka.Consumer { var config = BuildConsumerConfig(groupId); return new ConsumerBuilder(config) + .SetValueDeserializer(new JsonSerializer()) .SetLogHandler((_, log) => _logger.LogInformation($"消费者Log: {log.Message}")) .SetErrorHandler((_, e) => _logger.LogError($"消费者错误: {e.Reason}")) .Build(); @@ -54,7 +58,8 @@ namespace JiShe.CollectBus.Kafka.Consumer AutoOffsetReset = AutoOffsetReset.Earliest, EnableAutoCommit = false, // 禁止AutoCommit EnablePartitionEof = true, // 启用分区末尾标记 - AllowAutoCreateTopics= true // 启用自动创建 + AllowAutoCreateTopics= true, // 启用自动创建 + FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小(50MB) }; if (enableAuth) @@ -105,7 +110,7 @@ namespace JiShe.CollectBus.Kafka.Consumer /// public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null) where TKey : notnull where TValue : class { - var consumerKey = typeof((TKey, TValue)); + var consumerKey = typeof(KafkaConsumer); var cts = new CancellationTokenSource(); var consumer = _consumerStore.GetOrAdd(consumerKey, _ => @@ -123,15 +128,28 @@ namespace JiShe.CollectBus.Kafka.Consumer try { var result = consumer.Consume(cts.Token); - if (result == null) continue; - if (result.Message.Value == null) continue; + if (result == null || result.Message==null || result.Message.Value == null) + { + _logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL"); + consumer.Commit(result); // 手动提交 + continue; + } if (result.IsPartitionEOF) { _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); - await Task.Delay(TimeSpan.FromSeconds(1)); + await Task.Delay(TimeSpan.FromSeconds(1),cts.Token); + continue; + } + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} Header未匹配", result.Topic, result.Partition); + + //consumer.Commit(result); // 提交偏移量 + // 跳过消息 continue; } - bool sucess= await messageHandler(result.Message.Key, result.Message.Value); if (sucess) { @@ -159,7 +177,7 @@ namespace JiShe.CollectBus.Kafka.Consumer /// public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) where TValue : class { - var consumerKey = typeof((Ignore, TValue)); + var consumerKey = typeof(KafkaConsumer); var cts = new CancellationTokenSource(); var consumer = _consumerStore.GetOrAdd(consumerKey, _=> @@ -177,15 +195,27 @@ namespace JiShe.CollectBus.Kafka.Consumer try { var result = consumer.Consume(cts.Token); - if (result == null) continue; - if (result.Message == null) continue; + if (result == null || result.Message==null || result.Message.Value == null) + { + _logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL"); + consumer.Commit(result); // 手动提交 + continue; + } if (result.IsPartitionEOF) { _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); - await Task.Delay(TimeSpan.FromSeconds(1)); + await Task.Delay(100, cts.Token); + continue; + } + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} Header值未匹配", result.Topic, result.Partition); + //consumer.Commit(result); // 提交偏移量 + // 跳过消息 continue; } - bool sucess = await messageHandler(result.Message.Value); if (sucess) consumer.Commit(result); // 手动提交 @@ -199,6 +229,267 @@ namespace JiShe.CollectBus.Kafka.Consumer await Task.CompletedTask; } + + /// + /// 批量订阅消息 + /// + /// 消息Key类型 + /// 消息Value类型 + /// 主题 + /// 批量消息处理函数 + /// 消费组ID + /// 批次大小 + /// 批次超时时间 + public async Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class + { + await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout); + } + + /// + /// 批量订阅消息 + /// + /// 消息Key类型 + /// 消息Value类型 + /// 主题列表 + /// 批量消息处理函数 + /// 消费组ID + /// 批次大小 + /// 批次超时时间 + public async Task SubscribeBatchAsync(string[] topics,Func, Task> messageBatchHandler, string? groupId = null,int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class + { + var consumerKey = typeof(KafkaConsumer); + var cts = new CancellationTokenSource(); + + var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; + + consumer!.Subscribe(topics); + + var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 + + _ = Task.Run(async () => + { + var messages = new List<(TValue Value, TopicPartitionOffset Offset)>(); + var startTime = DateTime.UtcNow; + + while (!cts.IsCancellationRequested) + { + try + { + // 非阻塞快速累积消息 + while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout) + { + var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用 + + if (result != null) + { + if (result.IsPartitionEOF) + { + _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); + await Task.Delay(TimeSpan.FromSeconds(1), cts.Token); + } + else if (result.Message.Value != null) + { + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} Header未匹配", result.Topic, result.Partition); + //consumer.Commit(result); // 提交偏移量 + // 跳过消息 + continue; + } + messages.Add((result.Message.Value, result.TopicPartitionOffset)); + //messages.Add(result.Message.Value); + } + } + else + { + // 无消息时短暂等待 + await Task.Delay(10, cts.Token); + } + } + + // 处理批次 + if (messages.Count > 0) + { + bool success = await messageBatchHandler(messages.Select(m => m.Value)); + if (success) + { + var offsetsByPartition = new Dictionary(); + foreach (var msg in messages) + { + var tp = msg.Offset.TopicPartition; + var offset = msg.Offset.Offset; + if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax) + { + offsetsByPartition[tp] = offset; + } + } + + var offsetsToCommit = offsetsByPartition + .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1))) + .ToList(); + consumer.Commit(offsetsToCommit); + } + messages.Clear(); + } + + startTime = DateTime.UtcNow; + } + catch (ConsumeException ex) + { + _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); + } + catch (OperationCanceledException) + { + // 任务取消,正常退出 + } + catch (Exception ex) + { + _logger.LogError(ex, "处理批量消息时发生未知错误"); + } + } + }, cts.Token); + + await Task.CompletedTask; + } + + + /// + /// 批量订阅消息 + /// + /// 消息Value类型 + /// 主题列表 + /// 批量消息处理函数 + /// 消费组ID + /// 批次大小 + /// 批次超时时间 + /// 消费等待时间 + public async Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class + { + await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout); + + } + + + /// + /// 批量订阅消息 + /// + /// 消息Value类型 + /// 主题列表 + /// 批量消息处理函数 + /// 消费组ID + /// 批次大小 + /// 批次超时时间 + /// 消费等待时间 + public async Task SubscribeBatchAsync(string[] topics,Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class + { + var consumerKey = typeof(KafkaConsumer); + var cts = new CancellationTokenSource(); + + var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; + + consumer!.Subscribe(topics); + + var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 + + _ = Task.Run(async () => + { + var messages = new List<(TValue Value, TopicPartitionOffset Offset)>(); + //var messages = new List>(); + var startTime = DateTime.UtcNow; + + while (!cts.IsCancellationRequested) + { + try + { + // 非阻塞快速累积消息 + while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout) + { + var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用 + + if (result != null) + { + if (result.IsPartitionEOF) + { + _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); + await Task.Delay(TimeSpan.FromSeconds(1), cts.Token); + } + else if (result.Message.Value != null) + { + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + //consumer.Commit(result); // 提交偏移量 + // 跳过消息 + continue; + } + messages.Add((result.Message.Value, result.TopicPartitionOffset)); + //messages.Add(result.Message.Value); + } + } + else + { + // 无消息时短暂等待 + await Task.Delay(10, cts.Token); + } + } + + // 处理批次 + if (messages.Count > 0) + { + bool success = await messageBatchHandler(messages.Select(m => m.Value)); + if (success) + { + var offsetsByPartition = new Dictionary(); + foreach (var msg in messages) + { + var tp = msg.Offset.TopicPartition; + var offset = msg.Offset.Offset; + if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax) + { + offsetsByPartition[tp] = offset; + } + } + + var offsetsToCommit = offsetsByPartition + .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1))) + .ToList(); + consumer.Commit(offsetsToCommit); + } + messages.Clear(); + } + + startTime = DateTime.UtcNow; + } + catch (ConsumeException ex) + { + _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); + } + catch (OperationCanceledException) + { + // 任务取消,正常退出 + } + catch (Exception ex) + { + _logger.LogError(ex, "处理批量消息时发生未知错误"); + } + } + }, cts.Token); + + await Task.CompletedTask; + } + + /// /// 取消消息订阅 /// diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs index 5cfae2c..3925014 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs @@ -1,4 +1,5 @@ -using System; +using Confluent.Kafka; +using System; using System.Collections.Generic; using System.Linq; using System.Text; @@ -32,6 +33,11 @@ namespace JiShe.CollectBus.Kafka.Consumer /// Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null) where TValue : class; + + Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class; + + Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class; + void Unsubscribe() where TKey : notnull where TValue : class; } } diff --git a/src/JiShe.CollectBus.KafkaProducer/HeadersFilter.cs b/src/JiShe.CollectBus.KafkaProducer/HeadersFilter.cs new file mode 100644 index 0000000..0790f9f --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/HeadersFilter.cs @@ -0,0 +1,30 @@ +using Confluent.Kafka; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Kafka +{ + /// + /// 消息头过滤器 + /// + public class HeadersFilter : Dictionary + { + /// + /// 判断Headers是否匹配 + /// + /// + /// + public bool Match(Headers headers) + { + foreach (var kvp in this) + { + if (!headers.TryGetLastBytes(kvp.Key, out var value) || !value.SequenceEqual(kvp.Value)) + return false; + } + return true; + } + } +} diff --git a/src/JiShe.CollectBus.KafkaProducer/JsonSerializer.cs b/src/JiShe.CollectBus.KafkaProducer/JsonSerializer.cs new file mode 100644 index 0000000..83f58a3 --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/JsonSerializer.cs @@ -0,0 +1,88 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using System.Text.Json; +using Confluent.Kafka; +using System.Text.Json.Serialization; +using System.Text.Encodings.Web; + +namespace JiShe.CollectBus.Kafka +{ + /// + /// JSON 序列化器(支持泛型) + /// + public class JsonSerializer : ISerializer, IDeserializer + { + private static readonly JsonSerializerOptions _options = new JsonSerializerOptions + { + DefaultIgnoreCondition = JsonIgnoreCondition.Never, + WriteIndented = false,// 设置格式化输出 + Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,// 允许特殊字符 + IgnoreReadOnlyFields = true, + IgnoreReadOnlyProperties = true, + NumberHandling = JsonNumberHandling.AllowReadingFromString, // 允许数字字符串 + AllowTrailingCommas = true, // 忽略尾随逗号 + ReadCommentHandling = JsonCommentHandling.Skip, // 忽略注释 + PropertyNameCaseInsensitive = true, // 属性名称大小写不敏感 + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // 属性名称使用驼峰命名规则 + Converters = { new DateTimeJsonConverter() } // 注册你的自定义转换器, + }; + + public byte[] Serialize(T data, SerializationContext context) + { + if (data == null) + return null; + + try + { + return JsonSerializer.SerializeToUtf8Bytes(data, _options); + } + catch (Exception ex) + { + throw new InvalidOperationException("Kafka序列化失败", ex); + } + } + + public T Deserialize(ReadOnlySpan data, bool isNull, SerializationContext context) + { + if (isNull) + return default; + + try + { + return JsonSerializer.Deserialize(data, _options); + } + catch (Exception ex) + { + throw new InvalidOperationException("Kafka反序列化失败", ex); + } + } + } + + + public class DateTimeJsonConverter : JsonConverter + { + private readonly string _dateFormatString; + public DateTimeJsonConverter() + { + _dateFormatString = "yyyy-MM-dd HH:mm:ss"; + } + + public DateTimeJsonConverter(string dateFormatString) + { + _dateFormatString = dateFormatString; + } + + public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + return DateTime.Parse(reader.GetString()); + } + + public override void Write(Utf8JsonWriter writer, DateTime value, JsonSerializerOptions options) + { + writer.WriteStringValue(value.ToString(_dateFormatString)); + } + } +} diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs index cfe5eee..8860061 100644 --- a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs @@ -102,7 +102,7 @@ namespace JiShe.CollectBus.Kafka { var consumerService = provider.GetRequiredService(); - await consumerService.SubscribeAsync(attr.Topic, async (message) => + await consumerService.SubscribeAsync(attr.Topic, async (message) => { try { @@ -126,21 +126,21 @@ namespace JiShe.CollectBus.Kafka /// /// /// - private static async Task ProcessMessageAsync(string message, MethodInfo method, object subscribe) + private static async Task ProcessMessageAsync(dynamic message, MethodInfo method, object subscribe) { var parameters = method.GetParameters(); bool isGenericTask = method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>); bool existParameters = parameters.Length > 0; - dynamic? messageObj= null; - if (existParameters) - { - var paramType = parameters[0].ParameterType; - messageObj = paramType == typeof(string) ? message : message.Deserialize(paramType); - } + //dynamic? messageObj= null; + //if (existParameters) + //{ + // var paramType = parameters[0].ParameterType; + // messageObj = paramType == typeof(string) ? message : message.Deserialize(paramType); + //} if (isGenericTask) { - object? result = await (Task)method.Invoke(subscribe, existParameters? new[] { messageObj }:null)!; + object? result = await (Task)method.Invoke(subscribe, existParameters? new[] { message } :null)!; if (result is ISubscribeAck ackResult) { return ackResult.Ack; @@ -148,7 +148,7 @@ namespace JiShe.CollectBus.Kafka } else { - object? result = method.Invoke(subscribe, existParameters ? new[] { messageObj } : null); + object? result = method.Invoke(subscribe, existParameters ? new[] { message } : null); if (result is ISubscribeAck ackResult) { return ackResult.Ack; diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs index 8c069a9..0ddf36b 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs @@ -17,7 +17,8 @@ namespace JiShe.CollectBus.Kafka.Producer { private readonly ILogger _logger; private readonly IConfiguration _configuration; - private readonly ConcurrentDictionary, object> _producerCache = new(); + private readonly ConcurrentDictionary _producerCache = new(); + private class KafkaProducer where TKey : notnull where TValue : class { } public ProducerService(IConfiguration configuration,ILogger logger) { @@ -32,14 +33,13 @@ namespace JiShe.CollectBus.Kafka.Producer /// /// /// - private IProducer GetProducer() + private IProducer GetProducer(Type typeKey) { - var typeKey = Tuple.Create(typeof(TKey), typeof(TValue))!; - return (IProducer)_producerCache.GetOrAdd(typeKey, _ => { var config = BuildProducerConfig(); return new ProducerBuilder(config) + .SetValueSerializer(new JsonSerializer()) // Value 使用自定义 JSON 序列化 .SetLogHandler((_, msg) => _logger.Log(ConvertLogLevel(msg.Level), msg.Message)) .Build(); }); @@ -103,8 +103,17 @@ namespace JiShe.CollectBus.Kafka.Producer /// public async Task ProduceAsync(string topic, TKey key, TValue value)where TKey : notnull where TValue : class { - var producer = GetProducer(); - await producer.ProduceAsync(topic, new Message { Key = key, Value = value }); + var typeKey = typeof(KafkaProducer); + var producer = GetProducer(typeKey); + var message = new Message + { + Key = key, + Value = value, + Headers = new Headers{ + { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } + } + }; + await producer.ProduceAsync(topic, message); } /// @@ -116,8 +125,16 @@ namespace JiShe.CollectBus.Kafka.Producer /// public async Task ProduceAsync(string topic, TValue value) where TValue : class { - var producer = GetProducer(); - await producer.ProduceAsync(topic, new Message { Value = value }); + var typeKey = typeof(KafkaProducer); + var producer = GetProducer(typeKey); + var message = new Message + { + Value = value, + Headers = new Headers{ + { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } + } + }; + await producer.ProduceAsync(topic, message); } /// @@ -136,9 +153,13 @@ namespace JiShe.CollectBus.Kafka.Producer var message = new Message { Key = key, - Value = value + Value = value, + Headers = new Headers{ + { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } + } }; - var producer = GetProducer(); + var typeKey = typeof(KafkaProducer); + var producer = GetProducer(typeKey); if (partition.HasValue) { var topicPartition = new TopicPartition(topic, partition.Value); @@ -166,9 +187,13 @@ namespace JiShe.CollectBus.Kafka.Producer { var message = new Message { - Value = value + Value = value, + Headers = new Headers{ + { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } + } }; - var producer = GetProducer(); + var typeKey = typeof(KafkaProducer); + var producer = GetProducer(typeKey); if (partition.HasValue) { var topicPartition = new TopicPartition(topic, partition.Value); diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs index 963d89b..c43fa70 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs @@ -92,7 +92,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts var bytes = Build3761SendData.BuildSendCommandBytes(reqParam); //await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); - await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }.Serialize()); + await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); //await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); } @@ -133,7 +133,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts var bytes = Build3761SendData.BuildSendCommandBytes(reqParam); //await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }); - await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }.Serialize()); + await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }); //await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }); }