From 95712c4c0e527a3afb1b8f67673176ed825ef654 Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Thu, 22 May 2025 09:33:46 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3docker=20=E9=87=8D=E5=90=AF?= =?UTF-8?q?=E5=AF=BC=E8=87=B4=E6=B6=88=E8=B4=B9=E8=80=85=E8=A2=AB=E9=87=8A?= =?UTF-8?q?=E6=94=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../AdminClient/AdminClientService.cs | 4 +- .../Consumer/ConsumerService.cs | 95 ++++++++++++++----- 2 files changed, 72 insertions(+), 27 deletions(-) diff --git a/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs b/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs index ac793b2..918aa57 100644 --- a/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs +++ b/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs @@ -79,7 +79,7 @@ public class AdminClientService : IAdminClientService, IDisposable, ISingletonDe public async Task> ListTopicsAsync() { var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10)); - return new List(metadata.Topics.Select(t => t.Topic)); + return await Task.FromResult(new List(metadata.Topics.Select(t => t.Topic))); } /// @@ -90,7 +90,7 @@ public class AdminClientService : IAdminClientService, IDisposable, ISingletonDe public async Task TopicExistsAsync(string topic) { var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10)); - return metadata.Topics.Any(t => t.Topic == topic); + return await Task.FromResult(metadata.Topics.Any(t => t.Topic == topic)); } /// diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs index 043af46..31b2d9c 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs @@ -12,6 +12,7 @@ using System.Collections.Concurrent; using System.Text; using System.Text.RegularExpressions; using System.Threading; +using YamlDotNet.Core.Tokens; namespace JiShe.CollectBus.Kafka.Consumer { @@ -132,18 +133,21 @@ namespace JiShe.CollectBus.Kafka.Consumer { await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => { - var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; - var cts = new CancellationTokenSource(); - - var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ => ( CreateConsumer(groupId), - cts - )).Consumer as IConsumer; + new CancellationTokenSource() + )); + if (consumerStore.Consumer == null) + { + _logger.LogWarning($"{string.Join("、", topics)}创建消息消费失败或消费组已被释放"); + return; + } + var consumer = consumerStore.Consumer as IConsumer; + var cts = consumerStore.CTS; consumer!.Subscribe(topics); - _ = Task.Run(async () => { while (!cts.IsCancellationRequested) @@ -195,6 +199,11 @@ namespace JiShe.CollectBus.Kafka.Consumer { //ignore } + catch (ObjectDisposedException) + { + _logger.LogError($"{string.Join("、", topics)}消费者被释放"); + break; + } catch (Exception ex) { _logger.LogError(ex, "处理消息时发生未知错误"); @@ -229,12 +238,19 @@ namespace JiShe.CollectBus.Kafka.Consumer await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => { var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; - var cts = new CancellationTokenSource(); - var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + + var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ => ( CreateConsumer(groupId), - cts - )).Consumer as IConsumer; + new CancellationTokenSource() + )); + if (consumerStore.Consumer == null) + { + _logger.LogWarning($"{string.Join("、", topics)}创建消息消费失败或消费组已被释放"); + return; + } + var consumer = consumerStore.Consumer as IConsumer; + var cts = consumerStore.CTS; consumer!.Subscribe(topics); @@ -288,6 +304,11 @@ namespace JiShe.CollectBus.Kafka.Consumer { //ignore } + catch (ObjectDisposedException) + { + _logger.LogError($"{string.Join("、", topics)}消费者被释放"); + break; + } catch (Exception ex) { _logger.LogError(ex, "处理消息时发生未知错误"); @@ -346,13 +367,20 @@ namespace JiShe.CollectBus.Kafka.Consumer { var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; - var cts = new CancellationTokenSource(); + + var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + new CancellationTokenSource() + )); + if (consumerStore.Consumer == null) + { + _logger.LogWarning($"{string.Join("、", topics)}创建消息消费失败或消费组已被释放"); + return; + } + var consumer = consumerStore.Consumer as IConsumer; + var cts = consumerStore.CTS; - var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - ( - CreateConsumer(groupId), - cts - )).Consumer as IConsumer; consumer!.Subscribe(topics); var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 @@ -444,6 +472,11 @@ namespace JiShe.CollectBus.Kafka.Consumer { //ignore } + catch (ObjectDisposedException) + { + _logger.LogError($"{string.Join("、", topics)}消费者被释放"); + break; + } catch (Exception ex) { _logger.LogError(ex, "处理批量消息时发生未知错误"); @@ -505,13 +538,18 @@ namespace JiShe.CollectBus.Kafka.Consumer { var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; - var cts = new CancellationTokenSource(); - - var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - ( - CreateConsumer(groupId), - cts - )).Consumer as IConsumer; + var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + new CancellationTokenSource() + )); + if (consumerStore.Consumer == null) + { + _logger.LogWarning($"{string.Join("、", topics)}创建消息消费失败或消费组已被释放"); + return; + } + var consumer = consumerStore.Consumer as IConsumer; + var cts = consumerStore.CTS; consumer!.Subscribe(topics); @@ -602,6 +640,11 @@ namespace JiShe.CollectBus.Kafka.Consumer { //ignore } + catch (ObjectDisposedException) + { + _logger.LogError($"{string.Join("、", topics)}消费者被释放"); + break; + } catch (Exception ex) { _logger.LogError(ex, "处理批量消息时发生未知错误"); @@ -630,11 +673,13 @@ namespace JiShe.CollectBus.Kafka.Consumer try { var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; - if (_consumerStore.TryRemove(consumerKey, out var entry)) + if (_consumerStore.TryGetValue(consumerKey, out var entry)) { entry.CTS.Cancel(); (entry.Consumer as IDisposable)?.Dispose(); entry.CTS.Dispose(); + // 从字典中移除 + _consumerStore.TryRemove(consumerKey, out entry); } } catch (Exception ex)