diff --git a/.gitignore b/.gitignore
index daafdc8..68f674a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -401,3 +401,4 @@ FodyWeavers.xsd
# ABP Studio
**/.abpstudio/
/src/JiShe.CollectBus.Host/Plugins/*.dll
+JiShe.CollectBus.Kafka.Test
diff --git a/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs b/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs
new file mode 100644
index 0000000..5f70b77
--- /dev/null
+++ b/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs
@@ -0,0 +1,51 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Kafka.Attributes
+{
+ [AttributeUsage(AttributeTargets.Method)]
+ public class KafkaSubscribeAttribute : Attribute
+ {
+ ///
+ /// 订阅的主题
+ ///
+ public string[] Topics { get; set; }
+
+ ///
+ /// 分区
+ ///
+ public int Partition { get; set; } = -1;
+
+ ///
+ /// 消费者组
+ ///
+ public string GroupId { get; set; } = "default";
+
+ public KafkaSubscribeAttribute(string[] topics)
+ {
+ this.Topics = topics;
+ }
+
+ public KafkaSubscribeAttribute(string[] topics, string groupId)
+ {
+ this.Topics = topics;
+ this.GroupId = groupId;
+ }
+
+ public KafkaSubscribeAttribute(string[] topics, int partition)
+ {
+ this.Topics = topics;
+ this.Partition = partition;
+ }
+
+ public KafkaSubscribeAttribute(string[] topics, int partition, string groupId)
+ {
+ this.Topics = topics;
+ this.Partition = partition;
+ this.GroupId = groupId;
+ }
+ }
+}
diff --git a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs
index a73eb79..f0462b0 100644
--- a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs
+++ b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs
@@ -1,9 +1,14 @@
using Confluent.Kafka;
using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Producer;
+using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
+using System.Reflection;
+using Volo.Abp;
+using Volo.Abp.DependencyInjection;
using Volo.Abp.Modularity;
+using static Confluent.Kafka.ConfigPropertyNames;
namespace JiShe.CollectBus.Kafka
{
@@ -16,5 +21,12 @@ namespace JiShe.CollectBus.Kafka
// 注册Consumer
context.Services.AddTransient(typeof(IConsumerService<,>), typeof(ConsumerService<,>));
}
+
+ public override void OnApplicationInitialization(ApplicationInitializationContext context)
+ {
+ var app = context.GetApplicationBuilder();
+ app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application"));
+
+ }
}
}
diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs
index 34d63fb..a7626dd 100644
--- a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs
+++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs
@@ -69,6 +69,34 @@ namespace JiShe.CollectBus.Kafka.Consumer
}
}
+ ///
+ /// 订阅多个topic
+ ///
+ ///
+ ///
+ ///
+ public async Task SubscribeAsync(string[] topics, Func messageHandler)
+ {
+ _cancellationTokenSource = new CancellationTokenSource();
+ Instance.Subscribe(topics);
+
+ try
+ {
+ while (!_cancellationTokenSource.Token.IsCancellationRequested)
+ {
+ var result = Instance.Consume(_cancellationTokenSource.Token);
+ if (result != null)
+ {
+ await messageHandler(result.Message.Key, result.Message.Value);
+ }
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ Instance.Close();
+ }
+ }
+
public void Unsubscribe()
{
_cancellationTokenSource?.Cancel();
diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs
index bb88038..990bc86 100644
--- a/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs
+++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs
@@ -11,5 +11,13 @@ namespace JiShe.CollectBus.Kafka.Consumer
Task SubscribeAsync(string topic, Func messageHandler);
void Unsubscribe();
void Dispose();
+
+ ///
+ /// 订阅多个topic
+ ///
+ ///
+ ///
+ ///
+ Task SubscribeAsync(string[] topics, Func messageHandler);
}
}
diff --git a/src/JiShe.CollectBus.KafkaProducer/IKafkaSubscribe.cs b/src/JiShe.CollectBus.KafkaProducer/IKafkaSubscribe.cs
new file mode 100644
index 0000000..3ccea65
--- /dev/null
+++ b/src/JiShe.CollectBus.KafkaProducer/IKafkaSubscribe.cs
@@ -0,0 +1,12 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Kafka
+{
+ public interface IKafkaSubscribe
+ {
+ }
+}
diff --git a/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj b/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj
index 3175346..cef24d5 100644
--- a/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj
+++ b/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj
@@ -8,7 +8,8 @@
-
+
+
diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs
new file mode 100644
index 0000000..fede540
--- /dev/null
+++ b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs
@@ -0,0 +1,119 @@
+using Confluent.Kafka;
+using JiShe.CollectBus.Kafka.Attributes;
+using JiShe.CollectBus.Kafka.Consumer;
+using Microsoft.AspNetCore.Builder;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Newtonsoft.Json;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Kafka
+{
+ public static class KafkaSubcribesExtensions
+ {
+ ///
+ /// 添加Kafka订阅
+ ///
+ ///
+ ///
+ public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly)
+ {
+ var subscribeTypes = assembly.GetTypes()
+ .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
+ .ToList();
+
+ if (subscribeTypes.Count == 0) return;
+
+ var provider = app.ApplicationServices;
+ var lifetime = provider.GetRequiredService();
+
+ lifetime.ApplicationStarted.Register(() =>
+ {
+ foreach (var subscribeType in subscribeTypes)
+ {
+ var subscribes = provider.GetServices(subscribeType).ToList();
+ subscribes.ForEach(subscribe => {
+
+ if(subscribe is IKafkaSubscribe)
+ {
+ BuildKafkaSubscriber(subscribe, provider);
+ }
+ });
+ }
+ });
+ }
+
+ ///
+ /// 构建Kafka订阅
+ ///
+ ///
+ ///
+ private static void BuildKafkaSubscriber(object subscribe, IServiceProvider provider)
+ {
+ var methods = subscribe.GetType().GetMethods();
+ foreach (var method in methods)
+ {
+ var attr = method.GetCustomAttribute();
+ if (attr == null) continue;
+
+ // 启动后台消费线程
+ Task.Run(() => StartConsumerAsync(provider, attr, method, subscribe));
+ }
+ }
+
+ ///
+ /// 启动后台消费线程
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe)
+ {
+ var consumerService = provider.GetRequiredService>();
+ await consumerService.SubscribeAsync(attr.Topics, async (key, message) =>
+ {
+ try
+ {
+ if (string.IsNullOrEmpty(message))
+ await Task.CompletedTask ;
+
+ // 处理消息
+ await ProcessMessageAsync(message, method, subscribe);
+ }
+ catch (ConsumeException ex)
+ {
+ // 处理消费错误
+ throw;
+ }
+ });
+ }
+
+ private static async Task ProcessMessageAsync(string message, MethodInfo method, object subscribe)
+ {
+ var parameters = method.GetParameters();
+ if (parameters.Length != 1) return;
+
+ var paramType = parameters[0].ParameterType;
+ var messageObj = paramType == typeof(string)
+ ? message
+ : JsonConvert.DeserializeObject(message, paramType);
+
+ if (method.ReturnType == typeof(Task))
+ {
+ await (Task)method.Invoke(subscribe, new[] { messageObj })!;
+ }
+ else
+ {
+ method.Invoke(subscribe, new[] { messageObj });
+ }
+ }
+
+ }
+}
diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs
index c3e65f6..99d3dd3 100644
--- a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs
+++ b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs
@@ -37,7 +37,7 @@ namespace JiShe.CollectBus.Kafka.Producer
CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4,其他:gzip/snappy/zstd
BatchSize = 32768, // 修改批次大小为32K
LingerMs = 20, // 修改等待时间为20ms
- Acks = Acks.All, // 表明只有所有副本Broker都收到消息才算提交成功
+ Acks = Acks.All, // 表明只有所有副本Broker都收到消息才算提交成功, 可以 Acks.Leader
MessageSendMaxRetries = 50, // 消息发送失败最大重试50次
};