From eed68d0fe007b7a2d7558e76fb7df227f77742b2 Mon Sep 17 00:00:00 2001
From: zenghongyao <873884283@qq.com>
Date: Wed, 16 Apr 2025 18:26:25 +0800
Subject: [PATCH] =?UTF-8?q?kafka=E5=A2=9E=E5=8A=A0=E6=89=B9=E9=87=8F?=
=?UTF-8?q?=E6=B6=88=E8=B4=B9?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../EnergySystem/EnergySystemAppService.cs | 34 +-
.../Plugins/TcpMonitor.cs | 6 +-
.../BasicScheduledMeterReadingService.cs | 10 +-
.../Attributes/KafkaSubscribeAttribute.cs | 37 ++-
.../Consumer/ConsumerService.cs | 313 +++++++++++++++++-
.../Consumer/IConsumerService.cs | 8 +-
.../HeadersFilter.cs | 30 ++
.../JsonSerializer.cs | 88 +++++
.../KafkaSubcribesExtensions.cs | 20 +-
.../Producer/ProducerService.cs | 49 ++-
.../Abstracts/BaseProtocolPlugin.cs | 4 +-
11 files changed, 535 insertions(+), 64 deletions(-)
create mode 100644 src/JiShe.CollectBus.KafkaProducer/HeadersFilter.cs
create mode 100644 src/JiShe.CollectBus.KafkaProducer/JsonSerializer.cs
diff --git a/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs b/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs
index 2a60803..1ca731b 100644
--- a/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs
+++ b/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs
@@ -90,7 +90,7 @@ namespace JiShe.CollectBus.EnergySystem
Message = bytes,
Type = IssuedEventType.Data,
MessageId = NewId.NextGuid().ToString()
- }.Serialize());
+ });
result.Status = true;
result.Msg = "操作成功";
result.Data.ValidData = true;
@@ -136,7 +136,7 @@ namespace JiShe.CollectBus.EnergySystem
Message = bytes,
Type = IssuedEventType.Data,
MessageId = NewId.NextGuid().ToString()
- }.Serialize());
+ });
}
@@ -186,7 +186,7 @@ namespace JiShe.CollectBus.EnergySystem
Message = bytes,
Type = IssuedEventType.Data,
MessageId = NewId.NextGuid().ToString()
- }.Serialize());
+ });
result.Status = true;
result.Msg = "操作成功";
result.Data.ValidData = true;
@@ -224,7 +224,7 @@ namespace JiShe.CollectBus.EnergySystem
Message = bytes,
Type = IssuedEventType.Data,
MessageId = NewId.NextGuid().ToString()
- }.Serialize());
+ });
}
result.Status = true;
@@ -316,7 +316,7 @@ namespace JiShe.CollectBus.EnergySystem
Message = bytes,
Type = IssuedEventType.Data,
MessageId = NewId.NextGuid().ToString()
- }.Serialize());
+ });
if (isManual)
{
@@ -384,7 +384,7 @@ namespace JiShe.CollectBus.EnergySystem
Message = bytes,
Type = IssuedEventType.Data,
MessageId = NewId.NextGuid().ToString()
- }.Serialize());
+ });
result.Status = true;
result.Msg = "操作成功";
@@ -420,7 +420,7 @@ namespace JiShe.CollectBus.EnergySystem
Message = bytes,
Type = IssuedEventType.Data,
MessageId = NewId.NextGuid().ToString()
- }.Serialize());
+ });
result.Status = true;
result.Msg = "操作成功";
@@ -456,7 +456,7 @@ namespace JiShe.CollectBus.EnergySystem
Message = bytes,
Type = IssuedEventType.Data,
MessageId = NewId.NextGuid().ToString()
- }.Serialize());
+ });
result.Status = true;
result.Msg = "操作成功";
@@ -491,7 +491,7 @@ namespace JiShe.CollectBus.EnergySystem
Message = bytes,
Type = IssuedEventType.Data,
MessageId = NewId.NextGuid().ToString()
- }.Serialize());
+ });
result.Status = true;
result.Msg = "操作成功";
@@ -527,7 +527,7 @@ namespace JiShe.CollectBus.EnergySystem
Message = bytes,
Type = IssuedEventType.Data,
MessageId = NewId.NextGuid().ToString()
- }.Serialize());
+ });
result.Status = true;
result.Msg = "操作成功";
return result;
@@ -585,7 +585,7 @@ namespace JiShe.CollectBus.EnergySystem
Message = bytes,
Type = IssuedEventType.Data,
MessageId = NewId.NextGuid().ToString()
- }.Serialize());
+ });
}
result.Status = true;
@@ -662,7 +662,7 @@ namespace JiShe.CollectBus.EnergySystem
Message = bytes,
Type = IssuedEventType.Data,
MessageId = NewId.NextGuid().ToString()
- }.Serialize());
+ });
}
result.Status = true;
result.Msg = "操作成功";
@@ -699,7 +699,7 @@ namespace JiShe.CollectBus.EnergySystem
Message = bytes,
Type = IssuedEventType.Data,
MessageId = NewId.NextGuid().ToString()
- }.Serialize());
+ });
}
result.Status = true;
result.Msg = "操作成功";
@@ -735,7 +735,7 @@ namespace JiShe.CollectBus.EnergySystem
Message = bytes,
Type = IssuedEventType.Data,
MessageId = NewId.NextGuid().ToString()
- }.Serialize());
+ });
}
result.Status = true;
result.Msg = "操作成功";
@@ -769,7 +769,7 @@ namespace JiShe.CollectBus.EnergySystem
Message = bytes,
Type = IssuedEventType.Data,
MessageId = NewId.NextGuid().ToString()
- }.Serialize());
+ });
result.Status = true;
result.Msg = "操作成功";
return result;
@@ -804,7 +804,7 @@ namespace JiShe.CollectBus.EnergySystem
Message = bytes,
Type = IssuedEventType.Data,
MessageId = NewId.NextGuid().ToString()
- }.Serialize());
+ });
}
result.Status = true;
result.Msg = "操作成功";
@@ -867,7 +867,7 @@ namespace JiShe.CollectBus.EnergySystem
Message = bytes,
Type = IssuedEventType.Data,
MessageId = NewId.NextGuid().ToString()
- }.Serialize());
+ });
}
result.Status = true;
diff --git a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs
index 50b41b0..6cbc8c4 100644
--- a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs
+++ b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs
@@ -178,7 +178,7 @@ namespace JiShe.CollectBus.Plugins
//await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
- await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent.Serialize());
+ await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
//await _producerBus.Publish( messageReceivedLoginEvent);
}
@@ -227,7 +227,7 @@ namespace JiShe.CollectBus.Plugins
};
//await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
- await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent.Serialize());
+ await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
//await _producerBus.Publish(messageReceivedHeartbeatEvent);
}
@@ -271,7 +271,7 @@ namespace JiShe.CollectBus.Plugins
MessageHexString = messageHexString,
DeviceNo = deviceNo,
MessageId = NewId.NextGuid().ToString()
- }.Serialize());
+ });
}
}
}
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index 8fa4f5e..d830707 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -382,7 +382,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
};
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
- _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg.Serialize());
+ _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
//_= _producerBus.Publish(tempMsg);
@@ -448,7 +448,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
};
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
- _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg.Serialize());
+ _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
//_ = _producerBus.Publish(tempMsg);
@@ -514,7 +514,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
};
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
- _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg.Serialize());
+ _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
//_ = _producerBus.Publish(tempMsg);
@@ -809,7 +809,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress);
- await _producerService.ProduceAsync(topicName, partition, taskRecord.Serialize());
+ await _producerService.ProduceAsync(topicName, partition, taskRecord);
}
private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType)
@@ -851,7 +851,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
};
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
- _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg.Serialize());
+ _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
//_ = _producerBus.Publish(tempMsg);
diff --git a/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs b/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs
index 32f652e..df75b89 100644
--- a/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs
+++ b/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs
@@ -26,20 +26,51 @@ namespace JiShe.CollectBus.Kafka.Attributes
///
/// 任务数(默认是多少个分区多少个任务)
+ /// 如设置订阅指定Partition则任务数始终为1
///
public int TaskCount { get; set; } = -1;
- public KafkaSubscribeAttribute(string topic, string groupId = "default")
+ ///
+ /// 批量处理数量
+ ///
+ public int BatchSize { get; set; } = 100;
+
+ ///
+ /// 是否启用批量处理
+ ///
+ public bool EnableBatch { get; set; } = false;
+
+ ///
+ /// 批次超时时间
+ ///
+ public TimeSpan? BatchTimeout { get; set; }=null;
+
+ ///
+ /// 订阅主题
+ ///
+ /// batchTimeout格式:("00:05:00")
+ public KafkaSubscribeAttribute(string topic, string groupId = "default", bool enableBatch = false, int batchSize = 100, string? batchTimeout = null)
{
this.Topic = topic;
this.GroupId = groupId;
+ this.EnableBatch = enableBatch;
+ this.BatchSize = batchSize;
+ this.BatchTimeout = batchTimeout != null? TimeSpan.Parse(batchTimeout): null;
}
- public KafkaSubscribeAttribute(string topic, int partition, string groupId = "default")
+ ///
+ /// 订阅主题
+ ///
+ /// batchTimeout格式:("00:05:00")
+ public KafkaSubscribeAttribute(string topic, int partition, string groupId = "default", bool enableBatch = false, int batchSize = 100, string? batchTimeout = null)
{
- this.Topic = topic ;
+ this.Topic = topic;
this.Partition = partition;
this.GroupId = groupId;
+ this.TaskCount = 1;
+ this.EnableBatch = enableBatch;
+ this.BatchSize = batchSize;
+ this.BatchTimeout = batchTimeout != null ? TimeSpan.Parse(batchTimeout) : null;
}
}
}
diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs
index 9a2142e..5547919 100644
--- a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs
+++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs
@@ -10,6 +10,8 @@ using System.Collections.Concurrent;
using System.Text.RegularExpressions;
using NUglify.Html;
using Serilog;
+using System;
+using System.Text;
namespace JiShe.CollectBus.Kafka.Consumer
{
@@ -19,6 +21,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
private readonly IConfiguration _configuration;
private readonly ConcurrentDictionary
_consumerStore = new();
+ private class KafkaConsumer where TKey : notnull where TValue : class { }
public ConsumerService(IConfiguration configuration, ILogger logger)
{
@@ -38,6 +41,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
var config = BuildConsumerConfig(groupId);
return new ConsumerBuilder(config)
+ .SetValueDeserializer(new JsonSerializer())
.SetLogHandler((_, log) => _logger.LogInformation($"消费者Log: {log.Message}"))
.SetErrorHandler((_, e) => _logger.LogError($"消费者错误: {e.Reason}"))
.Build();
@@ -54,7 +58,8 @@ namespace JiShe.CollectBus.Kafka.Consumer
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false, // 禁止AutoCommit
EnablePartitionEof = true, // 启用分区末尾标记
- AllowAutoCreateTopics= true // 启用自动创建
+ AllowAutoCreateTopics= true, // 启用自动创建
+ FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小(50MB)
};
if (enableAuth)
@@ -105,7 +110,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
///
public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
{
- var consumerKey = typeof((TKey, TValue));
+ var consumerKey = typeof(KafkaConsumer);
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
@@ -123,15 +128,28 @@ namespace JiShe.CollectBus.Kafka.Consumer
try
{
var result = consumer.Consume(cts.Token);
- if (result == null) continue;
- if (result.Message.Value == null) continue;
+ if (result == null || result.Message==null || result.Message.Value == null)
+ {
+ _logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
+ consumer.Commit(result); // 手动提交
+ continue;
+ }
if (result.IsPartitionEOF)
{
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
- await Task.Delay(TimeSpan.FromSeconds(1));
+ await Task.Delay(TimeSpan.FromSeconds(1),cts.Token);
+ continue;
+ }
+ var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } };
+ // 检查 Header 是否符合条件
+ if (!headersFilter.Match(result.Message.Headers))
+ {
+ _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} Header未匹配", result.Topic, result.Partition);
+
+ //consumer.Commit(result); // 提交偏移量
+ // 跳过消息
continue;
}
-
bool sucess= await messageHandler(result.Message.Key, result.Message.Value);
if (sucess)
{
@@ -159,7 +177,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
///
public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) where TValue : class
{
- var consumerKey = typeof((Ignore, TValue));
+ var consumerKey = typeof(KafkaConsumer);
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _=>
@@ -177,15 +195,27 @@ namespace JiShe.CollectBus.Kafka.Consumer
try
{
var result = consumer.Consume(cts.Token);
- if (result == null) continue;
- if (result.Message == null) continue;
+ if (result == null || result.Message==null || result.Message.Value == null)
+ {
+ _logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
+ consumer.Commit(result); // 手动提交
+ continue;
+ }
if (result.IsPartitionEOF)
{
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
- await Task.Delay(TimeSpan.FromSeconds(1));
+ await Task.Delay(100, cts.Token);
+ continue;
+ }
+ var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } };
+ // 检查 Header 是否符合条件
+ if (!headersFilter.Match(result.Message.Headers))
+ {
+ _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} Header值未匹配", result.Topic, result.Partition);
+ //consumer.Commit(result); // 提交偏移量
+ // 跳过消息
continue;
}
-
bool sucess = await messageHandler(result.Message.Value);
if (sucess)
consumer.Commit(result); // 手动提交
@@ -199,6 +229,267 @@ namespace JiShe.CollectBus.Kafka.Consumer
await Task.CompletedTask;
}
+
+ ///
+ /// 批量订阅消息
+ ///
+ /// 消息Key类型
+ /// 消息Value类型
+ /// 主题
+ /// 批量消息处理函数
+ /// 消费组ID
+ /// 批次大小
+ /// 批次超时时间
+ public async Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
+ {
+ await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout);
+ }
+
+ ///
+ /// 批量订阅消息
+ ///
+ /// 消息Key类型
+ /// 消息Value类型
+ /// 主题列表
+ /// 批量消息处理函数
+ /// 消费组ID
+ /// 批次大小
+ /// 批次超时时间
+ public async Task SubscribeBatchAsync(string[] topics,Func, Task> messageBatchHandler, string? groupId = null,int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
+ {
+ var consumerKey = typeof(KafkaConsumer);
+ var cts = new CancellationTokenSource();
+
+ var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
+ (
+ CreateConsumer(groupId),
+ cts
+ )).Consumer as IConsumer;
+
+ consumer!.Subscribe(topics);
+
+ var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
+
+ _ = Task.Run(async () =>
+ {
+ var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
+ var startTime = DateTime.UtcNow;
+
+ while (!cts.IsCancellationRequested)
+ {
+ try
+ {
+ // 非阻塞快速累积消息
+ while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
+ {
+ var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
+
+ if (result != null)
+ {
+ if (result.IsPartitionEOF)
+ {
+ _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
+ await Task.Delay(TimeSpan.FromSeconds(1), cts.Token);
+ }
+ else if (result.Message.Value != null)
+ {
+ var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } };
+ // 检查 Header 是否符合条件
+ if (!headersFilter.Match(result.Message.Headers))
+ {
+ _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} Header未匹配", result.Topic, result.Partition);
+ //consumer.Commit(result); // 提交偏移量
+ // 跳过消息
+ continue;
+ }
+ messages.Add((result.Message.Value, result.TopicPartitionOffset));
+ //messages.Add(result.Message.Value);
+ }
+ }
+ else
+ {
+ // 无消息时短暂等待
+ await Task.Delay(10, cts.Token);
+ }
+ }
+
+ // 处理批次
+ if (messages.Count > 0)
+ {
+ bool success = await messageBatchHandler(messages.Select(m => m.Value));
+ if (success)
+ {
+ var offsetsByPartition = new Dictionary();
+ foreach (var msg in messages)
+ {
+ var tp = msg.Offset.TopicPartition;
+ var offset = msg.Offset.Offset;
+ if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
+ {
+ offsetsByPartition[tp] = offset;
+ }
+ }
+
+ var offsetsToCommit = offsetsByPartition
+ .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
+ .ToList();
+ consumer.Commit(offsetsToCommit);
+ }
+ messages.Clear();
+ }
+
+ startTime = DateTime.UtcNow;
+ }
+ catch (ConsumeException ex)
+ {
+ _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
+ }
+ catch (OperationCanceledException)
+ {
+ // 任务取消,正常退出
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "处理批量消息时发生未知错误");
+ }
+ }
+ }, cts.Token);
+
+ await Task.CompletedTask;
+ }
+
+
+ ///
+ /// 批量订阅消息
+ ///
+ /// 消息Value类型
+ /// 主题列表
+ /// 批量消息处理函数
+ /// 消费组ID
+ /// 批次大小
+ /// 批次超时时间
+ /// 消费等待时间
+ public async Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class
+ {
+ await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout);
+
+ }
+
+
+ ///
+ /// 批量订阅消息
+ ///
+ /// 消息Value类型
+ /// 主题列表
+ /// 批量消息处理函数
+ /// 消费组ID
+ /// 批次大小
+ /// 批次超时时间
+ /// 消费等待时间
+ public async Task SubscribeBatchAsync(string[] topics,Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class
+ {
+ var consumerKey = typeof(KafkaConsumer);
+ var cts = new CancellationTokenSource();
+
+ var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
+ (
+ CreateConsumer(groupId),
+ cts
+ )).Consumer as IConsumer;
+
+ consumer!.Subscribe(topics);
+
+ var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
+
+ _ = Task.Run(async () =>
+ {
+ var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
+ //var messages = new List>();
+ var startTime = DateTime.UtcNow;
+
+ while (!cts.IsCancellationRequested)
+ {
+ try
+ {
+ // 非阻塞快速累积消息
+ while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
+ {
+ var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
+
+ if (result != null)
+ {
+ if (result.IsPartitionEOF)
+ {
+ _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
+ await Task.Delay(TimeSpan.FromSeconds(1), cts.Token);
+ }
+ else if (result.Message.Value != null)
+ {
+ var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } };
+ // 检查 Header 是否符合条件
+ if (!headersFilter.Match(result.Message.Headers))
+ {
+ //consumer.Commit(result); // 提交偏移量
+ // 跳过消息
+ continue;
+ }
+ messages.Add((result.Message.Value, result.TopicPartitionOffset));
+ //messages.Add(result.Message.Value);
+ }
+ }
+ else
+ {
+ // 无消息时短暂等待
+ await Task.Delay(10, cts.Token);
+ }
+ }
+
+ // 处理批次
+ if (messages.Count > 0)
+ {
+ bool success = await messageBatchHandler(messages.Select(m => m.Value));
+ if (success)
+ {
+ var offsetsByPartition = new Dictionary();
+ foreach (var msg in messages)
+ {
+ var tp = msg.Offset.TopicPartition;
+ var offset = msg.Offset.Offset;
+ if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
+ {
+ offsetsByPartition[tp] = offset;
+ }
+ }
+
+ var offsetsToCommit = offsetsByPartition
+ .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
+ .ToList();
+ consumer.Commit(offsetsToCommit);
+ }
+ messages.Clear();
+ }
+
+ startTime = DateTime.UtcNow;
+ }
+ catch (ConsumeException ex)
+ {
+ _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
+ }
+ catch (OperationCanceledException)
+ {
+ // 任务取消,正常退出
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "处理批量消息时发生未知错误");
+ }
+ }
+ }, cts.Token);
+
+ await Task.CompletedTask;
+ }
+
+
///
/// 取消消息订阅
///
diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs
index 5cfae2c..3925014 100644
--- a/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs
+++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs
@@ -1,4 +1,5 @@
-using System;
+using Confluent.Kafka;
+using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@@ -32,6 +33,11 @@ namespace JiShe.CollectBus.Kafka.Consumer
///
Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null) where TValue : class;
+
+ Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class;
+
+ Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class;
+
void Unsubscribe() where TKey : notnull where TValue : class;
}
}
diff --git a/src/JiShe.CollectBus.KafkaProducer/HeadersFilter.cs b/src/JiShe.CollectBus.KafkaProducer/HeadersFilter.cs
new file mode 100644
index 0000000..0790f9f
--- /dev/null
+++ b/src/JiShe.CollectBus.KafkaProducer/HeadersFilter.cs
@@ -0,0 +1,30 @@
+using Confluent.Kafka;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Kafka
+{
+ ///
+ /// 消息头过滤器
+ ///
+ public class HeadersFilter : Dictionary
+ {
+ ///
+ /// 判断Headers是否匹配
+ ///
+ ///
+ ///
+ public bool Match(Headers headers)
+ {
+ foreach (var kvp in this)
+ {
+ if (!headers.TryGetLastBytes(kvp.Key, out var value) || !value.SequenceEqual(kvp.Value))
+ return false;
+ }
+ return true;
+ }
+ }
+}
diff --git a/src/JiShe.CollectBus.KafkaProducer/JsonSerializer.cs b/src/JiShe.CollectBus.KafkaProducer/JsonSerializer.cs
new file mode 100644
index 0000000..83f58a3
--- /dev/null
+++ b/src/JiShe.CollectBus.KafkaProducer/JsonSerializer.cs
@@ -0,0 +1,88 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using System.Text.Json;
+using Confluent.Kafka;
+using System.Text.Json.Serialization;
+using System.Text.Encodings.Web;
+
+namespace JiShe.CollectBus.Kafka
+{
+ ///
+ /// JSON 序列化器(支持泛型)
+ ///
+ public class JsonSerializer : ISerializer, IDeserializer
+ {
+ private static readonly JsonSerializerOptions _options = new JsonSerializerOptions
+ {
+ DefaultIgnoreCondition = JsonIgnoreCondition.Never,
+ WriteIndented = false,// 设置格式化输出
+ Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,// 允许特殊字符
+ IgnoreReadOnlyFields = true,
+ IgnoreReadOnlyProperties = true,
+ NumberHandling = JsonNumberHandling.AllowReadingFromString, // 允许数字字符串
+ AllowTrailingCommas = true, // 忽略尾随逗号
+ ReadCommentHandling = JsonCommentHandling.Skip, // 忽略注释
+ PropertyNameCaseInsensitive = true, // 属性名称大小写不敏感
+ PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // 属性名称使用驼峰命名规则
+ Converters = { new DateTimeJsonConverter() } // 注册你的自定义转换器,
+ };
+
+ public byte[] Serialize(T data, SerializationContext context)
+ {
+ if (data == null)
+ return null;
+
+ try
+ {
+ return JsonSerializer.SerializeToUtf8Bytes(data, _options);
+ }
+ catch (Exception ex)
+ {
+ throw new InvalidOperationException("Kafka序列化失败", ex);
+ }
+ }
+
+ public T Deserialize(ReadOnlySpan data, bool isNull, SerializationContext context)
+ {
+ if (isNull)
+ return default;
+
+ try
+ {
+ return JsonSerializer.Deserialize(data, _options);
+ }
+ catch (Exception ex)
+ {
+ throw new InvalidOperationException("Kafka反序列化失败", ex);
+ }
+ }
+ }
+
+
+ public class DateTimeJsonConverter : JsonConverter
+ {
+ private readonly string _dateFormatString;
+ public DateTimeJsonConverter()
+ {
+ _dateFormatString = "yyyy-MM-dd HH:mm:ss";
+ }
+
+ public DateTimeJsonConverter(string dateFormatString)
+ {
+ _dateFormatString = dateFormatString;
+ }
+
+ public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
+ {
+ return DateTime.Parse(reader.GetString());
+ }
+
+ public override void Write(Utf8JsonWriter writer, DateTime value, JsonSerializerOptions options)
+ {
+ writer.WriteStringValue(value.ToString(_dateFormatString));
+ }
+ }
+}
diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs
index cfe5eee..8860061 100644
--- a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs
+++ b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs
@@ -102,7 +102,7 @@ namespace JiShe.CollectBus.Kafka
{
var consumerService = provider.GetRequiredService();
- await consumerService.SubscribeAsync(attr.Topic, async (message) =>
+ await consumerService.SubscribeAsync(attr.Topic, async (message) =>
{
try
{
@@ -126,21 +126,21 @@ namespace JiShe.CollectBus.Kafka
///
///
///
- private static async Task ProcessMessageAsync(string message, MethodInfo method, object subscribe)
+ private static async Task ProcessMessageAsync(dynamic message, MethodInfo method, object subscribe)
{
var parameters = method.GetParameters();
bool isGenericTask = method.ReturnType.IsGenericType
&& method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>);
bool existParameters = parameters.Length > 0;
- dynamic? messageObj= null;
- if (existParameters)
- {
- var paramType = parameters[0].ParameterType;
- messageObj = paramType == typeof(string) ? message : message.Deserialize(paramType);
- }
+ //dynamic? messageObj= null;
+ //if (existParameters)
+ //{
+ // var paramType = parameters[0].ParameterType;
+ // messageObj = paramType == typeof(string) ? message : message.Deserialize(paramType);
+ //}
if (isGenericTask)
{
- object? result = await (Task)method.Invoke(subscribe, existParameters? new[] { messageObj }:null)!;
+ object? result = await (Task)method.Invoke(subscribe, existParameters? new[] { message } :null)!;
if (result is ISubscribeAck ackResult)
{
return ackResult.Ack;
@@ -148,7 +148,7 @@ namespace JiShe.CollectBus.Kafka
}
else
{
- object? result = method.Invoke(subscribe, existParameters ? new[] { messageObj } : null);
+ object? result = method.Invoke(subscribe, existParameters ? new[] { message } : null);
if (result is ISubscribeAck ackResult)
{
return ackResult.Ack;
diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs
index 8c069a9..0ddf36b 100644
--- a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs
+++ b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs
@@ -17,7 +17,8 @@ namespace JiShe.CollectBus.Kafka.Producer
{
private readonly ILogger _logger;
private readonly IConfiguration _configuration;
- private readonly ConcurrentDictionary, object> _producerCache = new();
+ private readonly ConcurrentDictionary _producerCache = new();
+ private class KafkaProducer where TKey : notnull where TValue : class { }
public ProducerService(IConfiguration configuration,ILogger logger)
{
@@ -32,14 +33,13 @@ namespace JiShe.CollectBus.Kafka.Producer
///
///
///
- private IProducer GetProducer()
+ private IProducer GetProducer(Type typeKey)
{
- var typeKey = Tuple.Create(typeof(TKey), typeof(TValue))!;
-
return (IProducer)_producerCache.GetOrAdd(typeKey, _ =>
{
var config = BuildProducerConfig();
return new ProducerBuilder(config)
+ .SetValueSerializer(new JsonSerializer()) // Value 使用自定义 JSON 序列化
.SetLogHandler((_, msg) => _logger.Log(ConvertLogLevel(msg.Level), msg.Message))
.Build();
});
@@ -103,8 +103,17 @@ namespace JiShe.CollectBus.Kafka.Producer
///
public async Task ProduceAsync(string topic, TKey key, TValue value)where TKey : notnull where TValue : class
{
- var producer = GetProducer();
- await producer.ProduceAsync(topic, new Message { Key = key, Value = value });
+ var typeKey = typeof(KafkaProducer);
+ var producer = GetProducer(typeKey);
+ var message = new Message
+ {
+ Key = key,
+ Value = value,
+ Headers = new Headers{
+ { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) }
+ }
+ };
+ await producer.ProduceAsync(topic, message);
}
///
@@ -116,8 +125,16 @@ namespace JiShe.CollectBus.Kafka.Producer
///
public async Task ProduceAsync(string topic, TValue value) where TValue : class
{
- var producer = GetProducer();
- await producer.ProduceAsync(topic, new Message { Value = value });
+ var typeKey = typeof(KafkaProducer);
+ var producer = GetProducer(typeKey);
+ var message = new Message
+ {
+ Value = value,
+ Headers = new Headers{
+ { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) }
+ }
+ };
+ await producer.ProduceAsync(topic, message);
}
///
@@ -136,9 +153,13 @@ namespace JiShe.CollectBus.Kafka.Producer
var message = new Message
{
Key = key,
- Value = value
+ Value = value,
+ Headers = new Headers{
+ { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) }
+ }
};
- var producer = GetProducer();
+ var typeKey = typeof(KafkaProducer);
+ var producer = GetProducer(typeKey);
if (partition.HasValue)
{
var topicPartition = new TopicPartition(topic, partition.Value);
@@ -166,9 +187,13 @@ namespace JiShe.CollectBus.Kafka.Producer
{
var message = new Message
{
- Value = value
+ Value = value,
+ Headers = new Headers{
+ { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) }
+ }
};
- var producer = GetProducer();
+ var typeKey = typeof(KafkaProducer);
+ var producer = GetProducer(typeKey);
if (partition.HasValue)
{
var topicPartition = new TopicPartition(topic, partition.Value);
diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
index 963d89b..c43fa70 100644
--- a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
+++ b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
@@ -92,7 +92,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
//await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
- await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }.Serialize());
+ await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
}
@@ -133,7 +133,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
//await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
- await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }.Serialize());
+ await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
}