Compare commits


No commits in common. "a57910f4b2304e90642996904c161730fb299efa" and "f2419a70983cb8e3f839bab9fceadf73e33e4720" have entirely different histories.

11 changed files with 247 additions and 195 deletions

View File

@@ -81,44 +81,43 @@
       "Port": 5672
     }
   },
-  "Kafka": {
-    "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092",
-    "EnableAuthorization": false,
-    "SecurityProtocol": "SASL_PLAINTEXT",
-    "SaslMechanism": "PLAIN",
-    "SaslUserName": "lixiao",
-    "SaslPassword": "lixiao1980"
-    //"Topic": {
-    //  "ReplicationFactor": 3,
-    //  "NumPartitions": 1000
-    //}
-  },
-  //"Kafka": {
-  //  "Connections": {
-  //    "Default": {
-  //      "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092"
-  //      // "SecurityProtocol": "SASL_PLAINTEXT",
-  //      // "SaslMechanism": "PLAIN",
-  //      // "SaslUserName": "lixiao",
-  //      // "SaslPassword": "lixiao1980",
-  //    }
-  //  },
-  //  "Consumer": {
-  //    "GroupId": "JiShe.CollectBus"
-  //  },
-  //  "Producer": {
-  //    "MessageTimeoutMs": 6000,
-  //    "Acks": -1
-  //  },
-  //  "Topic": {
-  //    "ReplicationFactor": 3,
-  //    "NumPartitions": 1000
-  //  },
-  //  "EventBus": {
-  //    "GroupId": "JiShe.CollectBus",
-  //    "TopicName": "DefaultTopicName"
-  //  }
-  //},
+  //"Kafka": {
+  //  "BootstrapServers": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092",
+  //  "EnableAuthorization": false,
+  //  "SecurityProtocol": "SASL_PLAINTEXT",
+  //  "SaslMechanism": "PLAIN",
+  //  "SaslUserName": "lixiao",
+  //  "SaslPassword": "lixiao1980",
+  //  "Topic": {
+  //    "ReplicationFactor": 3,
+  //    "NumPartitions": 1000
+  //  }
+  "Kafka": {
+    "Connections": {
+      "Default": {
+        "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092"
+        // "SecurityProtocol": "SASL_PLAINTEXT",
+        // "SaslMechanism": "PLAIN",
+        // "SaslUserName": "lixiao",
+        // "SaslPassword": "lixiao1980",
+      }
+    },
+    "Consumer": {
+      "GroupId": "JiShe.CollectBus"
+    },
+    "Producer": {
+      "MessageTimeoutMs": 6000,
+      "Acks": -1
+    },
+    "Topic": {
+      "ReplicationFactor": 3,
+      "NumPartitions": 1000
+    },
+    "EventBus": {
+      "GroupId": "JiShe.CollectBus",
+      "TopicName": "DefaultTopicName"
+    }
+  },
   "IoTDBOptions": {
     "UserName": "root",
     "Password": "root",

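The new layout nests the broker address under "Kafka:Connections:Default" and moves the consumer, producer, topic, and event-bus settings into their own sub-sections. A minimal C# sketch of how those keys resolve through Microsoft.Extensions.Configuration (the key paths come from the block above; loading appsettings.json into the builder is assumed):

using Microsoft.Extensions.Configuration;

// Sketch only: assumes appsettings.json sits next to the executable.
var configuration = new ConfigurationBuilder()
    .AddJsonFile("appsettings.json")
    .Build();

// Colon-separated paths address the nested sections shown in the diff above.
var bootstrapServers  = configuration["Kafka:Connections:Default:BootstrapServers"];
var consumerGroupId   = configuration["Kafka:Consumer:GroupId"];                        // "JiShe.CollectBus"
var replicationFactor = configuration.GetValue<short>("Kafka:Topic:ReplicationFactor"); // 3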
View File

@@ -11,7 +11,7 @@ using Volo.Abp.DependencyInjection;
 namespace JiShe.CollectBus.Kafka.AdminClient
 {
-    public class AdminClientService : IAdminClientService, IDisposable,ISingletonDependency
+    public class AdminClientService : IAdminClientService, ISingletonDependency
     {
         private readonly ILogger<AdminClientService> _logger;
@@ -69,49 +69,43 @@ namespace JiShe.CollectBus.Kafka.AdminClient
             return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic));
         }

-        public async Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor)
+        /// <summary>
+        /// Creates the topic if not exist asynchronous.
+        /// </summary>
+        /// <param name="topicName">Name of the topic.</param>
+        /// <param name="factorNum">The factor number.</param>
+        /// <param name="partitionNum">The partition number.</param>
+        public async Task CreateTopicIfNotExistAsync(string topicName, short factorNum, int partitionNum)
         {
             try
             {
+                if (await CheckTopicAsync(topicName)) return;
                 await Instance.CreateTopicsAsync(new[]
                 {
-                    new TopicSpecification
-                    {
-                        Name = topic,
-                        NumPartitions = numPartitions,
-                        ReplicationFactor = replicationFactor
-                    }
+                    new TopicSpecification { Name = topicName, ReplicationFactor = factorNum, NumPartitions = partitionNum }
                 });
             }
             catch (CreateTopicsException e)
             {
-                if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
-                {
-                    throw;
-                }
+                _logger.LogError($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
             }
         }

-        public async Task DeleteTopicAsync(string topic)
-        {
-            await Instance.DeleteTopicsAsync(new[] { topic });
-        }
-
-        public async Task<List<string>> ListTopicsAsync()
-        {
-            var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
-            return new List<string>(metadata.Topics.Select(t => t.Topic));
-        }
-
-        public async Task<bool> TopicExistsAsync(string topic)
-        {
-            var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
-            return metadata.Topics.Any(t => t.Topic == topic);
-        }
-
-        public void Dispose()
-        {
-            Instance?.Dispose();
-        }
+        /// <summary>
+        /// Deletes the topic asynchronous.
+        /// </summary>
+        /// <param name="topicName">Name of the topic.</param>
+        public async Task DeleteTopicAsync(List<string> topicName)
+        {
+            try
+            {
+                await Instance.DeleteTopicsAsync(topicName, null);
+            }
+            catch (DeleteTopicsException e)
+            {
+                _logger.LogError($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
+            }
+        }
     }
 }
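A short usage sketch for the reshaped admin API (the calling class and topic names are assumptions; the signatures match IAdminClientService in the next file):

using System.Collections.Generic;
using System.Threading.Tasks;
using JiShe.CollectBus.Kafka.AdminClient;

// Hypothetical caller: any service that gets IAdminClientService injected.
public class TopicInitializer
{
    private readonly IAdminClientService _adminClient;

    public TopicInitializer(IAdminClientService adminClient)
    {
        _adminClient = adminClient;
    }

    public async Task EnsureTopicsAsync()
    {
        // Creates the topic only when it does not already exist;
        // the factor/partition values here are illustrative.
        await _adminClient.CreateTopicIfNotExistAsync("JiShe.CollectBus.Telemetry", factorNum: 3, partitionNum: 100);

        // DeleteTopicAsync now takes a list of topic names instead of a single string.
        if (await _adminClient.CheckTopicAsync("JiShe.CollectBus.Obsolete"))
        {
            await _adminClient.DeleteTopicAsync(new List<string> { "JiShe.CollectBus.Obsolete" });
        }
    }
}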

View File

@@ -8,9 +8,27 @@ namespace JiShe.CollectBus.Kafka.AdminClient
 {
     public interface IAdminClientService
     {
-        Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor);
-        Task DeleteTopicAsync(string topic);
-        Task<List<string>> ListTopicsAsync();
-        Task<bool> TopicExistsAsync(string topic);
+        /// <summary>
+        /// Checks the topic asynchronous.
+        /// </summary>
+        /// <param name="topic">The topic.</param>
+        /// <returns></returns>
+        Task<bool> CheckTopicAsync(string topic);
+
+        /// <summary>
+        /// Creates the topic if not exist asynchronous.
+        /// </summary>
+        /// <param name="topicName">Name of the topic.</param>
+        /// <param name="factorNum">The factor number.</param>
+        /// <param name="partitionNum">The partition number.</param>
+        /// <returns></returns>
+        Task CreateTopicIfNotExistAsync(string topicName, short factorNum, int partitionNum);
+
+        /// <summary>
+        /// Deletes the topic asynchronous.
+        /// </summary>
+        /// <param name="topicName">Name of the topic.</param>
+        /// <returns></returns>
+        Task DeleteTopicAsync(List<string> topicName);
     }
 }

View File

@@ -10,6 +10,16 @@ namespace JiShe.CollectBus.Kafka
     {
         public override void ConfigureServices(ServiceConfigurationContext context)
         {
+            var configuration = context.Services.GetConfiguration();
+
+            // Register the Kafka producer
+            context.Services.AddSingleton<IProducer<string, string>>(sp =>
+                new ProducerBuilder<string, string>(
+                    configuration.GetSection("Kafka:ProducerConfig").Get<ProducerConfig>()
+                )
+                .Build());
+
+            // Register the consumer background service
+            context.Services.AddHostedService<ConsumerBackgroundService>();
         }
     }
 }
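The registration above binds a "Kafka:ProducerConfig" section onto Confluent's ProducerConfig via Get<ProducerConfig>(). The appsettings.json change in this compare does not add such a section, so the keys in this sketch are assumptions used only to illustrate the binding:

using System.Collections.Generic;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;

// Sketch of the section-to-ProducerConfig binding used in ConfigureServices.
// The "Kafka:ProducerConfig" keys below are hypothetical.
var configuration = new ConfigurationBuilder()
    .AddInMemoryCollection(new Dictionary<string, string>
    {
        ["Kafka:ProducerConfig:BootstrapServers"] = "192.168.1.9:29092",
        ["Kafka:ProducerConfig:Acks"] = "All",
        ["Kafka:ProducerConfig:MessageTimeoutMs"] = "6000"
    })
    .Build();

var producerConfig = configuration.GetSection("Kafka:ProducerConfig").Get<ProducerConfig>();
using var producer = new ProducerBuilder<string, string>(producerConfig).Build();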

View File

@@ -0,0 +1,54 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Kafka.Consumer
+{
+    public class ConsumerBackgroundService : BackgroundService
+    {
+        private readonly ConsumerService _consumerService;
+        private readonly ILogger<ConsumerBackgroundService> _logger;
+
+        public ConsumerBackgroundService(
+            ConsumerService consumerService,
+            ILogger<ConsumerBackgroundService> logger)
+        {
+            _consumerService = consumerService;
+            _logger = logger;
+        }
+
+        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+        {
+            _consumerService.Subscribe("abp-kafka-topic");
+
+            while (!stoppingToken.IsCancellationRequested)
+            {
+                try
+                {
+                    var result = _consumerService.Consume(stoppingToken);
+                    await ProcessMessageAsync(result.Message.Value);
+                    _consumerService.Commit(result);
+                }
+                catch (ConsumeException ex)
+                {
+                    _logger.LogError(ex, $"Message consume error: {ex.Error.Reason}");
+                }
+            }
+        }
+
+        private async Task ProcessMessageAsync(string message)
+        {
+            // Use ABP's asynchronous processing mechanism
+            await Task.Run(() =>
+            {
+                _logger.LogInformation($"Processing message: {message}");
+                // ABP domain events could be triggered from here
+            });
+        }
+    }
+}

View File

@@ -5,77 +5,54 @@ using Microsoft.Extensions.Logging;
 using JiShe.CollectBus.Kafka.Attributes;
 using Volo.Abp.DependencyInjection;
 using JiShe.CollectBus.Kafka.AdminClient;
-using static Confluent.Kafka.ConfigPropertyNames;

 namespace JiShe.CollectBus.Kafka.Consumer
 {
-    public abstract class ConsumerService<TKey, TValue> : IConsumerService<TKey, TValue>, IDisposable, ISingletonDependency
+    public abstract class ConsumerService : IConsumerService, IDisposable, ISingletonDependency
     {
-        private readonly ILogger<ConsumerService<TKey, TValue>> _logger;
-        private CancellationTokenSource _cancellationTokenSource;
+        private readonly ILogger<ConsumerService> _logger;
+        private readonly IConsumer<string, string> _consumer;

-        protected ConsumerService(IConfiguration configuration, ILogger<ConsumerService<TKey, TValue>> logger)
+        public ConsumerService(
+            ILogger<ConsumerService> logger,
+            IConfiguration configuration)
         {
             _logger = logger;
-            GetInstance(configuration);
+            var consumerConfig = configuration.GetSection("Kafka:ConsumerConfig")
+                .Get<ConsumerConfig>();
+            _consumer = new ConsumerBuilder<string, string>(consumerConfig)
+                .SetErrorHandler(OnConsumeError)
+                .Build();
         }

-        public IConsumer<TKey, TValue> Instance { get; set; } = default;
-
-        public IConsumer<TKey, TValue> GetInstance(IConfiguration configuration)
+        public void Subscribe(string topic)
         {
-            var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
-            var consumerConfig = new ConsumerConfig
-            {
-                BootstrapServers = configuration["Kafka:BootstrapServers"],
-                AutoOffsetReset = AutoOffsetReset.Earliest
-            };
-            if (enableAuthorization)
-            {
-                consumerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
-                consumerConfig.SaslMechanism = SaslMechanism.Plain;
-                consumerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
-                consumerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
-            }
-            Instance = new ConsumerBuilder<TKey, TValue>(consumerConfig).Build();
-            return Instance;
+            _consumer.Subscribe(topic);
+            _logger.LogInformation($"Subscribed to topic: {topic}");
         }

-        public async Task SubscribeAsync(string topic, Func<TKey, TValue, Task> messageHandler)
+        public ConsumeResult<string, string> Consume(CancellationToken cancellationToken)
         {
-            _cancellationTokenSource = new CancellationTokenSource();
-            Instance.Subscribe(topic);
-            try
-            {
-                while (!_cancellationTokenSource.Token.IsCancellationRequested)
-                {
-                    var result = Instance.Consume(_cancellationTokenSource.Token);
-                    if (result != null)
-                    {
-                        await messageHandler(result.Message.Key, result.Message.Value);
-                    }
-                }
-            }
-            catch (OperationCanceledException)
-            {
-                Instance.Close();
-            }
+            return _consumer.Consume(cancellationToken);
         }

-        public void Unsubscribe()
+        public void Commit(ConsumeResult<string, string> result)
         {
-            _cancellationTokenSource?.Cancel();
-            Instance?.Unsubscribe();
+            _consumer.Commit(result);
+            _logger.LogDebug($"Committed offset: {result.TopicPartitionOffset}");
+        }
+
+        private void OnConsumeError(IConsumer<string, string> consumer, Error error)
+        {
+            _logger.LogError($"Kafka consumer error: {error.Reason}");
         }

         public void Dispose()
         {
-            Unsubscribe();
-            Instance?.Dispose();
-            _cancellationTokenSource?.Dispose();
+            _consumer?.Close();
+            _consumer?.Dispose();
         }
     }
 }
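The reworked ConsumerService is still declared abstract, while ConsumerBackgroundService asks for it by its concrete type, so resolving it requires a derived class plus a registration that maps ConsumerService to it. A minimal sketch, with the class name assumed:

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

namespace JiShe.CollectBus.Kafka.Consumer
{
    // Hypothetical concrete consumer: it only forwards to the base constructor,
    // which builds the IConsumer<string, string> from "Kafka:ConsumerConfig".
    public class DefaultConsumerService : ConsumerService
    {
        public DefaultConsumerService(
            ILogger<ConsumerService> logger,
            IConfiguration configuration)
            : base(logger, configuration)
        {
        }
    }
}

ConsumerBackgroundService would then resolve once ConsumerService is mapped to this implementation, for example with context.Services.AddSingleton<ConsumerService, DefaultConsumerService>().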

View File

@@ -6,10 +6,7 @@ using System.Threading.Tasks;
 namespace JiShe.CollectBus.Kafka.Consumer
 {
-    public interface IConsumerService<TKey, TValue>
+    public interface IConsumerService
     {
-        Task SubscribeAsync(string topic, Func<TKey, TValue, Task> messageHandler);
-        void Unsubscribe();
-        void Dispose();
     }
 }

View File

@@ -1,17 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace JiShe.CollectBus.Kafka
-{
-    public class KafkaOptions
-    {
-        public string BootstrapServers { get; set; }
-        public string GroupId { get; set; }
-        public Dictionary<string, string> ProducerConfig { get; set; } = new();
-        public Dictionary<string, string> ConsumerConfig { get; set; } = new();
-        public Dictionary<string, string> AdminConfig { get; set; } = new();
-    }
-}

View File

@@ -7,10 +7,8 @@ using System.Threading.Tasks;
 namespace JiShe.CollectBus.Kafka.Producer
 {
-    public interface IProducerService<TKey, TValue>
+    public interface IProducerService
     {
-        Task ProduceAsync(string topic, TKey key, TValue value);
-        Task ProduceAsync(string topic, TValue value);
-        void Dispose();
+        Task ProduceAsync(string topic, string message);
     }
 }

View File

@@ -0,0 +1,64 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Configuration;
+using Volo.Abp.DependencyInjection;
+
+namespace JiShe.CollectBus.Kafka.Producer
+{
+    public class ProducerBaseService<TKey, TValue>
+    {
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ProducerBaseService{TKey, TValue}"/> class.
+        /// </summary>
+        /// <param name="configuration">The configuration.</param>
+        public ProducerBaseService(IConfiguration configuration)
+        {
+            GetInstance(configuration);
+        }
+
+        /// <summary>
+        /// Gets or sets the instance.
+        /// </summary>
+        /// <value>
+        /// The instance.
+        /// </value>
+        public IProducer<TKey, TValue> Instance { get; set; } = default;
+
+        /// <summary>
+        /// Gets the instance.
+        /// </summary>
+        /// <param name="configuration">The configuration.</param>
+        /// <returns></returns>
+        public IProducer<TKey, TValue> GetInstance(IConfiguration configuration)
+        {
+            var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
+            var producerConfig = new ProducerConfig
+            {
+                BootstrapServers = configuration["Kafka:BootstrapServers"],
+                AllowAutoCreateTopics = true,
+            };
+            if (enableAuthorization)
+            {
+                producerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
+                producerConfig.SaslMechanism = SaslMechanism.Plain;
+                producerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
+                producerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
+            }
+            Instance = new ProducerBuilder<TKey, TValue>(producerConfig).Build();
+            return Instance;
+        }
+
+        /// <summary>
+        /// Produces the asynchronous.
+        /// </summary>
+        /// <param name="topic">The topic.</param>
+        /// <param name="message">The message.</param>
+        /// <param name="cancellationToken">The cancellation token.</param>
+        public async Task ProduceAsync(string topic, Message<TKey, TValue> message, CancellationToken cancellationToken = default)
+        {
+            await Instance.ProduceAsync(topic, message, cancellationToken);
+        }
+    }
+}
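A usage sketch for ProducerBaseService<TKey, TValue>. It reads the flat "Kafka:BootstrapServers" / "Kafka:EnableAuthorization" keys (the layout this compare comments out in appsettings.json), so the in-memory values below are illustrative; the ProduceAsync call also needs a reachable broker to complete:

using System.Collections.Generic;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using JiShe.CollectBus.Kafka.Producer;

// Sketch only: flat keys matching what GetInstance reads.
var configuration = new ConfigurationBuilder()
    .AddInMemoryCollection(new Dictionary<string, string>
    {
        ["Kafka:BootstrapServers"] = "192.168.1.9:29092",   // illustrative broker
        ["Kafka:EnableAuthorization"] = "false"             // parsed with bool.Parse
    })
    .Build();

var producer = new ProducerBaseService<string, string>(configuration);
await producer.ProduceAsync("abp-kafka-topic", new Message<string, string>
{
    Key = "device-001",            // illustrative key
    Value = "{\"value\":42}"       // illustrative payload
});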

View File

@@ -4,70 +4,28 @@ using System.Linq;
 using System.Text;
 using System.Threading.Tasks;
 using Confluent.Kafka;
-using JiShe.CollectBus.Kafka.Consumer;
 using Microsoft.Extensions.Configuration;
-using Microsoft.Extensions.Logging;
 using Volo.Abp.DependencyInjection;

 namespace JiShe.CollectBus.Kafka.Producer
 {
-    public class ProducerService<TKey, TValue> : IProducerService<TKey, TValue>, IDisposable,ITransientDependency
+    public class ProducerService: IProducerService,ITransientDependency
     {
-        private readonly ILogger<ConsumerService<TKey, TValue>> _logger;
+        private readonly IProducer<string, string> _producer;

-        protected ProducerService(IConfiguration configuration, ILogger<ConsumerService<TKey, TValue>> logger)
+        public ProducerService(IProducer<string, string> producer)
         {
-            _logger = logger;
-            GetInstance(configuration);
+            _producer = producer;
         }

-        public IProducer<TKey, TValue> Instance { get; set; } = default;
-
-        public IProducer<TKey, TValue> GetInstance(IConfiguration configuration)
+        public async Task ProduceAsync(string topic, string message)
         {
-            var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
-            var consumerConfig = new ProducerConfig
+            await _producer.ProduceAsync(topic, new Message<string, string>
             {
-                BootstrapServers = configuration["Kafka:BootstrapServers"],
-                AllowAutoCreateTopics = true
-            };
-            if (enableAuthorization)
-            {
-                consumerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
-                consumerConfig.SaslMechanism = SaslMechanism.Plain;
-                consumerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
-                consumerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
-            }
-            Instance = new ProducerBuilder<TKey, TValue>(consumerConfig).Build();
-            return Instance;
-        }
-
-        public async Task ProduceAsync(string topic, TKey key, TValue value)
-        {
-            var message = new Message<TKey, TValue>
-            {
-                Key = key,
-                Value = value
-            };
-            await Instance.ProduceAsync(topic, message);
-        }
-
-        public async Task ProduceAsync(string topic, TValue value)
-        {
-            var message = new Message<TKey, TValue>
-            {
-                Value = value
-            };
-            await Instance.ProduceAsync(topic, message);
-        }
-
-        public void Dispose()
-        {
-            Instance?.Dispose();
-        }
+                Key = null,
+                Value = message
+            });
+        }
     }
 }
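With the module registering a singleton IProducer<string, string>, the slimmed-down ProducerService only wraps it. A sketch of publishing through IProducerService (the consuming class and topic name are hypothetical):

using System.Threading.Tasks;
using JiShe.CollectBus.Kafka.Producer;

// Hypothetical application service that publishes through the new interface.
public class MeterReadingPublisher
{
    private readonly IProducerService _producerService;

    public MeterReadingPublisher(IProducerService producerService)
    {
        _producerService = producerService;
    }

    public Task PublishAsync(string payload)
    {
        // ProducerService sends the message with a null key.
        return _producerService.ProduceAsync("abp-kafka-topic", payload);
    }
}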