diff --git a/JiShe.CollectBus.sln b/JiShe.CollectBus.sln
index d4e6591..fa3fd6c 100644
--- a/JiShe.CollectBus.sln
+++ b/JiShe.CollectBus.sln
@@ -47,6 +47,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "2.Services", "2.Services",
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "5.Shared", "5.Shared", "{EBF7C01F-9B4F-48E6-8418-2CBFDA51EB0B}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "modules\JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -121,6 +123,10 @@ Global
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU
+ {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -143,6 +149,7 @@ Global
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
{A377955E-7EA1-6F29-8CF7-774569E93925} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}
{443B4549-0AC0-4493-8F3E-49C83225DD76} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
+ {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}
diff --git a/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs b/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs
index df75b89..c74aa2e 100644
--- a/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs
+++ b/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs
@@ -12,7 +12,7 @@ namespace JiShe.CollectBus.Kafka.Attributes
///
/// 订阅的主题
///
- public string Topic { get; set; }
+ public string Topic { get; set; } = null!;
///
/// 分区
@@ -22,7 +22,7 @@ namespace JiShe.CollectBus.Kafka.Attributes
///
/// 消费者组
///
- public string GroupId { get; set; }
+ public string GroupId { get; set; } = "default";
///
/// 任务数(默认是多少个分区多少个任务)
@@ -42,35 +42,27 @@ namespace JiShe.CollectBus.Kafka.Attributes
///
/// 批次超时时间
+ /// 格式:("00:05:00")
///
public TimeSpan? BatchTimeout { get; set; }=null;
+
///
/// 订阅主题
///
- /// batchTimeout格式:("00:05:00")
- public KafkaSubscribeAttribute(string topic, string groupId = "default", bool enableBatch = false, int batchSize = 100, string? batchTimeout = null)
+ ///
+ public KafkaSubscribeAttribute(string topic)
{
this.Topic = topic;
- this.GroupId = groupId;
- this.EnableBatch = enableBatch;
- this.BatchSize = batchSize;
- this.BatchTimeout = batchTimeout != null? TimeSpan.Parse(batchTimeout): null;
}
///
/// 订阅主题
///
- /// batchTimeout格式:("00:05:00")
- public KafkaSubscribeAttribute(string topic, int partition, string groupId = "default", bool enableBatch = false, int batchSize = 100, string? batchTimeout = null)
+ public KafkaSubscribeAttribute(string topic, int partition)
{
this.Topic = topic;
this.Partition = partition;
- this.GroupId = groupId;
- this.TaskCount = 1;
- this.EnableBatch = enableBatch;
- this.BatchSize = batchSize;
- this.BatchTimeout = batchTimeout != null ? TimeSpan.Parse(batchTimeout) : null;
}
}
}
diff --git a/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs b/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs
index 39b6444..867e2b7 100644
--- a/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs
+++ b/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs
@@ -39,6 +39,8 @@ namespace JiShe.CollectBus.Kafka
context.Services.AddSingleton();
// 注册Consumer
context.Services.AddSingleton();
+
+ context.Services.AddHostedService();
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
@@ -46,7 +48,7 @@ namespace JiShe.CollectBus.Kafka
var app = context.GetApplicationBuilder();
// 注册Subscriber
- app.ApplicationServices.UseKafkaSubscribe();
+ //app.ApplicationServices.UseKafkaSubscribe();
// 获取程序集
//app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application"));
diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs
index 8eacb5b..0ec5bd0 100644
--- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs
+++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs
@@ -51,7 +51,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false, // 禁止AutoCommit
EnablePartitionEof = true, // 启用分区末尾标记
- //AllowAutoCreateTopics = true, // 启用自动创建
+ AllowAutoCreateTopics = true, // 启用自动创建
FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小(50MB)
};
@@ -106,12 +106,12 @@ namespace JiShe.CollectBus.Kafka.Consumer
var consumerKey = typeof(KafkaConsumer);
var cts = new CancellationTokenSource();
- var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
- (
- CreateConsumer(groupId),
- cts
- )).Consumer as IConsumer;
-
+ //var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
+ //(
+ // CreateConsumer(groupId),
+ // cts
+ //)).Consumer as IConsumer;
+ var consumer = CreateConsumer(groupId);
consumer!.Subscribe(topics);
await Task.Run(async () =>
@@ -170,61 +170,74 @@ namespace JiShe.CollectBus.Kafka.Consumer
///
public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) where TValue : class
{
- var consumerKey = typeof(KafkaConsumer);
- var cts = new CancellationTokenSource();
- //if (topics.Contains(ProtocolConst.SubscriberLoginReceivedEventName))
- //{
- // string ssss = "";
- //}
- var consumer = _consumerStore.GetOrAdd(consumerKey, _=>
- (
- CreateConsumer(groupId),
- cts
- )).Consumer as IConsumer;
+ try {
+ var consumerKey = typeof(KafkaConsumer);
+ var cts = new CancellationTokenSource();
+ //if (topics.Contains(ProtocolConst.SubscriberLoginReceivedEventName))
+ //{
+ // string ssss = "";
+ //}
+ //var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
+ //(
+ // CreateConsumer(groupId),
+ // cts
+ //)).Consumer as IConsumer;
-
- consumer!.Subscribe(topics);
+ var consumer = CreateConsumer(groupId);
+ consumer!.Subscribe(topics);
- await Task.Run(async () =>
- {
- while (!cts.IsCancellationRequested)
+ _ = Task.Run(async () =>
{
- try
+ int count = 0;
+ while (!cts.IsCancellationRequested)
{
- //_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息....");
- var result = consumer.Consume(cts.Token);
- if (result == null || result.Message==null || result.Message.Value == null)
- continue;
-
- if (result.IsPartitionEOF)
+ try
{
- _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
- await Task.Delay(100, cts.Token);
- continue;
- }
- if (_kafkaOptionConfig.EnableFilter)
- {
- var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
- // 检查 Header 是否符合条件
- if (!headersFilter.Match(result.Message.Headers))
+ //_logger.LogInformation($"Kafka消费: {string.Join("", topics)}_{count} 开始拉取消息....");
+ count++;
+ var result = consumer.Consume(cts.Token);
+ if (result == null || result.Message == null || result.Message.Value == null)
{
- //consumer.Commit(result); // 提交偏移量
- // 跳过消息
+ await Task.Delay(500, cts.Token);
continue;
}
+
+ if (result.IsPartitionEOF)
+ {
+ _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
+ await Task.Delay(100, cts.Token);
+ continue;
+ }
+ if (_kafkaOptionConfig.EnableFilter)
+ {
+ var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
+ // 检查 Header 是否符合条件
+ if (!headersFilter.Match(result.Message.Headers))
+ {
+ await Task.Delay(500, cts.Token);
+ //consumer.Commit(result); // 提交偏移量
+ // 跳过消息
+ continue;
+ }
+ }
+ bool sucess = await messageHandler(result.Message.Value);
+ if (sucess)
+ consumer.Commit(result); // 手动提交
+ else
+ consumer.StoreOffset(result);
+ }
+ catch (ConsumeException ex)
+ {
+ _logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}");
}
- bool sucess = await messageHandler(result.Message.Value);
- if (sucess)
- consumer.Commit(result); // 手动提交
- else
- consumer.StoreOffset(result);
}
- catch (ConsumeException ex)
- {
- _logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}");
- }
- }
- });
+ });
+ } catch (Exception ex)
+ {
+ _logger.LogWarning($"Kafka消费异常: {ex.Message}");
+
+ }
+
await Task.CompletedTask;
}
diff --git a/modules/JiShe.CollectBus.Kafka/HostedService.cs b/modules/JiShe.CollectBus.Kafka/HostedService.cs
new file mode 100644
index 0000000..dcd5197
--- /dev/null
+++ b/modules/JiShe.CollectBus.Kafka/HostedService.cs
@@ -0,0 +1,45 @@
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Kafka
+{
+ public class HostedService : IHostedService, IDisposable
+ {
+ private readonly ILogger _logger;
+ private readonly IServiceProvider _provider;
+ public HostedService(ILogger logger, IServiceProvider provider)
+ {
+ _logger = logger;
+ _provider = provider;
+ }
+
+ public Task StartAsync(CancellationToken cancellationToken)
+ {
+ _logger.LogInformation("程序启动");
+ Task.Run(() =>
+ {
+ _provider.UseKafkaSubscribe();
+ });
+ return Task.CompletedTask;
+ }
+
+ public Task StopAsync(CancellationToken cancellationToken)
+ {
+ _logger.LogInformation("结束");
+
+
+ return Task.CompletedTask;
+ }
+
+ public void Dispose()
+ {
+
+ }
+ }
+}
diff --git a/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs b/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs
index f94d296..fa26e53 100644
--- a/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs
+++ b/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs
@@ -25,9 +25,9 @@ namespace JiShe.CollectBus.Kafka
///
///
///
- public static void UseKafkaSubscribe(this IServiceProvider provider)
+ public static async Task UseKafkaSubscribe(this IServiceProvider provider)
{
- var lifetime = provider.GetRequiredService();
+ //var lifetime = provider.GetRequiredService();
//初始化主题信息
var kafkaAdminClient = provider.GetRequiredService();
@@ -40,10 +40,10 @@ namespace JiShe.CollectBus.Kafka
{
kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
}
-
- lifetime.ApplicationStarted.Register(async() =>
- {
- var logger = provider.GetRequiredService>();
+ List tasks = new List();
+ //lifetime.ApplicationStarted.Register(async() =>
+ //{
+ var logger = provider.GetRequiredService>();
int threadCount = 0;
int topicCount = 0;
var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location);
@@ -74,7 +74,7 @@ namespace JiShe.CollectBus.Kafka
{
if (subscribe!=null)
{
- Tuple tuple = await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
+ Tuple tuple = await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value,tasks);
threadCount += tuple.Item1;
topicCount += tuple.Item2;
}
@@ -82,58 +82,59 @@ namespace JiShe.CollectBus.Kafka
}
}
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
- });
+ //});
+ await Task.WhenAll(tasks);
}
- public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly)
- {
- var provider = app.ApplicationServices;
- var lifetime = provider.GetRequiredService();
- //初始化主题信息
- var kafkaAdminClient = provider.GetRequiredService();
- var kafkaOptions = provider.GetRequiredService>();
+ //public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly)
+ //{
+ // var provider = app.ApplicationServices;
+ // var lifetime = provider.GetRequiredService();
+ // //初始化主题信息
+ // var kafkaAdminClient = provider.GetRequiredService();
+ // var kafkaOptions = provider.GetRequiredService>();
- List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
- topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
+ // List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
+ // topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
- foreach (var item in topics)
- {
- kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
- }
+ // foreach (var item in topics)
+ // {
+ // kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
+ // }
- lifetime.ApplicationStarted.Register(async () =>
- {
- var logger = provider.GetRequiredService>();
- int threadCount = 0;
- int topicCount = 0;
- var subscribeTypes = assembly.GetTypes()
- .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
- .ToList();
+ // lifetime.ApplicationStarted.Register(async () =>
+ // {
+ // var logger = provider.GetRequiredService>();
+ // int threadCount = 0;
+ // int topicCount = 0;
+ // var subscribeTypes = assembly.GetTypes()
+ // .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
+ // .ToList();
- if (subscribeTypes.Count == 0) return;
- foreach (var subscribeType in subscribeTypes)
- {
- var subscribes = provider.GetServices(subscribeType).ToList();
- subscribes.ForEach(async subscribe => {
+ // if (subscribeTypes.Count == 0) return;
+ // foreach (var subscribeType in subscribeTypes)
+ // {
+ // var subscribes = provider.GetServices(subscribeType).ToList();
+ // subscribes.ForEach(async subscribe => {
- if (subscribe != null)
- {
- Tuple tuple =await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
- threadCount += tuple.Item1;
- topicCount += tuple.Item2;
- }
- });
- }
- logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
- });
- }
+ // if (subscribe != null)
+ // {
+ // Tuple tuple =await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
+ // threadCount += tuple.Item1;
+ // topicCount += tuple.Item2;
+ // }
+ // });
+ // }
+ // logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
+ // });
+ //}
///
/// 构建Kafka订阅
///
///
///
- private static async Task> BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger logger, KafkaOptionConfig kafkaOptionConfig)
+ private static async Task> BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger logger, KafkaOptionConfig kafkaOptionConfig, List tasks)
{
var subscribedMethods = subscribe.GetType().GetMethods()
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute() })
@@ -141,20 +142,22 @@ namespace JiShe.CollectBus.Kafka
.ToArray();
//var configuration = provider.GetRequiredService();
int threadCount = 0;
- List tasks = new List();
+
foreach (var sub in subscribedMethods)
{
int partitionCount = 3;// kafkaOptionConfig.NumPartitions;
+#if DEBUG
var adminClientService = provider.GetRequiredService();
int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic);
partitionCount= partitionCount> topicCount ? topicCount: partitionCount;
+#endif
//int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
if (partitionCount <= 0)
partitionCount = 1;
for (int i = 0; i < partitionCount; i++)
{
//if (sub.Attribute!.Topic == ProtocolConst.SubscriberLoginReceivedEventName)
- await StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger);
+ tasks.Add( Task.Run(()=> StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger)));
threadCount++;
}
}
@@ -171,49 +174,47 @@ namespace JiShe.CollectBus.Kafka
///
private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe, ILogger logger)
{
- await Task.Run(async () =>
- {
- var consumerService = provider.GetRequiredService();
+ var consumerService = provider.GetRequiredService();
- if (attr.EnableBatch)
+ if (attr.EnableBatch)
+ {
+ await consumerService.SubscribeBatchAsync