Compare commits

...

2 Commits

2 changed files with 72 additions and 27 deletions

View File

@ -79,7 +79,7 @@ public class AdminClientService : IAdminClientService, IDisposable, ISingletonDe
public async Task<List<string>> ListTopicsAsync() public async Task<List<string>> ListTopicsAsync()
{ {
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10)); var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
return new List<string>(metadata.Topics.Select(t => t.Topic)); return await Task.FromResult(new List<string>(metadata.Topics.Select(t => t.Topic)));
} }
/// <summary> /// <summary>
@ -90,7 +90,7 @@ public class AdminClientService : IAdminClientService, IDisposable, ISingletonDe
public async Task<bool> TopicExistsAsync(string topic) public async Task<bool> TopicExistsAsync(string topic)
{ {
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10)); var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
return metadata.Topics.Any(t => t.Topic == topic); return await Task.FromResult(metadata.Topics.Any(t => t.Topic == topic));
} }
/// <summary> /// <summary>

View File

@ -12,6 +12,7 @@ using System.Collections.Concurrent;
using System.Text; using System.Text;
using System.Text.RegularExpressions; using System.Text.RegularExpressions;
using System.Threading; using System.Threading;
using YamlDotNet.Core.Tokens;
namespace JiShe.CollectBus.Kafka.Consumer namespace JiShe.CollectBus.Kafka.Consumer
{ {
@ -132,18 +133,21 @@ namespace JiShe.CollectBus.Kafka.Consumer
{ {
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{ {
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
var cts = new CancellationTokenSource(); var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ =>
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
( (
CreateConsumer<TKey, TValue>(groupId), CreateConsumer<TKey, TValue>(groupId),
cts new CancellationTokenSource()
)).Consumer as IConsumer<TKey, TValue>; ));
if (consumerStore.Consumer == null)
{
_logger.LogWarning($"{string.Join("", topics)}创建消息消费失败或消费组已被释放");
return;
}
var consumer = consumerStore.Consumer as IConsumer<TKey, TValue>;
var cts = consumerStore.CTS;
consumer!.Subscribe(topics); consumer!.Subscribe(topics);
_ = Task.Run(async () => _ = Task.Run(async () =>
{ {
while (!cts.IsCancellationRequested) while (!cts.IsCancellationRequested)
@ -195,6 +199,11 @@ namespace JiShe.CollectBus.Kafka.Consumer
{ {
//ignore //ignore
} }
catch (ObjectDisposedException)
{
_logger.LogError($"{string.Join("", topics)}消费者被释放");
break;
}
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "处理消息时发生未知错误"); _logger.LogError(ex, "处理消息时发生未知错误");
@ -229,12 +238,19 @@ namespace JiShe.CollectBus.Kafka.Consumer
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{ {
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ => var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ =>
( (
CreateConsumer<Ignore, TValue>(groupId), CreateConsumer<Ignore, TValue>(groupId),
cts new CancellationTokenSource()
)).Consumer as IConsumer<Ignore, TValue>; ));
if (consumerStore.Consumer == null)
{
_logger.LogWarning($"{string.Join("", topics)}创建消息消费失败或消费组已被释放");
return;
}
var consumer = consumerStore.Consumer as IConsumer<Ignore, TValue>;
var cts = consumerStore.CTS;
consumer!.Subscribe(topics); consumer!.Subscribe(topics);
@ -288,6 +304,11 @@ namespace JiShe.CollectBus.Kafka.Consumer
{ {
//ignore //ignore
} }
catch (ObjectDisposedException)
{
_logger.LogError($"{string.Join("", topics)}消费者被释放");
break;
}
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "处理消息时发生未知错误"); _logger.LogError(ex, "处理消息时发生未知错误");
@ -346,13 +367,20 @@ namespace JiShe.CollectBus.Kafka.Consumer
{ {
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
var cts = new CancellationTokenSource();
var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<TKey, TValue>(groupId),
new CancellationTokenSource()
));
if (consumerStore.Consumer == null)
{
_logger.LogWarning($"{string.Join("", topics)}创建消息消费失败或消费组已被释放");
return;
}
var consumer = consumerStore.Consumer as IConsumer<TKey, TValue>;
var cts = consumerStore.CTS;
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<TKey, TValue>(groupId),
cts
)).Consumer as IConsumer<TKey, TValue>;
consumer!.Subscribe(topics); consumer!.Subscribe(topics);
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
@ -444,6 +472,11 @@ namespace JiShe.CollectBus.Kafka.Consumer
{ {
//ignore //ignore
} }
catch (ObjectDisposedException)
{
_logger.LogError($"{string.Join("", topics)}消费者被释放");
break;
}
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "处理批量消息时发生未知错误"); _logger.LogError(ex, "处理批量消息时发生未知错误");
@ -505,13 +538,18 @@ namespace JiShe.CollectBus.Kafka.Consumer
{ {
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
var cts = new CancellationTokenSource(); var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ =>
(
var consumer = _consumerStore.GetOrAdd(consumerKey, _ => CreateConsumer<Ignore, TValue>(groupId),
( new CancellationTokenSource()
CreateConsumer<Ignore, TValue>(groupId), ));
cts if (consumerStore.Consumer == null)
)).Consumer as IConsumer<Ignore, TValue>; {
_logger.LogWarning($"{string.Join("", topics)}创建消息消费失败或消费组已被释放");
return;
}
var consumer = consumerStore.Consumer as IConsumer<Ignore, TValue>;
var cts = consumerStore.CTS;
consumer!.Subscribe(topics); consumer!.Subscribe(topics);
@ -602,6 +640,11 @@ namespace JiShe.CollectBus.Kafka.Consumer
{ {
//ignore //ignore
} }
catch (ObjectDisposedException)
{
_logger.LogError($"{string.Join("", topics)}消费者被释放");
break;
}
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "处理批量消息时发生未知错误"); _logger.LogError(ex, "处理批量消息时发生未知错误");
@ -630,11 +673,13 @@ namespace JiShe.CollectBus.Kafka.Consumer
try try
{ {
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
if (_consumerStore.TryRemove(consumerKey, out var entry)) if (_consumerStore.TryGetValue(consumerKey, out var entry))
{ {
entry.CTS.Cancel(); entry.CTS.Cancel();
(entry.Consumer as IDisposable)?.Dispose(); (entry.Consumer as IDisposable)?.Dispose();
entry.CTS.Dispose(); entry.CTS.Dispose();
// 从字典中移除
_consumerStore.TryRemove(consumerKey, out entry);
} }
} }
catch (Exception ex) catch (Exception ex)