Compare commits
2 Commits
902993e481
...
0584515df9
| Author | SHA1 | Date | |
|---|---|---|---|
| 0584515df9 | |||
| 72f8222ec2 |
@ -139,20 +139,41 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
{
|
{
|
||||||
var consumerService = provider.GetRequiredService<IConsumerService>();
|
var consumerService = provider.GetRequiredService<IConsumerService>();
|
||||||
|
|
||||||
await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) =>
|
if (attr.EnableBatch)
|
||||||
{
|
{
|
||||||
try
|
await consumerService.SubscribeBatchAsync<dynamic>(attr.Topic, async (message) =>
|
||||||
{
|
{
|
||||||
// 处理消息
|
try
|
||||||
return await ProcessMessageAsync(message, method, subscribe);
|
{
|
||||||
}
|
// 处理消息
|
||||||
catch (ConsumeException ex)
|
return await ProcessMessageAsync(message, method, subscribe);
|
||||||
|
}
|
||||||
|
catch (ConsumeException ex)
|
||||||
|
{
|
||||||
|
// 处理消费错误
|
||||||
|
logger.LogError($"kafka批量消费异常:{ex.Message}");
|
||||||
|
}
|
||||||
|
return await Task.FromResult(false);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) =>
|
||||||
{
|
{
|
||||||
// 处理消费错误
|
try
|
||||||
logger.LogError($"kafka消费异常:{ex.Message}");
|
{
|
||||||
}
|
// 处理消息
|
||||||
return await Task.FromResult(false);
|
return await ProcessMessageAsync(message, method, subscribe);
|
||||||
});
|
}
|
||||||
|
catch (ConsumeException ex)
|
||||||
|
{
|
||||||
|
// 处理消费错误
|
||||||
|
logger.LogError($"kafka消费异常:{ex.Message}");
|
||||||
|
}
|
||||||
|
return await Task.FromResult(false);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user