订阅消费暂时取消缓存消费实例

This commit is contained in:
zenghongyao 2025-04-18 08:22:52 +08:00
parent ca1e4e28e5
commit 02d189358e
8 changed files with 223 additions and 163 deletions

View File

@ -47,6 +47,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "2.Services", "2.Services",
EndProject EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "5.Shared", "5.Shared", "{EBF7C01F-9B4F-48E6-8418-2CBFDA51EB0B}" Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "5.Shared", "5.Shared", "{EBF7C01F-9B4F-48E6-8418-2CBFDA51EB0B}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "modules\JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}"
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU
@ -121,6 +123,10 @@ Global
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU {443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU {443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU {443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU
{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE
@ -143,6 +149,7 @@ Global
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59} {A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
{A377955E-7EA1-6F29-8CF7-774569E93925} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC} {A377955E-7EA1-6F29-8CF7-774569E93925} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}
{443B4549-0AC0-4493-8F3E-49C83225DD76} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59} {443B4549-0AC0-4493-8F3E-49C83225DD76} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
EndGlobalSection EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD} SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}

View File

@ -12,7 +12,7 @@ namespace JiShe.CollectBus.Kafka.Attributes
/// <summary> /// <summary>
/// 订阅的主题 /// 订阅的主题
/// </summary> /// </summary>
public string Topic { get; set; } public string Topic { get; set; } = null!;
/// <summary> /// <summary>
/// 分区 /// 分区
@ -22,7 +22,7 @@ namespace JiShe.CollectBus.Kafka.Attributes
/// <summary> /// <summary>
/// 消费者组 /// 消费者组
/// </summary> /// </summary>
public string GroupId { get; set; } public string GroupId { get; set; } = "default";
/// <summary> /// <summary>
/// 任务数(默认是多少个分区多少个任务) /// 任务数(默认是多少个分区多少个任务)
@ -42,35 +42,27 @@ namespace JiShe.CollectBus.Kafka.Attributes
/// <summary> /// <summary>
/// 批次超时时间 /// 批次超时时间
/// 格式:("00:05:00")
/// </summary> /// </summary>
public TimeSpan? BatchTimeout { get; set; }=null; public TimeSpan? BatchTimeout { get; set; }=null;
/// <summary> /// <summary>
/// 订阅主题 /// 订阅主题
/// </summary> /// </summary>
/// <param name="batchTimeout">batchTimeout格式:("00:05:00")</param> /// <param name="batchTimeout"></param>
public KafkaSubscribeAttribute(string topic, string groupId = "default", bool enableBatch = false, int batchSize = 100, string? batchTimeout = null) public KafkaSubscribeAttribute(string topic)
{ {
this.Topic = topic; this.Topic = topic;
this.GroupId = groupId;
this.EnableBatch = enableBatch;
this.BatchSize = batchSize;
this.BatchTimeout = batchTimeout != null? TimeSpan.Parse(batchTimeout): null;
} }
/// <summary> /// <summary>
/// 订阅主题 /// 订阅主题
/// </summary> /// </summary>
/// <param name="batchTimeout">batchTimeout格式:("00:05:00")</param> public KafkaSubscribeAttribute(string topic, int partition)
public KafkaSubscribeAttribute(string topic, int partition, string groupId = "default", bool enableBatch = false, int batchSize = 100, string? batchTimeout = null)
{ {
this.Topic = topic; this.Topic = topic;
this.Partition = partition; this.Partition = partition;
this.GroupId = groupId;
this.TaskCount = 1;
this.EnableBatch = enableBatch;
this.BatchSize = batchSize;
this.BatchTimeout = batchTimeout != null ? TimeSpan.Parse(batchTimeout) : null;
} }
} }
} }

View File

@ -39,6 +39,8 @@ namespace JiShe.CollectBus.Kafka
context.Services.AddSingleton<IProducerService, ProducerService>(); context.Services.AddSingleton<IProducerService, ProducerService>();
// 注册Consumer // 注册Consumer
context.Services.AddSingleton<IConsumerService, ConsumerService>(); context.Services.AddSingleton<IConsumerService, ConsumerService>();
context.Services.AddHostedService<HostedService>();
} }
public override void OnApplicationInitialization(ApplicationInitializationContext context) public override void OnApplicationInitialization(ApplicationInitializationContext context)
@ -46,7 +48,7 @@ namespace JiShe.CollectBus.Kafka
var app = context.GetApplicationBuilder(); var app = context.GetApplicationBuilder();
// 注册Subscriber // 注册Subscriber
app.ApplicationServices.UseKafkaSubscribe(); //app.ApplicationServices.UseKafkaSubscribe();
// 获取程序集 // 获取程序集
//app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application")); //app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application"));

View File

@ -51,7 +51,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
AutoOffsetReset = AutoOffsetReset.Earliest, AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false, // 禁止AutoCommit EnableAutoCommit = false, // 禁止AutoCommit
EnablePartitionEof = true, // 启用分区末尾标记 EnablePartitionEof = true, // 启用分区末尾标记
//AllowAutoCreateTopics = true, // 启用自动创建 AllowAutoCreateTopics = true, // 启用自动创建
FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小50MB FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小50MB
}; };
@ -106,12 +106,12 @@ namespace JiShe.CollectBus.Kafka.Consumer
var consumerKey = typeof(KafkaConsumer<TKey, TValue>); var consumerKey = typeof(KafkaConsumer<TKey, TValue>);
var cts = new CancellationTokenSource(); var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ => //var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
( //(
CreateConsumer<TKey, TValue>(groupId), // CreateConsumer<TKey, TValue>(groupId),
cts // cts
)).Consumer as IConsumer<TKey, TValue>; //)).Consumer as IConsumer<TKey, TValue>;
var consumer = CreateConsumer<TKey, TValue>(groupId);
consumer!.Subscribe(topics); consumer!.Subscribe(topics);
await Task.Run(async () => await Task.Run(async () =>
@ -170,61 +170,74 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <returns></returns> /// <returns></returns>
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
{ {
var consumerKey = typeof(KafkaConsumer<string, TValue>); try {
var cts = new CancellationTokenSource(); var consumerKey = typeof(KafkaConsumer<Ignore, TValue>);
//if (topics.Contains(ProtocolConst.SubscriberLoginReceivedEventName)) var cts = new CancellationTokenSource();
//{ //if (topics.Contains(ProtocolConst.SubscriberLoginReceivedEventName))
// string ssss = ""; //{
//} // string ssss = "";
var consumer = _consumerStore.GetOrAdd(consumerKey, _=> //}
( //var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
CreateConsumer<string, TValue>(groupId), //(
cts // CreateConsumer<string, TValue>(groupId),
)).Consumer as IConsumer<string, TValue>; // cts
//)).Consumer as IConsumer<string, TValue>;
var consumer = CreateConsumer<Ignore, TValue>(groupId);
consumer!.Subscribe(topics); consumer!.Subscribe(topics);
await Task.Run(async () => _ = Task.Run(async () =>
{
while (!cts.IsCancellationRequested)
{ {
try int count = 0;
while (!cts.IsCancellationRequested)
{ {
//_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息...."); try
var result = consumer.Consume(cts.Token);
if (result == null || result.Message==null || result.Message.Value == null)
continue;
if (result.IsPartitionEOF)
{ {
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); //_logger.LogInformation($"Kafka消费: {string.Join("", topics)}_{count} 开始拉取消息....");
await Task.Delay(100, cts.Token); count++;
continue; var result = consumer.Consume(cts.Token);
} if (result == null || result.Message == null || result.Message.Value == null)
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{ {
//consumer.Commit(result); // 提交偏移量 await Task.Delay(500, cts.Token);
// 跳过消息
continue; continue;
} }
if (result.IsPartitionEOF)
{
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
await Task.Delay(100, cts.Token);
continue;
}
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
await Task.Delay(500, cts.Token);
//consumer.Commit(result); // 提交偏移量
// 跳过消息
continue;
}
}
bool sucess = await messageHandler(result.Message.Value);
if (sucess)
consumer.Commit(result); // 手动提交
else
consumer.StoreOffset(result);
}
catch (ConsumeException ex)
{
_logger.LogError(ex, $"{string.Join("", topics)}消息消费失败: {ex.Error.Reason}");
} }
bool sucess = await messageHandler(result.Message.Value);
if (sucess)
consumer.Commit(result); // 手动提交
else
consumer.StoreOffset(result);
} }
catch (ConsumeException ex) });
{ } catch (Exception ex)
_logger.LogError(ex, $"{string.Join("", topics)}消息消费失败: {ex.Error.Reason}"); {
} _logger.LogWarning($"Kafka消费异常: {ex.Message}");
}
}); }
await Task.CompletedTask; await Task.CompletedTask;
} }

View File

@ -0,0 +1,45 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka
{
public class HostedService : IHostedService, IDisposable
{
private readonly ILogger _logger;
private readonly IServiceProvider _provider;
public HostedService(ILogger<HostedService> logger, IServiceProvider provider)
{
_logger = logger;
_provider = provider;
}
public Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("程序启动");
Task.Run(() =>
{
_provider.UseKafkaSubscribe();
});
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("结束");
return Task.CompletedTask;
}
public void Dispose()
{
}
}
}

View File

@ -25,9 +25,9 @@ namespace JiShe.CollectBus.Kafka
/// </summary> /// </summary>
/// <param name="app"></param> /// <param name="app"></param>
/// <param name="assembly"></param> /// <param name="assembly"></param>
public static void UseKafkaSubscribe(this IServiceProvider provider) public static async Task UseKafkaSubscribe(this IServiceProvider provider)
{ {
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>(); //var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
//初始化主题信息 //初始化主题信息
var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>(); var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
@ -40,10 +40,10 @@ namespace JiShe.CollectBus.Kafka
{ {
kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult(); kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
} }
List<Task> tasks = new List<Task>();
lifetime.ApplicationStarted.Register(async() => //lifetime.ApplicationStarted.Register(async() =>
{ //{
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>(); var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
int threadCount = 0; int threadCount = 0;
int topicCount = 0; int topicCount = 0;
var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location); var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location);
@ -74,7 +74,7 @@ namespace JiShe.CollectBus.Kafka
{ {
if (subscribe!=null) if (subscribe!=null)
{ {
Tuple<int, int> tuple = await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value); Tuple<int, int> tuple = await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value,tasks);
threadCount += tuple.Item1; threadCount += tuple.Item1;
topicCount += tuple.Item2; topicCount += tuple.Item2;
} }
@ -82,58 +82,59 @@ namespace JiShe.CollectBus.Kafka
} }
} }
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程"); logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
}); //});
await Task.WhenAll(tasks);
} }
public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly) //public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly)
{ //{
var provider = app.ApplicationServices; // var provider = app.ApplicationServices;
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>(); // var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
//初始化主题信息 // //初始化主题信息
var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>(); // var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>(); // var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); // List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); // topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
foreach (var item in topics) // foreach (var item in topics)
{ // {
kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult(); // kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
} // }
lifetime.ApplicationStarted.Register(async () => // lifetime.ApplicationStarted.Register(async () =>
{ // {
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>(); // var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
int threadCount = 0; // int threadCount = 0;
int topicCount = 0; // int topicCount = 0;
var subscribeTypes = assembly.GetTypes() // var subscribeTypes = assembly.GetTypes()
.Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t)) // .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
.ToList(); // .ToList();
if (subscribeTypes.Count == 0) return; // if (subscribeTypes.Count == 0) return;
foreach (var subscribeType in subscribeTypes) // foreach (var subscribeType in subscribeTypes)
{ // {
var subscribes = provider.GetServices(subscribeType).ToList(); // var subscribes = provider.GetServices(subscribeType).ToList();
subscribes.ForEach(async subscribe => { // subscribes.ForEach(async subscribe => {
if (subscribe != null) // if (subscribe != null)
{ // {
Tuple<int, int> tuple =await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value); // Tuple<int, int> tuple =await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
threadCount += tuple.Item1; // threadCount += tuple.Item1;
topicCount += tuple.Item2; // topicCount += tuple.Item2;
} // }
}); // });
} // }
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程"); // logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
}); // });
} //}
/// <summary> /// <summary>
/// 构建Kafka订阅 /// 构建Kafka订阅
/// </summary> /// </summary>
/// <param name="subscribe"></param> /// <param name="subscribe"></param>
/// <param name="provider"></param> /// <param name="provider"></param>
private static async Task<Tuple<int,int>> BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig) private static async Task<Tuple<int,int>> BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig, List<Task> tasks)
{ {
var subscribedMethods = subscribe.GetType().GetMethods() var subscribedMethods = subscribe.GetType().GetMethods()
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() }) .Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
@ -141,20 +142,22 @@ namespace JiShe.CollectBus.Kafka
.ToArray(); .ToArray();
//var configuration = provider.GetRequiredService<IConfiguration>(); //var configuration = provider.GetRequiredService<IConfiguration>();
int threadCount = 0; int threadCount = 0;
List<Task> tasks = new List<Task>();
foreach (var sub in subscribedMethods) foreach (var sub in subscribedMethods)
{ {
int partitionCount = 3;// kafkaOptionConfig.NumPartitions; int partitionCount = 3;// kafkaOptionConfig.NumPartitions;
#if DEBUG
var adminClientService = provider.GetRequiredService<IAdminClientService>(); var adminClientService = provider.GetRequiredService<IAdminClientService>();
int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic); int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic);
partitionCount= partitionCount> topicCount ? topicCount: partitionCount; partitionCount= partitionCount> topicCount ? topicCount: partitionCount;
#endif
//int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount; //int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
if (partitionCount <= 0) if (partitionCount <= 0)
partitionCount = 1; partitionCount = 1;
for (int i = 0; i < partitionCount; i++) for (int i = 0; i < partitionCount; i++)
{ {
//if (sub.Attribute!.Topic == ProtocolConst.SubscriberLoginReceivedEventName) //if (sub.Attribute!.Topic == ProtocolConst.SubscriberLoginReceivedEventName)
await StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger); tasks.Add( Task.Run(()=> StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger)));
threadCount++; threadCount++;
} }
} }
@ -171,49 +174,47 @@ namespace JiShe.CollectBus.Kafka
/// <returns></returns> /// <returns></returns>
private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe, ILogger<CollectBusKafkaModule> logger) private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe, ILogger<CollectBusKafkaModule> logger)
{ {
await Task.Run(async () => var consumerService = provider.GetRequiredService<IConsumerService>();
{
var consumerService = provider.GetRequiredService<IConsumerService>();
if (attr.EnableBatch) if (attr.EnableBatch)
{
await consumerService.SubscribeBatchAsync<object>(attr.Topic, async (message) =>
{ {
await consumerService.SubscribeBatchAsync<object>(attr.Topic, async (message) => try
{ {
try logger.LogInformation($"kafka批量消费消息:{message}");
{ // 处理消息
logger.LogInformation($"kafka批量消费消息:{message}"); return await ProcessMessageAsync(message.ToList(), method, subscribe);
// 处理消息 }
return await ProcessMessageAsync(message.ToList(), method, subscribe); catch (ConsumeException ex)
} {
catch (ConsumeException ex) // 处理消费错误
{ logger.LogError($"kafka批量消费异常:{ex.Message}");
// 处理消费错误 }
logger.LogError($"kafka批量消费异常:{ex.Message}"); return await Task.FromResult(false);
} }, attr.GroupId, attr.BatchSize, attr.BatchTimeout);
return await Task.FromResult(false); }
}, attr.GroupId, attr.BatchSize, attr.BatchTimeout); else
} {
else await consumerService.SubscribeAsync<object>(attr.Topic, async (message) =>
{ {
await consumerService.SubscribeAsync<object>(attr.Topic, async (message) => try
{ {
try #if DEBUG
{ logger.LogInformation($"kafka消费消息:{message}");
logger.LogInformation($"kafka消费消息:{message}"); #endif
// 处理消息 // 处理消息
return await ProcessMessageAsync(new List<object>() { message }, method, subscribe); return await ProcessMessageAsync(new List<object>() { message }, method, subscribe);
} }
catch (ConsumeException ex) catch (ConsumeException ex)
{ {
// 处理消费错误 // 处理消费错误
logger.LogError($"kafka消费异常:{ex.Message}"); logger.LogError($"kafka消费异常:{ex.Message}");
} }
return await Task.FromResult(false); return await Task.FromResult(false);
}, attr.GroupId); }, attr.GroupId);
} }
});
} }

View File

@ -15,6 +15,6 @@ namespace JiShe.CollectBus.Kafka.Producer
Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value, int? partition, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) where TKey : notnull where TValue : class; Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value, int? partition, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) where TKey : notnull where TValue : class;
Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<string, TValue>>? deliveryHandler = null) where TValue : class; Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class;
} }
} }

View File

@ -126,10 +126,10 @@ namespace JiShe.CollectBus.Kafka.Producer
public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class
{ {
var typeKey = typeof(KafkaProducer<string, TValue>); var typeKey = typeof(KafkaProducer<string, TValue>);
var producer = GetProducer<string, TValue>(typeKey); var producer = GetProducer<Null, TValue>(typeKey);
var message = new Message<string, TValue> var message = new Message<Null, TValue>
{ {
Key= _kafkaOptionConfig.ServerTagName, //Key= _kafkaOptionConfig.ServerTagName,
Value = value, Value = value,
Headers = new Headers{ Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
@ -184,18 +184,18 @@ namespace JiShe.CollectBus.Kafka.Producer
/// <param name="partition"></param> /// <param name="partition"></param>
/// <param name="deliveryHandler"></param> /// <param name="deliveryHandler"></param>
/// <returns></returns> /// <returns></returns>
public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<string, TValue>>? deliveryHandler = null) where TValue : class public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class
{ {
var message = new Message<string, TValue> var message = new Message<Null, TValue>
{ {
Key = _kafkaOptionConfig.ServerTagName, //Key = _kafkaOptionConfig.ServerTagName,
Value = value, Value = value,
Headers = new Headers{ Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
} }
}; };
var typeKey = typeof(KafkaProducer<string, TValue>); var typeKey = typeof(KafkaProducer<Null, TValue>);
var producer = GetProducer<string, TValue>(typeKey); var producer = GetProducer<Null, TValue>(typeKey);
if (partition.HasValue) if (partition.HasValue)
{ {
var topicPartition = new TopicPartition(topic, partition.Value); var topicPartition = new TopicPartition(topic, partition.Value);