Compare commits
No commits in common. "f74bedd3cdfeb49dc389c3642e8b6519e3558834" and "1b1c4e568318f082f5e63bb4a1924d911f3c94ed" have entirely different histories.
f74bedd3cd
...
1b1c4e5683
1
.gitignore
vendored
1
.gitignore
vendored
@ -401,4 +401,3 @@ FodyWeavers.xsd
|
|||||||
# ABP Studio
|
# ABP Studio
|
||||||
**/.abpstudio/
|
**/.abpstudio/
|
||||||
/src/JiShe.CollectBus.Host/Plugins/*.dll
|
/src/JiShe.CollectBus.Host/Plugins/*.dll
|
||||||
JiShe.CollectBus.Kafka.Test
|
|
||||||
|
|||||||
@ -37,8 +37,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.IoTDBProvi
|
|||||||
EndProject
|
EndProject
|
||||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.Test", "src\JiShe.CollectBus.Protocol.Test\JiShe.CollectBus.Protocol.Test.csproj", "{A377955E-7EA1-6F29-8CF7-774569E93925}"
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.Test", "src\JiShe.CollectBus.Protocol.Test\JiShe.CollectBus.Protocol.Test.csproj", "{A377955E-7EA1-6F29-8CF7-774569E93925}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{82E4562A-3A7F-4372-8D42-8AE41BA56C04}"
|
|
||||||
EndProject
|
|
||||||
Global
|
Global
|
||||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||||
Debug|Any CPU = Debug|Any CPU
|
Debug|Any CPU = Debug|Any CPU
|
||||||
@ -109,10 +107,6 @@ Global
|
|||||||
{A377955E-7EA1-6F29-8CF7-774569E93925}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
{A377955E-7EA1-6F29-8CF7-774569E93925}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
{A377955E-7EA1-6F29-8CF7-774569E93925}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
{A377955E-7EA1-6F29-8CF7-774569E93925}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
{A377955E-7EA1-6F29-8CF7-774569E93925}.Release|Any CPU.Build.0 = Release|Any CPU
|
{A377955E-7EA1-6F29-8CF7-774569E93925}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
{82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
|
||||||
{82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
|
||||||
{82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
|
||||||
{82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Release|Any CPU.Build.0 = Release|Any CPU
|
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
GlobalSection(SolutionProperties) = preSolution
|
GlobalSection(SolutionProperties) = preSolution
|
||||||
HideSolutionNode = FALSE
|
HideSolutionNode = FALSE
|
||||||
@ -134,7 +128,6 @@ Global
|
|||||||
{F0288175-F0EC-48BD-945F-CF1512850943} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{F0288175-F0EC-48BD-945F-CF1512850943} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{82E4562A-3A7F-4372-8D42-8AE41BA56C04} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
GlobalSection(ExtensibilityGlobals) = postSolution
|
GlobalSection(ExtensibilityGlobals) = postSolution
|
||||||
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}
|
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}
|
||||||
|
|||||||
@ -17,13 +17,10 @@ using JiShe.CollectBus.Common.Helpers;
|
|||||||
using JiShe.CollectBus.IotSystems.AFNEntity;
|
using JiShe.CollectBus.IotSystems.AFNEntity;
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using JiShe.CollectBus.Kafka.Attributes;
|
|
||||||
using System.Text.Json;
|
|
||||||
using JiShe.CollectBus.Kafka;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Samples;
|
namespace JiShe.CollectBus.Samples;
|
||||||
|
|
||||||
public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaSubscribe
|
public class SampleAppService : CollectBusAppService, ISampleAppService
|
||||||
{
|
{
|
||||||
private readonly ILogger<SampleAppService> _logger;
|
private readonly ILogger<SampleAppService> _logger;
|
||||||
private readonly IIoTDBProvider _iotDBProvider;
|
private readonly IIoTDBProvider _iotDBProvider;
|
||||||
@ -183,12 +180,4 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
|
|||||||
var aa = LazyServiceProvider.GetKeyedService<IProtocolPlugin>("TestProtocolPlugin");
|
var aa = LazyServiceProvider.GetKeyedService<IProtocolPlugin>("TestProtocolPlugin");
|
||||||
return aa == null;
|
return aa == null;
|
||||||
}
|
}
|
||||||
|
|
||||||
[KafkaSubscribe(["test-topic"])]
|
|
||||||
|
|
||||||
public async Task<bool> KafkaSubscribeAsync(string obj)
|
|
||||||
{
|
|
||||||
_logger.LogWarning($"收到订阅消息: {obj}");
|
|
||||||
return await Task.FromResult(true);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -199,7 +199,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
|
|
||||||
#if DEBUG
|
#if DEBUG
|
||||||
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
|
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
|
||||||
//await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
|
await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
|
||||||
#else
|
#else
|
||||||
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
|
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
|
||||||
await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
|
await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
|
||||||
|
|||||||
@ -163,7 +163,6 @@
|
|||||||
overflow-y: hidden;
|
overflow-y: hidden;
|
||||||
color: #555;
|
color: #555;
|
||||||
} */
|
} */
|
||||||
|
|
||||||
.caption {
|
.caption {
|
||||||
padding: 9px;
|
padding: 9px;
|
||||||
overflow-y: hidden;
|
overflow-y: hidden;
|
||||||
|
|||||||
@ -42,8 +42,7 @@ namespace JiShe.CollectBus.Kafka.AdminClient
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public IAdminClient GetInstance(IConfiguration configuration)
|
public IAdminClient GetInstance(IConfiguration configuration)
|
||||||
{
|
{
|
||||||
ArgumentNullException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]);
|
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
|
||||||
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!);
|
|
||||||
var adminClientConfig = new AdminClientConfig()
|
var adminClientConfig = new AdminClientConfig()
|
||||||
{
|
{
|
||||||
BootstrapServers = configuration["Kafka:BootstrapServers"],
|
BootstrapServers = configuration["Kafka:BootstrapServers"],
|
||||||
@ -114,37 +113,6 @@ namespace JiShe.CollectBus.Kafka.AdminClient
|
|||||||
return metadata.Topics.Any(t => t.Topic == topic);
|
return metadata.Topics.Any(t => t.Topic == topic);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 检测分区是否存在
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="topic"></param>
|
|
||||||
/// <param name="partitions"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public Dictionary<int, bool> CheckPartitionsExists(string topic, int[] partitions)
|
|
||||||
{
|
|
||||||
var result = new Dictionary<int, bool>();
|
|
||||||
var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10));
|
|
||||||
if (metadata.Topics.Count == 0)
|
|
||||||
return partitions.ToDictionary(p => p, p => false);
|
|
||||||
var existingPartitions = metadata.Topics[0].Partitions.Select(p => p.PartitionId).ToHashSet();
|
|
||||||
return partitions.ToDictionary(p => p, p => existingPartitions.Contains(p));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 检测分区是否存在
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="topic"></param>
|
|
||||||
/// <param name="targetPartition"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public bool CheckPartitionsExist(string topic, int targetPartition)
|
|
||||||
{
|
|
||||||
var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10));
|
|
||||||
if (metadata.Topics.Count == 0)
|
|
||||||
return false;
|
|
||||||
var partitions = metadata.Topics[0].Partitions;
|
|
||||||
return partitions.Any(p => p.PartitionId == targetPartition);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void Dispose()
|
public void Dispose()
|
||||||
{
|
{
|
||||||
Instance?.Dispose();
|
Instance?.Dispose();
|
||||||
|
|||||||
@ -12,21 +12,5 @@ namespace JiShe.CollectBus.Kafka.AdminClient
|
|||||||
Task DeleteTopicAsync(string topic);
|
Task DeleteTopicAsync(string topic);
|
||||||
Task<List<string>> ListTopicsAsync();
|
Task<List<string>> ListTopicsAsync();
|
||||||
Task<bool> TopicExistsAsync(string topic);
|
Task<bool> TopicExistsAsync(string topic);
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 检测分区是否存在
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="topic"></param>
|
|
||||||
/// <param name="partitions"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
Dictionary<int, bool> CheckPartitionsExists(string topic, int[] partitions);
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 检测分区是否存在
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="topic"></param>
|
|
||||||
/// <param name="targetPartition"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
bool CheckPartitionsExist(string topic, int targetPartition);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,53 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.Attributes
|
|
||||||
{
|
|
||||||
[AttributeUsage(AttributeTargets.Method)]
|
|
||||||
public class KafkaSubscribeAttribute : Attribute
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 订阅的主题
|
|
||||||
/// </summary>
|
|
||||||
public string[] Topics { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 分区
|
|
||||||
/// </summary>
|
|
||||||
public int Partition { get; set; } = -1;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 消费者组
|
|
||||||
/// </summary>
|
|
||||||
public string GroupId { get; set; }
|
|
||||||
|
|
||||||
public KafkaSubscribeAttribute(string[] topics, string groupId = "default")
|
|
||||||
{
|
|
||||||
this.Topics = topics;
|
|
||||||
this.GroupId = groupId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public KafkaSubscribeAttribute(string topic, string groupId = "default")
|
|
||||||
{
|
|
||||||
this.Topics = new string[] { topic };
|
|
||||||
this.GroupId = groupId;
|
|
||||||
}
|
|
||||||
public KafkaSubscribeAttribute(string[] topics, int partition, string groupId = "default")
|
|
||||||
{
|
|
||||||
this.Topics = topics;
|
|
||||||
this.Partition = partition;
|
|
||||||
this.GroupId = groupId;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public KafkaSubscribeAttribute(string topic, int partition, string groupId = "default")
|
|
||||||
{
|
|
||||||
this.Topics = new string[] { topic };
|
|
||||||
this.Partition = partition;
|
|
||||||
this.GroupId = groupId;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,14 +1,8 @@
|
|||||||
using Confluent.Kafka;
|
using Confluent.Kafka;
|
||||||
using JiShe.CollectBus.Kafka.Consumer;
|
using JiShe.CollectBus.Kafka.Consumer;
|
||||||
using JiShe.CollectBus.Kafka.Producer;
|
|
||||||
using Microsoft.AspNetCore.Builder;
|
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using System.Reflection;
|
|
||||||
using Volo.Abp;
|
|
||||||
using Volo.Abp.DependencyInjection;
|
|
||||||
using Volo.Abp.Modularity;
|
using Volo.Abp.Modularity;
|
||||||
using static Confluent.Kafka.ConfigPropertyNames;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka
|
namespace JiShe.CollectBus.Kafka
|
||||||
{
|
{
|
||||||
@ -16,17 +10,6 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
{
|
{
|
||||||
public override void ConfigureServices(ServiceConfigurationContext context)
|
public override void ConfigureServices(ServiceConfigurationContext context)
|
||||||
{
|
{
|
||||||
// 注册Producer
|
|
||||||
context.Services.AddTransient<IProducerService, ProducerService>();
|
|
||||||
// 注册Consumer
|
|
||||||
context.Services.AddTransient<IConsumerService, ConsumerService>();
|
|
||||||
}
|
|
||||||
|
|
||||||
public override void OnApplicationInitialization(ApplicationInitializationContext context)
|
|
||||||
{
|
|
||||||
var app = context.GetApplicationBuilder();
|
|
||||||
app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application"));
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -6,205 +6,76 @@ using JiShe.CollectBus.Kafka.Attributes;
|
|||||||
using Volo.Abp.DependencyInjection;
|
using Volo.Abp.DependencyInjection;
|
||||||
using JiShe.CollectBus.Kafka.AdminClient;
|
using JiShe.CollectBus.Kafka.AdminClient;
|
||||||
using static Confluent.Kafka.ConfigPropertyNames;
|
using static Confluent.Kafka.ConfigPropertyNames;
|
||||||
using System.Collections.Concurrent;
|
|
||||||
using System.Text.RegularExpressions;
|
|
||||||
using NUglify.Html;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.Consumer
|
namespace JiShe.CollectBus.Kafka.Consumer
|
||||||
{
|
{
|
||||||
public class ConsumerService : IConsumerService, IDisposable
|
public abstract class ConsumerService<TKey, TValue> : IConsumerService<TKey, TValue>, IDisposable, ISingletonDependency
|
||||||
{
|
{
|
||||||
private readonly ILogger<ConsumerService> _logger;
|
private readonly ILogger<ConsumerService<TKey, TValue>> _logger;
|
||||||
private readonly IConfiguration _configuration;
|
private CancellationTokenSource _cancellationTokenSource;
|
||||||
private readonly ConcurrentDictionary<Type, (object Consumer, CancellationTokenSource CTS)>
|
|
||||||
_consumerStore = new();
|
|
||||||
|
|
||||||
public ConsumerService(IConfiguration configuration, ILogger<ConsumerService> logger)
|
protected ConsumerService(IConfiguration configuration, ILogger<ConsumerService<TKey, TValue>> logger)
|
||||||
{
|
{
|
||||||
_configuration = configuration;
|
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
|
GetInstance(configuration);
|
||||||
}
|
}
|
||||||
|
|
||||||
#region private 私有方法
|
|
||||||
|
|
||||||
/// <summary>
|
public IConsumer<TKey, TValue> Instance { get; set; } = default;
|
||||||
/// 创建消费者
|
|
||||||
/// </summary>
|
public IConsumer<TKey, TValue> GetInstance(IConfiguration configuration)
|
||||||
/// <typeparam name="TKey"></typeparam>
|
|
||||||
/// <typeparam name="TValue"></typeparam>
|
|
||||||
/// <returns></returns>
|
|
||||||
private IConsumer<TKey, TValue> CreateConsumer<TKey, TValue>(string? groupId = null) where TKey : notnull where TValue : class
|
|
||||||
{
|
{
|
||||||
var config = BuildConsumerConfig(groupId);
|
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
|
||||||
return new ConsumerBuilder<TKey, TValue>(config)
|
var consumerConfig = new ConsumerConfig
|
||||||
.SetErrorHandler((_, e) => _logger.LogError($"消费者错误: {e.Reason}"))
|
|
||||||
.Build();
|
|
||||||
}
|
|
||||||
|
|
||||||
private ConsumerConfig BuildConsumerConfig(string? groupId = null)
|
|
||||||
{
|
|
||||||
var enableAuth = bool.Parse(_configuration["Kafka:EnableAuthorization"]!);
|
|
||||||
|
|
||||||
var config = new ConsumerConfig
|
|
||||||
{
|
{
|
||||||
BootstrapServers = _configuration["Kafka:BootstrapServers"],
|
BootstrapServers = configuration["Kafka:BootstrapServers"],
|
||||||
GroupId = groupId ?? "default",
|
AutoOffsetReset = AutoOffsetReset.Earliest
|
||||||
AutoOffsetReset = AutoOffsetReset.Earliest,
|
|
||||||
EnableAutoCommit = false // 禁止AutoCommit
|
|
||||||
};
|
};
|
||||||
|
|
||||||
if (enableAuth)
|
if (enableAuthorization)
|
||||||
{
|
{
|
||||||
config.SecurityProtocol = SecurityProtocol.SaslPlaintext;
|
consumerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
|
||||||
config.SaslMechanism = SaslMechanism.Plain;
|
consumerConfig.SaslMechanism = SaslMechanism.Plain;
|
||||||
config.SaslUsername = _configuration["Kafka:SaslUserName"];
|
consumerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
|
||||||
config.SaslPassword = _configuration["Kafka:SaslPassword"];
|
consumerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
|
||||||
}
|
}
|
||||||
|
Instance = new ConsumerBuilder<TKey, TValue>(consumerConfig).Build();
|
||||||
return config;
|
return Instance;
|
||||||
}
|
|
||||||
#endregion
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 订阅消息
|
|
||||||
/// </summary>
|
|
||||||
/// <typeparam name="TKey"></typeparam>
|
|
||||||
/// <typeparam name="TValue"></typeparam>
|
|
||||||
/// <param name="topic"></param>
|
|
||||||
/// <param name="messageHandler"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public async Task SubscribeAsync<TKey, TValue>(string topic, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
|
|
||||||
{
|
|
||||||
await SubscribeAsync<TKey, TValue>(new[] { topic }, messageHandler, groupId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async Task SubscribeAsync(string topic, Func<TKey, TValue, Task> messageHandler)
|
||||||
/// <summary>
|
|
||||||
/// 订阅消息
|
|
||||||
/// </summary>
|
|
||||||
/// <typeparam name="TValue"></typeparam>
|
|
||||||
/// <param name="topic"></param>
|
|
||||||
/// <param name="messageHandler"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public async Task SubscribeAsync<TValue>(string topic, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class
|
|
||||||
{
|
{
|
||||||
await SubscribeAsync<TValue>(new[] { topic }, messageHandler,groupId);
|
_cancellationTokenSource = new CancellationTokenSource();
|
||||||
}
|
Instance.Subscribe(topic);
|
||||||
|
|
||||||
/// <summary>
|
try
|
||||||
/// 订阅消息
|
|
||||||
/// </summary>
|
|
||||||
/// <typeparam name="TKey"></typeparam>
|
|
||||||
/// <typeparam name="TValue"></typeparam>
|
|
||||||
/// <param name="topics"></param>
|
|
||||||
/// <param name="messageHandler"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public async Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
|
|
||||||
{
|
|
||||||
var consumerKey = typeof((TKey, TValue));
|
|
||||||
var cts = new CancellationTokenSource();
|
|
||||||
|
|
||||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
|
||||||
(
|
|
||||||
CreateConsumer<TKey, TValue>(groupId),
|
|
||||||
cts
|
|
||||||
)).Consumer as IConsumer<TKey, TValue>;
|
|
||||||
|
|
||||||
consumer!.Subscribe(topics);
|
|
||||||
|
|
||||||
_ = Task.Run(async () =>
|
|
||||||
{
|
{
|
||||||
while (!cts.IsCancellationRequested)
|
while (!_cancellationTokenSource.Token.IsCancellationRequested)
|
||||||
{
|
{
|
||||||
try
|
var result = Instance.Consume(_cancellationTokenSource.Token);
|
||||||
|
if (result != null)
|
||||||
{
|
{
|
||||||
var result = consumer.Consume(cts.Token);
|
await messageHandler(result.Message.Key, result.Message.Value);
|
||||||
bool sucess= await messageHandler(result.Message.Key, result.Message.Value);
|
|
||||||
if (sucess)
|
|
||||||
{
|
|
||||||
consumer.Commit(result); // 手动提交
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (ConsumeException ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
await Task.CompletedTask;
|
catch (OperationCanceledException)
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 订阅消息
|
|
||||||
/// </summary>
|
|
||||||
/// <typeparam name="TKey"></typeparam>
|
|
||||||
/// <typeparam name="TValue"></typeparam>
|
|
||||||
/// <param name="topics"></param>
|
|
||||||
/// <param name="messageHandler"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
|
|
||||||
{
|
|
||||||
var consumerKey = typeof((Null, TValue));
|
|
||||||
var cts = new CancellationTokenSource();
|
|
||||||
|
|
||||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _=>
|
|
||||||
(
|
|
||||||
CreateConsumer<Null, TValue>(groupId),
|
|
||||||
cts
|
|
||||||
)).Consumer as IConsumer<Null, TValue>;
|
|
||||||
|
|
||||||
consumer!.Subscribe(topics);
|
|
||||||
|
|
||||||
_ = Task.Run(async () =>
|
|
||||||
{
|
{
|
||||||
while (!cts.IsCancellationRequested)
|
Instance.Close();
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var result = consumer.Consume(cts.Token);
|
|
||||||
bool sucess = await messageHandler(result.Message.Value);
|
|
||||||
if (sucess)
|
|
||||||
consumer.Commit(result); // 手动提交
|
|
||||||
}
|
|
||||||
catch (ConsumeException ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
await Task.CompletedTask;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 取消消息订阅
|
|
||||||
/// </summary>
|
|
||||||
/// <typeparam name="TKey"></typeparam>
|
|
||||||
/// <typeparam name="TValue"></typeparam>
|
|
||||||
public void Unsubscribe<TKey, TValue>() where TKey : notnull where TValue : class
|
|
||||||
{
|
|
||||||
var consumerKey = typeof((TKey, TValue));
|
|
||||||
if (_consumerStore.TryRemove(consumerKey, out var entry))
|
|
||||||
{
|
|
||||||
entry.CTS.Cancel();
|
|
||||||
(entry.Consumer as IDisposable)?.Dispose();
|
|
||||||
entry.CTS.Dispose();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
public void Unsubscribe()
|
||||||
/// 释放资源
|
{
|
||||||
/// </summary>
|
_cancellationTokenSource?.Cancel();
|
||||||
|
Instance?.Unsubscribe();
|
||||||
|
}
|
||||||
|
|
||||||
public void Dispose()
|
public void Dispose()
|
||||||
{
|
{
|
||||||
foreach (var entry in _consumerStore.Values)
|
Unsubscribe();
|
||||||
{
|
Instance?.Dispose();
|
||||||
entry.CTS.Cancel();
|
_cancellationTokenSource?.Dispose();
|
||||||
(entry.Consumer as IDisposable)?.Dispose();
|
|
||||||
entry.CTS.Dispose();
|
|
||||||
}
|
|
||||||
_consumerStore.Clear();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -6,32 +6,10 @@ using System.Threading.Tasks;
|
|||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.Consumer
|
namespace JiShe.CollectBus.Kafka.Consumer
|
||||||
{
|
{
|
||||||
public interface IConsumerService
|
public interface IConsumerService<TKey, TValue>
|
||||||
{
|
{
|
||||||
Task SubscribeAsync<TKey, TValue>(string topic, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId=null) where TKey : notnull where TValue : class;
|
Task SubscribeAsync(string topic, Func<TKey, TValue, Task> messageHandler);
|
||||||
|
void Unsubscribe();
|
||||||
/// <summary>
|
void Dispose();
|
||||||
/// 订阅消息
|
|
||||||
/// </summary>
|
|
||||||
/// <typeparam name="TValue"></typeparam>
|
|
||||||
/// <param name="topic"></param>
|
|
||||||
/// <param name="messageHandler"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task SubscribeAsync<TValue>(string topic, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class;
|
|
||||||
|
|
||||||
Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId) where TKey : notnull where TValue : class;
|
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 订阅消息
|
|
||||||
/// </summary>
|
|
||||||
/// <typeparam name="TKey"></typeparam>
|
|
||||||
/// <typeparam name="TValue"></typeparam>
|
|
||||||
/// <param name="topics"></param>
|
|
||||||
/// <param name="messageHandler"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class;
|
|
||||||
|
|
||||||
void Unsubscribe<TKey, TValue>() where TKey : notnull where TValue : class;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,12 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka
|
|
||||||
{
|
|
||||||
public interface IKafkaSubscribe
|
|
||||||
{
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -8,8 +8,7 @@
|
|||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="Confluent.Kafka" Version="2.9.0" />
|
<PackageReference Include="Confluent.Kafka" Version="2.9.0" />
|
||||||
<PackageReference Include="Volo.Abp.AspNetCore" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp" Version="8.3.3" />
|
||||||
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
|
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
|
|||||||
@ -1,132 +0,0 @@
|
|||||||
using Confluent.Kafka;
|
|
||||||
using JiShe.CollectBus.Kafka.Attributes;
|
|
||||||
using JiShe.CollectBus.Kafka.Consumer;
|
|
||||||
using Microsoft.AspNetCore.Builder;
|
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
|
||||||
using Microsoft.Extensions.Hosting;
|
|
||||||
using Microsoft.Extensions.Logging;
|
|
||||||
using Microsoft.Extensions.Primitives;
|
|
||||||
using Newtonsoft.Json;
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Reflection;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using static Confluent.Kafka.ConfigPropertyNames;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka
|
|
||||||
{
|
|
||||||
public static class KafkaSubcribesExtensions
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 添加Kafka订阅
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="app"></param>
|
|
||||||
/// <param name="assembly"></param>
|
|
||||||
public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly)
|
|
||||||
{
|
|
||||||
var subscribeTypes = assembly.GetTypes()
|
|
||||||
.Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
|
|
||||||
.ToList();
|
|
||||||
|
|
||||||
if (subscribeTypes.Count == 0) return;
|
|
||||||
|
|
||||||
var provider = app.ApplicationServices;
|
|
||||||
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
|
|
||||||
|
|
||||||
lifetime.ApplicationStarted.Register(() =>
|
|
||||||
{
|
|
||||||
foreach (var subscribeType in subscribeTypes)
|
|
||||||
{
|
|
||||||
var subscribes = provider.GetServices(subscribeType).ToList();
|
|
||||||
subscribes.ForEach(subscribe => {
|
|
||||||
|
|
||||||
if(subscribe is IKafkaSubscribe)
|
|
||||||
{
|
|
||||||
BuildKafkaSubscriber(subscribe, provider);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 构建Kafka订阅
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="subscribe"></param>
|
|
||||||
/// <param name="provider"></param>
|
|
||||||
private static void BuildKafkaSubscriber(object subscribe, IServiceProvider provider)
|
|
||||||
{
|
|
||||||
var subscribedMethods = subscribe.GetType().GetMethods()
|
|
||||||
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
|
|
||||||
.Where(x => x.Attribute != null)
|
|
||||||
.ToArray();
|
|
||||||
foreach (var sub in subscribedMethods)
|
|
||||||
{
|
|
||||||
Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 启动后台消费线程
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="config"></param>
|
|
||||||
/// <param name="attr"></param>
|
|
||||||
/// <param name="method"></param>
|
|
||||||
/// <param name="consumerInstance"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe)
|
|
||||||
{
|
|
||||||
var consumerService = provider.GetRequiredService<IConsumerService>();
|
|
||||||
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
|
|
||||||
await consumerService.SubscribeAsync<string>(attr.Topics, async (message) =>
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
// 处理消息
|
|
||||||
return await ProcessMessageAsync(message, method, subscribe);
|
|
||||||
}
|
|
||||||
catch (ConsumeException ex)
|
|
||||||
{
|
|
||||||
// 处理消费错误
|
|
||||||
logger.LogError($"kafka消费异常:{ex.Message}");
|
|
||||||
}
|
|
||||||
return await Task.FromResult(false);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 处理消息
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="message"></param>
|
|
||||||
/// <param name="method"></param>
|
|
||||||
/// <param name="subscribe"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
private static async Task<bool> ProcessMessageAsync(string message, MethodInfo method, object subscribe)
|
|
||||||
{
|
|
||||||
var parameters = method.GetParameters();
|
|
||||||
if (parameters.Length != 1)
|
|
||||||
return true;
|
|
||||||
|
|
||||||
var paramType = parameters[0].ParameterType;
|
|
||||||
var messageObj = paramType == typeof(string)? message: JsonConvert.DeserializeObject(message, paramType);
|
|
||||||
|
|
||||||
if (method.ReturnType == typeof(Task))
|
|
||||||
{
|
|
||||||
object? result = await (Task<bool>)method.Invoke(subscribe, new[] { messageObj })!;
|
|
||||||
if (result is bool success)
|
|
||||||
return success;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
object? result = method.Invoke(subscribe, new[] { messageObj });
|
|
||||||
if (result is bool success)
|
|
||||||
return success;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -7,14 +7,10 @@ using System.Threading.Tasks;
|
|||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.Producer
|
namespace JiShe.CollectBus.Kafka.Producer
|
||||||
{
|
{
|
||||||
public interface IProducerService
|
public interface IProducerService<TKey, TValue>
|
||||||
{
|
{
|
||||||
Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value) where TKey : notnull where TValue : class;
|
Task ProduceAsync(string topic, TKey key, TValue value);
|
||||||
|
Task ProduceAsync(string topic, TValue value);
|
||||||
Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class;
|
void Dispose();
|
||||||
|
|
||||||
Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value, int? partition, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) where TKey : notnull where TValue : class;
|
|
||||||
|
|
||||||
Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,5 +1,4 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections.Concurrent;
|
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
@ -12,180 +11,63 @@ using Volo.Abp.DependencyInjection;
|
|||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.Producer
|
namespace JiShe.CollectBus.Kafka.Producer
|
||||||
{
|
{
|
||||||
public class ProducerService: IProducerService, IDisposable
|
public class ProducerService<TKey, TValue> : IProducerService<TKey, TValue>, IDisposable,ITransientDependency
|
||||||
{
|
{
|
||||||
private readonly ILogger<ProducerService> _logger;
|
|
||||||
private readonly IConfiguration _configuration;
|
|
||||||
private readonly ConcurrentDictionary<Tuple<Type, Type>, object> _producerCache = new();
|
|
||||||
|
|
||||||
public ProducerService(IConfiguration configuration,ILogger<ProducerService> logger)
|
private readonly ILogger<ProducerService<TKey, TValue>> _logger;
|
||||||
|
|
||||||
|
protected ProducerService(IConfiguration configuration, ILogger<ProducerService<TKey, TValue>> logger)
|
||||||
{
|
{
|
||||||
_configuration = configuration;
|
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
|
GetInstance(configuration);
|
||||||
}
|
}
|
||||||
|
|
||||||
#region private 私有方法
|
|
||||||
/// <summary>
|
public IProducer<TKey, TValue> Instance { get; set; } = default;
|
||||||
/// 创建生产者实例
|
|
||||||
/// </summary>
|
public IProducer<TKey, TValue> GetInstance(IConfiguration configuration)
|
||||||
/// <typeparam name="TKey"></typeparam>
|
|
||||||
/// <typeparam name="TValue"></typeparam>
|
|
||||||
/// <returns></returns>
|
|
||||||
private IProducer<TKey, TValue> GetProducer<TKey, TValue>()
|
|
||||||
{
|
{
|
||||||
var typeKey = Tuple.Create(typeof(TKey), typeof(TValue))!;
|
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
|
||||||
|
var consumerConfig = new ProducerConfig
|
||||||
return (IProducer<TKey, TValue>)_producerCache.GetOrAdd(typeKey, _ =>
|
|
||||||
{
|
{
|
||||||
var config = BuildProducerConfig();
|
BootstrapServers = configuration["Kafka:BootstrapServers"],
|
||||||
return new ProducerBuilder<TKey, TValue>(config)
|
AllowAutoCreateTopics = true
|
||||||
.SetLogHandler((_, msg) => _logger.Log(ConvertLogLevel(msg.Level), msg.Message))
|
|
||||||
.Build();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 配置
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
private ProducerConfig BuildProducerConfig()
|
|
||||||
{
|
|
||||||
var enableAuth = bool.Parse(_configuration["Kafka:EnableAuthorization"]!);
|
|
||||||
|
|
||||||
var config = new ProducerConfig
|
|
||||||
{
|
|
||||||
BootstrapServers = _configuration["Kafka:BootstrapServers"],
|
|
||||||
AllowAutoCreateTopics = true,
|
|
||||||
QueueBufferingMaxKbytes = 2_097_151, // 修改缓冲区最大为2GB,默认为1GB
|
|
||||||
CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4,其他:gzip/snappy/zstd
|
|
||||||
BatchSize = 32_768, // 修改批次大小为32K
|
|
||||||
LingerMs = 20, // 修改等待时间为20ms
|
|
||||||
Acks = Acks.All, // 表明只有所有副本Broker都收到消息才算提交成功, 可以 Acks.Leader
|
|
||||||
MessageSendMaxRetries = 50, // 消息发送失败最大重试50次
|
|
||||||
};
|
};
|
||||||
|
|
||||||
if (enableAuth)
|
if (enableAuthorization)
|
||||||
{
|
{
|
||||||
config.SecurityProtocol = SecurityProtocol.SaslPlaintext;
|
consumerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
|
||||||
config.SaslMechanism = SaslMechanism.Plain;
|
consumerConfig.SaslMechanism = SaslMechanism.Plain;
|
||||||
config.SaslUsername = _configuration["Kafka:SaslUserName"];
|
consumerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
|
||||||
config.SaslPassword = _configuration["Kafka:SaslPassword"];
|
consumerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
|
||||||
}
|
}
|
||||||
|
Instance = new ProducerBuilder<TKey, TValue>(consumerConfig).Build();
|
||||||
return config;
|
return Instance;
|
||||||
}
|
}
|
||||||
|
public async Task ProduceAsync(string topic, TKey key, TValue value)
|
||||||
private static LogLevel ConvertLogLevel(SyslogLevel level) => level switch
|
|
||||||
{
|
|
||||||
SyslogLevel.Emergency => LogLevel.Critical,
|
|
||||||
SyslogLevel.Alert => LogLevel.Critical,
|
|
||||||
SyslogLevel.Critical => LogLevel.Critical,
|
|
||||||
SyslogLevel.Error => LogLevel.Error,
|
|
||||||
SyslogLevel.Warning => LogLevel.Warning,
|
|
||||||
SyslogLevel.Notice => LogLevel.Information,
|
|
||||||
SyslogLevel.Info => LogLevel.Information,
|
|
||||||
SyslogLevel.Debug => LogLevel.Debug,
|
|
||||||
_ => LogLevel.None
|
|
||||||
};
|
|
||||||
|
|
||||||
#endregion
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 发布消息
|
|
||||||
/// </summary>
|
|
||||||
/// <typeparam name="TKey"></typeparam>
|
|
||||||
/// <typeparam name="TValue"></typeparam>
|
|
||||||
/// <param name="topic"></param>
|
|
||||||
/// <param name="key"></param>
|
|
||||||
/// <param name="value"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public async Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value)where TKey : notnull where TValue : class
|
|
||||||
{
|
|
||||||
var producer = GetProducer<TKey, TValue>();
|
|
||||||
await producer.ProduceAsync(topic, new Message<TKey, TValue> { Key = key, Value = value });
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 发布消息
|
|
||||||
/// </summary>
|
|
||||||
/// <typeparam name="TValue"></typeparam>
|
|
||||||
/// <param name="topic"></param>
|
|
||||||
/// <param name="value"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class
|
|
||||||
{
|
|
||||||
var producer = GetProducer<Null, TValue>();
|
|
||||||
await producer.ProduceAsync(topic, new Message<Null, TValue> { Value = value });
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 发布消息
|
|
||||||
/// </summary>
|
|
||||||
/// <typeparam name="TKey"></typeparam>
|
|
||||||
/// <typeparam name="TValue"></typeparam>
|
|
||||||
/// <param name="topic"></param>
|
|
||||||
/// <param name="key"></param>
|
|
||||||
/// <param name="value"></param>
|
|
||||||
/// <param name="partition"></param>
|
|
||||||
/// <param name="deliveryHandler"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public async Task ProduceAsync<TKey, TValue>(string topic,TKey key,TValue value,int? partition=null, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null)where TKey : notnull where TValue : class
|
|
||||||
{
|
{
|
||||||
var message = new Message<TKey, TValue>
|
var message = new Message<TKey, TValue>
|
||||||
{
|
{
|
||||||
Key = key,
|
Key = key,
|
||||||
Value = value
|
Value = value
|
||||||
};
|
};
|
||||||
var producer = GetProducer<TKey, TValue>();
|
|
||||||
if (partition.HasValue)
|
|
||||||
{
|
|
||||||
var topicPartition = new TopicPartition(topic, partition.Value);
|
|
||||||
producer.Produce(topicPartition, message, deliveryHandler);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
producer.Produce(topic, message, deliveryHandler);
|
|
||||||
}
|
|
||||||
await Task.CompletedTask;
|
|
||||||
|
|
||||||
|
await Instance.ProduceAsync(topic, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
public async Task ProduceAsync(string topic, TValue value)
|
||||||
/// 发布消息
|
|
||||||
/// </summary>
|
|
||||||
/// <typeparam name="TValue"></typeparam>
|
|
||||||
/// <param name="topic"></param>
|
|
||||||
/// <param name="key"></param>
|
|
||||||
/// <param name="value"></param>
|
|
||||||
/// <param name="partition"></param>
|
|
||||||
/// <param name="deliveryHandler"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class
|
|
||||||
{
|
{
|
||||||
var message = new Message<Null, TValue>
|
var message = new Message<TKey, TValue>
|
||||||
{
|
{
|
||||||
Value = value
|
Value = value
|
||||||
};
|
};
|
||||||
var producer = GetProducer<Null, TValue>();
|
|
||||||
if (partition.HasValue)
|
await Instance.ProduceAsync(topic, message);
|
||||||
{
|
|
||||||
var topicPartition = new TopicPartition(topic, partition.Value);
|
|
||||||
producer.Produce(topicPartition, message, deliveryHandler);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
producer.Produce(topic, message, deliveryHandler);
|
|
||||||
}
|
|
||||||
await Task.CompletedTask;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void Dispose()
|
public void Dispose()
|
||||||
{
|
{
|
||||||
foreach (var producer in _producerCache.Values.OfType<IDisposable>())
|
Instance?.Dispose();
|
||||||
{
|
|
||||||
producer.Dispose();
|
|
||||||
}
|
|
||||||
_producerCache.Clear();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user