Compare commits

...

2 Commits

11 changed files with 195 additions and 247 deletions

View File

@ -81,43 +81,44 @@
"Port": 5672
}
},
"Kafka": {
"BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092",
"EnableAuthorization": false,
"SecurityProtocol": "SASL_PLAINTEXT",
"SaslMechanism": "PLAIN",
"SaslUserName": "lixiao",
"SaslPassword": "lixiao1980"
//"Topic": {
// "ReplicationFactor": 3,
// "NumPartitions": 1000
//}
},
//"Kafka": {
// "BootstrapServers": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092",
// "EnableAuthorization": false,
// "SecurityProtocol": "SASL_PLAINTEXT",
// "SaslMechanism": "PLAIN",
// "SaslUserName": "lixiao",
// "SaslPassword": "lixiao1980",
// "Connections": {
// "Default": {
// "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092"
// // "SecurityProtocol": "SASL_PLAINTEXT",
// // "SaslMechanism": "PLAIN",
// // "SaslUserName": "lixiao",
// // "SaslPassword": "lixiao1980",
// }
// },
// "Consumer": {
// "GroupId": "JiShe.CollectBus"
// },
// "Producer": {
// "MessageTimeoutMs": 6000,
// "Acks": -1
// },
// "Topic": {
// "ReplicationFactor": 3,
// "NumPartitions": 1000
// },
// "EventBus": {
// "GroupId": "JiShe.CollectBus",
// "TopicName": "DefaultTopicName"
// }
"Kafka": {
"Connections": {
"Default": {
"BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092"
// "SecurityProtocol": "SASL_PLAINTEXT",
// "SaslMechanism": "PLAIN",
// "SaslUserName": "lixiao",
// "SaslPassword": "lixiao1980",
}
},
"Consumer": {
"GroupId": "JiShe.CollectBus"
},
"Producer": {
"MessageTimeoutMs": 6000,
"Acks": -1
},
"Topic": {
"ReplicationFactor": 3,
"NumPartitions": 1000
},
"EventBus": {
"GroupId": "JiShe.CollectBus",
"TopicName": "DefaultTopicName"
}
},
//},
"IoTDBOptions": {
"UserName": "root",
"Password": "root",

View File

@ -11,7 +11,7 @@ using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.Kafka.AdminClient
{
public class AdminClientService : IAdminClientService, ISingletonDependency
public class AdminClientService : IAdminClientService, IDisposable,ISingletonDependency
{
private readonly ILogger<AdminClientService> _logger;
@ -69,43 +69,49 @@ namespace JiShe.CollectBus.Kafka.AdminClient
return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic));
}
/// <summary>
/// Creates the topic if not exist asynchronous.
/// </summary>
/// <param name="topicName">Name of the topic.</param>
/// <param name="factorNum">The factor number.</param>
/// <param name="partitionNum">The partition number.</param>
public async Task CreateTopicIfNotExistAsync(string topicName, short factorNum, int partitionNum)
public async Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor)
{
try
{
if (await CheckTopicAsync(topicName)) return;
await Instance.CreateTopicsAsync(new[]
{
new TopicSpecification { Name = topicName, ReplicationFactor = factorNum, NumPartitions = partitionNum }
new TopicSpecification
{
Name = topic,
NumPartitions = numPartitions,
ReplicationFactor = replicationFactor
}
});
}
catch (CreateTopicsException e)
{
_logger.LogError($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
{
throw;
}
}
}
/// <summary>
/// Deletes the topic asynchronous.
/// </summary>
/// <param name="topicName">Name of the topic.</param>
public async Task DeleteTopicAsync(List<string> topicName)
public async Task DeleteTopicAsync(string topic)
{
try
{
await Instance.DeleteTopicsAsync(topicName, null);
}
catch (DeleteTopicsException e)
{
_logger.LogError($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
}
await Instance.DeleteTopicsAsync(new[] { topic });
}
public async Task<List<string>> ListTopicsAsync()
{
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
return new List<string>(metadata.Topics.Select(t => t.Topic));
}
public async Task<bool> TopicExistsAsync(string topic)
{
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
return metadata.Topics.Any(t => t.Topic == topic);
}
public void Dispose()
{
Instance?.Dispose();
}
}
}

View File

@ -8,27 +8,9 @@ namespace JiShe.CollectBus.Kafka.AdminClient
{
public interface IAdminClientService
{
/// <summary>
/// Checks the topic asynchronous.
/// </summary>
/// <param name="topic">The topic.</param>
/// <returns></returns>
Task<bool> CheckTopicAsync(string topic);
/// <summary>
/// Creates the topic if not exist asynchronous.
/// </summary>
/// <param name="topicName">Name of the topic.</param>
/// <param name="factorNum">The factor number.</param>
/// <param name="partitionNum">The partition number.</param>
/// <returns></returns>
Task CreateTopicIfNotExistAsync(string topicName, short factorNum, int partitionNum);
/// <summary>
/// Deletes the topic asynchronous.
/// </summary>
/// <param name="topicName">Name of the topic.</param>
/// <returns></returns>
Task DeleteTopicAsync(List<string> topicName);
Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor);
Task DeleteTopicAsync(string topic);
Task<List<string>> ListTopicsAsync();
Task<bool> TopicExistsAsync(string topic);
}
}

View File

@ -10,16 +10,6 @@ namespace JiShe.CollectBus.Kafka
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
var configuration = context.Services.GetConfiguration();
// 注册 Kafka 生产者
context.Services.AddSingleton<IProducer<string, string>>(sp =>
new ProducerBuilder<string, string>(
configuration.GetSection("Kafka:ProducerConfig").Get<ProducerConfig>()
)
.Build());
// 注册后台服务
context.Services.AddHostedService<ConsumerBackgroundService>();
}
}
}

View File

@ -1,54 +0,0 @@
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka.Consumer
{
public class ConsumerBackgroundService : BackgroundService
{
private readonly ConsumerService _consumerService;
private readonly ILogger<ConsumerBackgroundService> _logger;
public ConsumerBackgroundService(
ConsumerService consumerService,
ILogger<ConsumerBackgroundService> logger)
{
_consumerService = consumerService;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_consumerService.Subscribe("abp-kafka-topic");
while (!stoppingToken.IsCancellationRequested)
{
try
{
var result = _consumerService.Consume(stoppingToken);
await ProcessMessageAsync(result.Message.Value);
_consumerService.Commit(result);
}
catch (ConsumeException ex)
{
_logger.LogError(ex, $"Message consume error: {ex.Error.Reason}");
}
}
}
private async Task ProcessMessageAsync(string message)
{
// 使用 ABP 的异步处理机制
await Task.Run(() =>
{
_logger.LogInformation($"Processing message: {message}");
// 这里可以触发 ABP 的领域事件
});
}
}
}

View File

@ -5,54 +5,77 @@ using Microsoft.Extensions.Logging;
using JiShe.CollectBus.Kafka.Attributes;
using Volo.Abp.DependencyInjection;
using JiShe.CollectBus.Kafka.AdminClient;
using static Confluent.Kafka.ConfigPropertyNames;
namespace JiShe.CollectBus.Kafka.Consumer
{
public abstract class ConsumerService : IConsumerService, IDisposable, ISingletonDependency
public abstract class ConsumerService<TKey, TValue> : IConsumerService<TKey, TValue>, IDisposable, ISingletonDependency
{
private readonly ILogger<ConsumerService> _logger;
private readonly IConsumer<string, string> _consumer;
private readonly ILogger<ConsumerService<TKey, TValue>> _logger;
private CancellationTokenSource _cancellationTokenSource;
public ConsumerService(
ILogger<ConsumerService> logger,
IConfiguration configuration)
protected ConsumerService(IConfiguration configuration, ILogger<ConsumerService<TKey, TValue>> logger)
{
_logger = logger;
var consumerConfig = configuration.GetSection("Kafka:ConsumerConfig")
.Get<ConsumerConfig>();
_consumer = new ConsumerBuilder<string, string>(consumerConfig)
.SetErrorHandler(OnConsumeError)
.Build();
GetInstance(configuration);
}
public void Subscribe(string topic)
public IConsumer<TKey, TValue> Instance { get; set; } = default;
public IConsumer<TKey, TValue> GetInstance(IConfiguration configuration)
{
_consumer.Subscribe(topic);
_logger.LogInformation($"Subscribed to topic: {topic}");
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
var consumerConfig = new ConsumerConfig
{
BootstrapServers = configuration["Kafka:BootstrapServers"],
AutoOffsetReset = AutoOffsetReset.Earliest
};
if (enableAuthorization)
{
consumerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
consumerConfig.SaslMechanism = SaslMechanism.Plain;
consumerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
consumerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
}
Instance = new ConsumerBuilder<TKey, TValue>(consumerConfig).Build();
return Instance;
}
public ConsumeResult<string, string> Consume(CancellationToken cancellationToken)
public async Task SubscribeAsync(string topic, Func<TKey, TValue, Task> messageHandler)
{
return _consumer.Consume(cancellationToken);
_cancellationTokenSource = new CancellationTokenSource();
Instance.Subscribe(topic);
try
{
while (!_cancellationTokenSource.Token.IsCancellationRequested)
{
var result = Instance.Consume(_cancellationTokenSource.Token);
if (result != null)
{
await messageHandler(result.Message.Key, result.Message.Value);
}
}
}
catch (OperationCanceledException)
{
Instance.Close();
}
}
public void Commit(ConsumeResult<string, string> result)
public void Unsubscribe()
{
_consumer.Commit(result);
_logger.LogDebug($"Committed offset: {result.TopicPartitionOffset}");
}
private void OnConsumeError(IConsumer<string, string> consumer, Error error)
{
_logger.LogError($"Kafka consumer error: {error.Reason}");
_cancellationTokenSource?.Cancel();
Instance?.Unsubscribe();
}
public void Dispose()
{
_consumer?.Close();
_consumer?.Dispose();
Unsubscribe();
Instance?.Dispose();
_cancellationTokenSource?.Dispose();
}
}
}

View File

@ -6,7 +6,10 @@ using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka.Consumer
{
public interface IConsumerService
public interface IConsumerService<TKey, TValue>
{
Task SubscribeAsync(string topic, Func<TKey, TValue, Task> messageHandler);
void Unsubscribe();
void Dispose();
}
}

View File

@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka
{
public class KafkaOptions
{
public string BootstrapServers { get; set; }
public string GroupId { get; set; }
public Dictionary<string, string> ProducerConfig { get; set; } = new();
public Dictionary<string, string> ConsumerConfig { get; set; } = new();
public Dictionary<string, string> AdminConfig { get; set; } = new();
}
}

View File

@ -7,8 +7,10 @@ using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka.Producer
{
public interface IProducerService
public interface IProducerService<TKey, TValue>
{
Task ProduceAsync(string topic, string message);
Task ProduceAsync(string topic, TKey key, TValue value);
Task ProduceAsync(string topic, TValue value);
void Dispose();
}
}

View File

@ -1,64 +0,0 @@
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.Kafka.Producer
{
public class ProducerBaseService<TKey, TValue>
{
/// <summary>
/// Initializes a new instance of the <see cref="ProducerBaseService{TKey, TValue}"/> class.
/// </summary>
/// <param name="configuration">The configuration.</param>
public ProducerBaseService(IConfiguration configuration)
{
GetInstance(configuration);
}
/// <summary>
/// Gets or sets the instance.
/// </summary>
/// <value>
/// The instance.
/// </value>
public IProducer<TKey, TValue> Instance { get; set; } = default;
/// <summary>
/// Gets the instance.
/// </summary>
/// <param name="configuration">The configuration.</param>
/// <returns></returns>
public IProducer<TKey, TValue> GetInstance(IConfiguration configuration)
{
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
var producerConfig = new ProducerConfig
{
BootstrapServers = configuration["Kafka:BootstrapServers"],
AllowAutoCreateTopics = true,
};
if (enableAuthorization)
{
producerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
producerConfig.SaslMechanism = SaslMechanism.Plain;
producerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
producerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
}
Instance = new ProducerBuilder<TKey, TValue>(producerConfig).Build();
return Instance;
}
/// <summary>
/// Produces the asynchronous.
/// </summary>
/// <param name="topic">The topic.</param>
/// <param name="message">The message.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public async Task ProduceAsync(string topic, Message<TKey, TValue> message, CancellationToken cancellationToken = default)
{
await Instance.ProduceAsync(topic, message, cancellationToken);
}
}
}

View File

@ -4,28 +4,70 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using JiShe.CollectBus.Kafka.Consumer;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.Kafka.Producer
{
public class ProducerService: IProducerService,ITransientDependency
public class ProducerService<TKey, TValue> : IProducerService<TKey, TValue>, IDisposable,ITransientDependency
{
private readonly IProducer<string, string> _producer;
private readonly ILogger<ConsumerService<TKey, TValue>> _logger;
public ProducerService(IProducer<string, string> producer)
protected ProducerService(IConfiguration configuration, ILogger<ConsumerService<TKey, TValue>> logger)
{
_producer = producer;
_logger = logger;
GetInstance(configuration);
}
public async Task ProduceAsync(string topic, string message)
public IProducer<TKey, TValue> Instance { get; set; } = default;
public IProducer<TKey, TValue> GetInstance(IConfiguration configuration)
{
await _producer.ProduceAsync(topic, new Message<string, string>
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
var consumerConfig = new ProducerConfig
{
Key = null,
Value = message
});
BootstrapServers = configuration["Kafka:BootstrapServers"],
AllowAutoCreateTopics = true
};
if (enableAuthorization)
{
consumerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
consumerConfig.SaslMechanism = SaslMechanism.Plain;
consumerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
consumerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
}
Instance = new ProducerBuilder<TKey, TValue>(consumerConfig).Build();
return Instance;
}
public async Task ProduceAsync(string topic, TKey key, TValue value)
{
var message = new Message<TKey, TValue>
{
Key = key,
Value = value
};
await Instance.ProduceAsync(topic, message);
}
public async Task ProduceAsync(string topic, TValue value)
{
var message = new Message<TKey, TValue>
{
Value = value
};
await Instance.ProduceAsync(topic, message);
}
public void Dispose()
{
Instance?.Dispose();
}
}
}