dev #2
@ -81,43 +81,44 @@
|
||||
"Port": 5672
|
||||
}
|
||||
},
|
||||
"Kafka": {
|
||||
"BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092",
|
||||
"EnableAuthorization": false,
|
||||
"SecurityProtocol": "SASL_PLAINTEXT",
|
||||
"SaslMechanism": "PLAIN",
|
||||
"SaslUserName": "lixiao",
|
||||
"SaslPassword": "lixiao1980"
|
||||
//"Topic": {
|
||||
// "ReplicationFactor": 3,
|
||||
// "NumPartitions": 1000
|
||||
//}
|
||||
},
|
||||
//"Kafka": {
|
||||
// "BootstrapServers": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092",
|
||||
// "EnableAuthorization": false,
|
||||
// "SecurityProtocol": "SASL_PLAINTEXT",
|
||||
// "SaslMechanism": "PLAIN",
|
||||
// "SaslUserName": "lixiao",
|
||||
// "SaslPassword": "lixiao1980",
|
||||
// "Connections": {
|
||||
// "Default": {
|
||||
// "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092"
|
||||
// // "SecurityProtocol": "SASL_PLAINTEXT",
|
||||
// // "SaslMechanism": "PLAIN",
|
||||
// // "SaslUserName": "lixiao",
|
||||
// // "SaslPassword": "lixiao1980",
|
||||
// }
|
||||
// },
|
||||
// "Consumer": {
|
||||
// "GroupId": "JiShe.CollectBus"
|
||||
// },
|
||||
// "Producer": {
|
||||
// "MessageTimeoutMs": 6000,
|
||||
// "Acks": -1
|
||||
// },
|
||||
// "Topic": {
|
||||
// "ReplicationFactor": 3,
|
||||
// "NumPartitions": 1000
|
||||
// },
|
||||
// "EventBus": {
|
||||
// "GroupId": "JiShe.CollectBus",
|
||||
// "TopicName": "DefaultTopicName"
|
||||
// }
|
||||
"Kafka": {
|
||||
"Connections": {
|
||||
"Default": {
|
||||
"BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092"
|
||||
// "SecurityProtocol": "SASL_PLAINTEXT",
|
||||
// "SaslMechanism": "PLAIN",
|
||||
// "SaslUserName": "lixiao",
|
||||
// "SaslPassword": "lixiao1980",
|
||||
}
|
||||
},
|
||||
"Consumer": {
|
||||
"GroupId": "JiShe.CollectBus"
|
||||
},
|
||||
"Producer": {
|
||||
"MessageTimeoutMs": 6000,
|
||||
"Acks": -1
|
||||
},
|
||||
"Topic": {
|
||||
"ReplicationFactor": 3,
|
||||
"NumPartitions": 1000
|
||||
},
|
||||
"EventBus": {
|
||||
"GroupId": "JiShe.CollectBus",
|
||||
"TopicName": "DefaultTopicName"
|
||||
}
|
||||
},
|
||||
//},
|
||||
"IoTDBOptions": {
|
||||
"UserName": "root",
|
||||
"Password": "root",
|
||||
|
||||
@ -11,7 +11,7 @@ using Volo.Abp.DependencyInjection;
|
||||
|
||||
namespace JiShe.CollectBus.Kafka.AdminClient
|
||||
{
|
||||
public class AdminClientService : IAdminClientService, ISingletonDependency
|
||||
public class AdminClientService : IAdminClientService, IDisposable,ISingletonDependency
|
||||
{
|
||||
|
||||
private readonly ILogger<AdminClientService> _logger;
|
||||
@ -69,43 +69,49 @@ namespace JiShe.CollectBus.Kafka.AdminClient
|
||||
return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates the topic if not exist asynchronous.
|
||||
/// </summary>
|
||||
/// <param name="topicName">Name of the topic.</param>
|
||||
/// <param name="factorNum">The factor number.</param>
|
||||
/// <param name="partitionNum">The partition number.</param>
|
||||
public async Task CreateTopicIfNotExistAsync(string topicName, short factorNum, int partitionNum)
|
||||
public async Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (await CheckTopicAsync(topicName)) return;
|
||||
|
||||
await Instance.CreateTopicsAsync(new[]
|
||||
{
|
||||
new TopicSpecification { Name = topicName, ReplicationFactor = factorNum, NumPartitions = partitionNum }
|
||||
new TopicSpecification
|
||||
{
|
||||
Name = topic,
|
||||
NumPartitions = numPartitions,
|
||||
ReplicationFactor = replicationFactor
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (CreateTopicsException e)
|
||||
{
|
||||
_logger.LogError($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
|
||||
if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Deletes the topic asynchronous.
|
||||
/// </summary>
|
||||
/// <param name="topicName">Name of the topic.</param>
|
||||
public async Task DeleteTopicAsync(List<string> topicName)
|
||||
public async Task DeleteTopicAsync(string topic)
|
||||
{
|
||||
try
|
||||
{
|
||||
await Instance.DeleteTopicsAsync(topicName, null);
|
||||
await Instance.DeleteTopicsAsync(new[] { topic });
|
||||
}
|
||||
catch (DeleteTopicsException e)
|
||||
|
||||
public async Task<List<string>> ListTopicsAsync()
|
||||
{
|
||||
_logger.LogError($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
|
||||
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
|
||||
return new List<string>(metadata.Topics.Select(t => t.Topic));
|
||||
}
|
||||
|
||||
public async Task<bool> TopicExistsAsync(string topic)
|
||||
{
|
||||
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
|
||||
return metadata.Topics.Any(t => t.Topic == topic);
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
Instance?.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -8,27 +8,9 @@ namespace JiShe.CollectBus.Kafka.AdminClient
|
||||
{
|
||||
public interface IAdminClientService
|
||||
{
|
||||
/// <summary>
|
||||
/// Checks the topic asynchronous.
|
||||
/// </summary>
|
||||
/// <param name="topic">The topic.</param>
|
||||
/// <returns></returns>
|
||||
Task<bool> CheckTopicAsync(string topic);
|
||||
|
||||
/// <summary>
|
||||
/// Creates the topic if not exist asynchronous.
|
||||
/// </summary>
|
||||
/// <param name="topicName">Name of the topic.</param>
|
||||
/// <param name="factorNum">The factor number.</param>
|
||||
/// <param name="partitionNum">The partition number.</param>
|
||||
/// <returns></returns>
|
||||
Task CreateTopicIfNotExistAsync(string topicName, short factorNum, int partitionNum);
|
||||
|
||||
/// <summary>
|
||||
/// Deletes the topic asynchronous.
|
||||
/// </summary>
|
||||
/// <param name="topicName">Name of the topic.</param>
|
||||
/// <returns></returns>
|
||||
Task DeleteTopicAsync(List<string> topicName);
|
||||
Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor);
|
||||
Task DeleteTopicAsync(string topic);
|
||||
Task<List<string>> ListTopicsAsync();
|
||||
Task<bool> TopicExistsAsync(string topic);
|
||||
}
|
||||
}
|
||||
|
||||
@ -10,16 +10,6 @@ namespace JiShe.CollectBus.Kafka
|
||||
{
|
||||
public override void ConfigureServices(ServiceConfigurationContext context)
|
||||
{
|
||||
var configuration = context.Services.GetConfiguration();
|
||||
|
||||
// 注册 Kafka 生产者
|
||||
context.Services.AddSingleton<IProducer<string, string>>(sp =>
|
||||
new ProducerBuilder<string, string>(
|
||||
configuration.GetSection("Kafka:ProducerConfig").Get<ProducerConfig>()
|
||||
)
|
||||
.Build());
|
||||
// 注册后台服务
|
||||
context.Services.AddHostedService<ConsumerBackgroundService>();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,54 +0,0 @@
|
||||
using Confluent.Kafka;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace JiShe.CollectBus.Kafka.Consumer
|
||||
{
|
||||
public class ConsumerBackgroundService : BackgroundService
|
||||
{
|
||||
private readonly ConsumerService _consumerService;
|
||||
private readonly ILogger<ConsumerBackgroundService> _logger;
|
||||
|
||||
public ConsumerBackgroundService(
|
||||
ConsumerService consumerService,
|
||||
ILogger<ConsumerBackgroundService> logger)
|
||||
{
|
||||
_consumerService = consumerService;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
_consumerService.Subscribe("abp-kafka-topic");
|
||||
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
var result = _consumerService.Consume(stoppingToken);
|
||||
await ProcessMessageAsync(result.Message.Value);
|
||||
_consumerService.Commit(result);
|
||||
}
|
||||
catch (ConsumeException ex)
|
||||
{
|
||||
_logger.LogError(ex, $"Message consume error: {ex.Error.Reason}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessMessageAsync(string message)
|
||||
{
|
||||
// 使用 ABP 的异步处理机制
|
||||
await Task.Run(() =>
|
||||
{
|
||||
_logger.LogInformation($"Processing message: {message}");
|
||||
// 这里可以触发 ABP 的领域事件
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -5,54 +5,77 @@ using Microsoft.Extensions.Logging;
|
||||
using JiShe.CollectBus.Kafka.Attributes;
|
||||
using Volo.Abp.DependencyInjection;
|
||||
using JiShe.CollectBus.Kafka.AdminClient;
|
||||
using static Confluent.Kafka.ConfigPropertyNames;
|
||||
|
||||
namespace JiShe.CollectBus.Kafka.Consumer
|
||||
{
|
||||
public abstract class ConsumerService : IConsumerService, IDisposable, ISingletonDependency
|
||||
public abstract class ConsumerService<TKey, TValue> : IConsumerService<TKey, TValue>, IDisposable, ISingletonDependency
|
||||
{
|
||||
private readonly ILogger<ConsumerService> _logger;
|
||||
private readonly IConsumer<string, string> _consumer;
|
||||
private readonly ILogger<ConsumerService<TKey, TValue>> _logger;
|
||||
private CancellationTokenSource _cancellationTokenSource;
|
||||
|
||||
public ConsumerService(
|
||||
ILogger<ConsumerService> logger,
|
||||
IConfiguration configuration)
|
||||
protected ConsumerService(IConfiguration configuration, ILogger<ConsumerService<TKey, TValue>> logger)
|
||||
{
|
||||
_logger = logger;
|
||||
|
||||
var consumerConfig = configuration.GetSection("Kafka:ConsumerConfig")
|
||||
.Get<ConsumerConfig>();
|
||||
|
||||
_consumer = new ConsumerBuilder<string, string>(consumerConfig)
|
||||
.SetErrorHandler(OnConsumeError)
|
||||
.Build();
|
||||
GetInstance(configuration);
|
||||
}
|
||||
|
||||
public void Subscribe(string topic)
|
||||
|
||||
public IConsumer<TKey, TValue> Instance { get; set; } = default;
|
||||
|
||||
public IConsumer<TKey, TValue> GetInstance(IConfiguration configuration)
|
||||
{
|
||||
_consumer.Subscribe(topic);
|
||||
_logger.LogInformation($"Subscribed to topic: {topic}");
|
||||
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
|
||||
var consumerConfig = new ConsumerConfig
|
||||
{
|
||||
BootstrapServers = configuration["Kafka:BootstrapServers"],
|
||||
AutoOffsetReset = AutoOffsetReset.Earliest
|
||||
};
|
||||
|
||||
if (enableAuthorization)
|
||||
{
|
||||
consumerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
|
||||
consumerConfig.SaslMechanism = SaslMechanism.Plain;
|
||||
consumerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
|
||||
consumerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
|
||||
}
|
||||
Instance = new ConsumerBuilder<TKey, TValue>(consumerConfig).Build();
|
||||
return Instance;
|
||||
}
|
||||
|
||||
public ConsumeResult<string, string> Consume(CancellationToken cancellationToken)
|
||||
public async Task SubscribeAsync(string topic, Func<TKey, TValue, Task> messageHandler)
|
||||
{
|
||||
return _consumer.Consume(cancellationToken);
|
||||
_cancellationTokenSource = new CancellationTokenSource();
|
||||
Instance.Subscribe(topic);
|
||||
|
||||
try
|
||||
{
|
||||
while (!_cancellationTokenSource.Token.IsCancellationRequested)
|
||||
{
|
||||
var result = Instance.Consume(_cancellationTokenSource.Token);
|
||||
if (result != null)
|
||||
{
|
||||
await messageHandler(result.Message.Key, result.Message.Value);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
Instance.Close();
|
||||
}
|
||||
}
|
||||
|
||||
public void Commit(ConsumeResult<string, string> result)
|
||||
public void Unsubscribe()
|
||||
{
|
||||
_consumer.Commit(result);
|
||||
_logger.LogDebug($"Committed offset: {result.TopicPartitionOffset}");
|
||||
}
|
||||
|
||||
private void OnConsumeError(IConsumer<string, string> consumer, Error error)
|
||||
{
|
||||
_logger.LogError($"Kafka consumer error: {error.Reason}");
|
||||
_cancellationTokenSource?.Cancel();
|
||||
Instance?.Unsubscribe();
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_consumer?.Close();
|
||||
_consumer?.Dispose();
|
||||
Unsubscribe();
|
||||
Instance?.Dispose();
|
||||
_cancellationTokenSource?.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -6,7 +6,10 @@ using System.Threading.Tasks;
|
||||
|
||||
namespace JiShe.CollectBus.Kafka.Consumer
|
||||
{
|
||||
public interface IConsumerService
|
||||
public interface IConsumerService<TKey, TValue>
|
||||
{
|
||||
Task SubscribeAsync(string topic, Func<TKey, TValue, Task> messageHandler);
|
||||
void Unsubscribe();
|
||||
void Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
17
src/JiShe.CollectBus.KafkaProducer/KafkaOptions.cs
Normal file
17
src/JiShe.CollectBus.KafkaProducer/KafkaOptions.cs
Normal file
@ -0,0 +1,17 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace JiShe.CollectBus.Kafka
|
||||
{
|
||||
public class KafkaOptions
|
||||
{
|
||||
public string BootstrapServers { get; set; }
|
||||
public string GroupId { get; set; }
|
||||
public Dictionary<string, string> ProducerConfig { get; set; } = new();
|
||||
public Dictionary<string, string> ConsumerConfig { get; set; } = new();
|
||||
public Dictionary<string, string> AdminConfig { get; set; } = new();
|
||||
}
|
||||
}
|
||||
@ -7,8 +7,10 @@ using System.Threading.Tasks;
|
||||
|
||||
namespace JiShe.CollectBus.Kafka.Producer
|
||||
{
|
||||
public interface IProducerService
|
||||
public interface IProducerService<TKey, TValue>
|
||||
{
|
||||
Task ProduceAsync(string topic, string message);
|
||||
Task ProduceAsync(string topic, TKey key, TValue value);
|
||||
Task ProduceAsync(string topic, TValue value);
|
||||
void Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,64 +0,0 @@
|
||||
using Confluent.Kafka;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Volo.Abp.DependencyInjection;
|
||||
|
||||
namespace JiShe.CollectBus.Kafka.Producer
|
||||
{
|
||||
public class ProducerBaseService<TKey, TValue>
|
||||
{
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="ProducerBaseService{TKey, TValue}"/> class.
|
||||
/// </summary>
|
||||
/// <param name="configuration">The configuration.</param>
|
||||
public ProducerBaseService(IConfiguration configuration)
|
||||
{
|
||||
GetInstance(configuration);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets or sets the instance.
|
||||
/// </summary>
|
||||
/// <value>
|
||||
/// The instance.
|
||||
/// </value>
|
||||
public IProducer<TKey, TValue> Instance { get; set; } = default;
|
||||
|
||||
/// <summary>
|
||||
/// Gets the instance.
|
||||
/// </summary>
|
||||
/// <param name="configuration">The configuration.</param>
|
||||
/// <returns></returns>
|
||||
public IProducer<TKey, TValue> GetInstance(IConfiguration configuration)
|
||||
{
|
||||
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
|
||||
var producerConfig = new ProducerConfig
|
||||
{
|
||||
BootstrapServers = configuration["Kafka:BootstrapServers"],
|
||||
AllowAutoCreateTopics = true,
|
||||
};
|
||||
|
||||
if (enableAuthorization)
|
||||
{
|
||||
producerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
|
||||
producerConfig.SaslMechanism = SaslMechanism.Plain;
|
||||
producerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
|
||||
producerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
|
||||
}
|
||||
Instance = new ProducerBuilder<TKey, TValue>(producerConfig).Build();
|
||||
return Instance;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Produces the asynchronous.
|
||||
/// </summary>
|
||||
/// <param name="topic">The topic.</param>
|
||||
/// <param name="message">The message.</param>
|
||||
/// <param name="cancellationToken">The cancellation token.</param>
|
||||
public async Task ProduceAsync(string topic, Message<TKey, TValue> message, CancellationToken cancellationToken = default)
|
||||
{
|
||||
await Instance.ProduceAsync(topic, message, cancellationToken);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
@ -4,28 +4,70 @@ using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using Confluent.Kafka;
|
||||
using JiShe.CollectBus.Kafka.Consumer;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Volo.Abp.DependencyInjection;
|
||||
|
||||
namespace JiShe.CollectBus.Kafka.Producer
|
||||
{
|
||||
public class ProducerService: IProducerService,ITransientDependency
|
||||
public class ProducerService<TKey, TValue> : IProducerService<TKey, TValue>, IDisposable,ITransientDependency
|
||||
{
|
||||
|
||||
private readonly IProducer<string, string> _producer;
|
||||
private readonly ILogger<ConsumerService<TKey, TValue>> _logger;
|
||||
|
||||
public ProducerService(IProducer<string, string> producer)
|
||||
protected ProducerService(IConfiguration configuration, ILogger<ConsumerService<TKey, TValue>> logger)
|
||||
{
|
||||
_producer = producer;
|
||||
_logger = logger;
|
||||
GetInstance(configuration);
|
||||
}
|
||||
|
||||
public async Task ProduceAsync(string topic, string message)
|
||||
|
||||
public IProducer<TKey, TValue> Instance { get; set; } = default;
|
||||
|
||||
public IProducer<TKey, TValue> GetInstance(IConfiguration configuration)
|
||||
{
|
||||
await _producer.ProduceAsync(topic, new Message<string, string>
|
||||
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
|
||||
var consumerConfig = new ProducerConfig
|
||||
{
|
||||
Key = null,
|
||||
Value = message
|
||||
});
|
||||
BootstrapServers = configuration["Kafka:BootstrapServers"],
|
||||
AllowAutoCreateTopics = true
|
||||
};
|
||||
|
||||
if (enableAuthorization)
|
||||
{
|
||||
consumerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
|
||||
consumerConfig.SaslMechanism = SaslMechanism.Plain;
|
||||
consumerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
|
||||
consumerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
|
||||
}
|
||||
Instance = new ProducerBuilder<TKey, TValue>(consumerConfig).Build();
|
||||
return Instance;
|
||||
}
|
||||
public async Task ProduceAsync(string topic, TKey key, TValue value)
|
||||
{
|
||||
var message = new Message<TKey, TValue>
|
||||
{
|
||||
Key = key,
|
||||
Value = value
|
||||
};
|
||||
|
||||
await Instance.ProduceAsync(topic, message);
|
||||
}
|
||||
|
||||
public async Task ProduceAsync(string topic, TValue value)
|
||||
{
|
||||
var message = new Message<TKey, TValue>
|
||||
{
|
||||
Value = value
|
||||
};
|
||||
|
||||
await Instance.ProduceAsync(topic, message);
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
Instance?.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user