解决docker 重启导致消费者被释放
This commit is contained in:
parent
f746d84225
commit
95712c4c0e
@ -79,7 +79,7 @@ public class AdminClientService : IAdminClientService, IDisposable, ISingletonDe
|
|||||||
public async Task<List<string>> ListTopicsAsync()
|
public async Task<List<string>> ListTopicsAsync()
|
||||||
{
|
{
|
||||||
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
|
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
|
||||||
return new List<string>(metadata.Topics.Select(t => t.Topic));
|
return await Task.FromResult(new List<string>(metadata.Topics.Select(t => t.Topic)));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -90,7 +90,7 @@ public class AdminClientService : IAdminClientService, IDisposable, ISingletonDe
|
|||||||
public async Task<bool> TopicExistsAsync(string topic)
|
public async Task<bool> TopicExistsAsync(string topic)
|
||||||
{
|
{
|
||||||
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
|
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
|
||||||
return metadata.Topics.Any(t => t.Topic == topic);
|
return await Task.FromResult(metadata.Topics.Any(t => t.Topic == topic));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
@ -12,6 +12,7 @@ using System.Collections.Concurrent;
|
|||||||
using System.Text;
|
using System.Text;
|
||||||
using System.Text.RegularExpressions;
|
using System.Text.RegularExpressions;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
|
using YamlDotNet.Core.Tokens;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.Consumer
|
namespace JiShe.CollectBus.Kafka.Consumer
|
||||||
{
|
{
|
||||||
@ -132,18 +133,21 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
{
|
{
|
||||||
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
|
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
|
||||||
{
|
{
|
||||||
|
|
||||||
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
|
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
|
||||||
var cts = new CancellationTokenSource();
|
var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ =>
|
||||||
|
|
||||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
|
||||||
(
|
(
|
||||||
CreateConsumer<TKey, TValue>(groupId),
|
CreateConsumer<TKey, TValue>(groupId),
|
||||||
cts
|
new CancellationTokenSource()
|
||||||
)).Consumer as IConsumer<TKey, TValue>;
|
));
|
||||||
|
if (consumerStore.Consumer == null)
|
||||||
|
{
|
||||||
|
_logger.LogWarning($"{string.Join("、", topics)}创建消息消费失败或消费组已被释放");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
var consumer = consumerStore.Consumer as IConsumer<TKey, TValue>;
|
||||||
|
var cts = consumerStore.CTS;
|
||||||
|
|
||||||
consumer!.Subscribe(topics);
|
consumer!.Subscribe(topics);
|
||||||
|
|
||||||
_ = Task.Run(async () =>
|
_ = Task.Run(async () =>
|
||||||
{
|
{
|
||||||
while (!cts.IsCancellationRequested)
|
while (!cts.IsCancellationRequested)
|
||||||
@ -195,6 +199,11 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
{
|
{
|
||||||
//ignore
|
//ignore
|
||||||
}
|
}
|
||||||
|
catch (ObjectDisposedException)
|
||||||
|
{
|
||||||
|
_logger.LogError($"{string.Join("、", topics)}消费者被释放");
|
||||||
|
break;
|
||||||
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
_logger.LogError(ex, "处理消息时发生未知错误");
|
_logger.LogError(ex, "处理消息时发生未知错误");
|
||||||
@ -229,12 +238,19 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
|
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
|
||||||
{
|
{
|
||||||
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
|
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
|
||||||
var cts = new CancellationTokenSource();
|
|
||||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ =>
|
||||||
(
|
(
|
||||||
CreateConsumer<Ignore, TValue>(groupId),
|
CreateConsumer<Ignore, TValue>(groupId),
|
||||||
cts
|
new CancellationTokenSource()
|
||||||
)).Consumer as IConsumer<Ignore, TValue>;
|
));
|
||||||
|
if (consumerStore.Consumer == null)
|
||||||
|
{
|
||||||
|
_logger.LogWarning($"{string.Join("、", topics)}创建消息消费失败或消费组已被释放");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
var consumer = consumerStore.Consumer as IConsumer<Ignore, TValue>;
|
||||||
|
var cts = consumerStore.CTS;
|
||||||
|
|
||||||
consumer!.Subscribe(topics);
|
consumer!.Subscribe(topics);
|
||||||
|
|
||||||
@ -288,6 +304,11 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
{
|
{
|
||||||
//ignore
|
//ignore
|
||||||
}
|
}
|
||||||
|
catch (ObjectDisposedException)
|
||||||
|
{
|
||||||
|
_logger.LogError($"{string.Join("、", topics)}消费者被释放");
|
||||||
|
break;
|
||||||
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
_logger.LogError(ex, "处理消息时发生未知错误");
|
_logger.LogError(ex, "处理消息时发生未知错误");
|
||||||
@ -346,13 +367,20 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
{
|
{
|
||||||
|
|
||||||
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
|
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
|
||||||
var cts = new CancellationTokenSource();
|
|
||||||
|
|
||||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ =>
|
||||||
(
|
(
|
||||||
CreateConsumer<TKey, TValue>(groupId),
|
CreateConsumer<TKey, TValue>(groupId),
|
||||||
cts
|
new CancellationTokenSource()
|
||||||
)).Consumer as IConsumer<TKey, TValue>;
|
));
|
||||||
|
if (consumerStore.Consumer == null)
|
||||||
|
{
|
||||||
|
_logger.LogWarning($"{string.Join("、", topics)}创建消息消费失败或消费组已被释放");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
var consumer = consumerStore.Consumer as IConsumer<TKey, TValue>;
|
||||||
|
var cts = consumerStore.CTS;
|
||||||
|
|
||||||
consumer!.Subscribe(topics);
|
consumer!.Subscribe(topics);
|
||||||
|
|
||||||
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
|
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
|
||||||
@ -444,6 +472,11 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
{
|
{
|
||||||
//ignore
|
//ignore
|
||||||
}
|
}
|
||||||
|
catch (ObjectDisposedException)
|
||||||
|
{
|
||||||
|
_logger.LogError($"{string.Join("、", topics)}消费者被释放");
|
||||||
|
break;
|
||||||
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
_logger.LogError(ex, "处理批量消息时发生未知错误");
|
_logger.LogError(ex, "处理批量消息时发生未知错误");
|
||||||
@ -505,13 +538,18 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
{
|
{
|
||||||
|
|
||||||
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
|
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
|
||||||
var cts = new CancellationTokenSource();
|
var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ =>
|
||||||
|
|
||||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
|
||||||
(
|
(
|
||||||
CreateConsumer<Ignore, TValue>(groupId),
|
CreateConsumer<Ignore, TValue>(groupId),
|
||||||
cts
|
new CancellationTokenSource()
|
||||||
)).Consumer as IConsumer<Ignore, TValue>;
|
));
|
||||||
|
if (consumerStore.Consumer == null)
|
||||||
|
{
|
||||||
|
_logger.LogWarning($"{string.Join("、", topics)}创建消息消费失败或消费组已被释放");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
var consumer = consumerStore.Consumer as IConsumer<Ignore, TValue>;
|
||||||
|
var cts = consumerStore.CTS;
|
||||||
|
|
||||||
consumer!.Subscribe(topics);
|
consumer!.Subscribe(topics);
|
||||||
|
|
||||||
@ -602,6 +640,11 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
{
|
{
|
||||||
//ignore
|
//ignore
|
||||||
}
|
}
|
||||||
|
catch (ObjectDisposedException)
|
||||||
|
{
|
||||||
|
_logger.LogError($"{string.Join("、", topics)}消费者被释放");
|
||||||
|
break;
|
||||||
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
_logger.LogError(ex, "处理批量消息时发生未知错误");
|
_logger.LogError(ex, "处理批量消息时发生未知错误");
|
||||||
@ -630,11 +673,13 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
|
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
|
||||||
if (_consumerStore.TryRemove(consumerKey, out var entry))
|
if (_consumerStore.TryGetValue(consumerKey, out var entry))
|
||||||
{
|
{
|
||||||
entry.CTS.Cancel();
|
entry.CTS.Cancel();
|
||||||
(entry.Consumer as IDisposable)?.Dispose();
|
(entry.Consumer as IDisposable)?.Dispose();
|
||||||
entry.CTS.Dispose();
|
entry.CTS.Dispose();
|
||||||
|
// 从字典中移除
|
||||||
|
_consumerStore.TryRemove(consumerKey, out entry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user