Kafka refactoring

cli 2025-04-09 14:33:20 +08:00
parent 000e6e627e
commit 149f78278d
22 changed files with 863 additions and 111 deletions

View File

@@ -31,8 +31,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.FreeSql",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.FreeRedisProvider", "src\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj", "{C06C4082-638F-2996-5FED-7784475766C1}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka", "src\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj", "{919F4CDB-5C82-4371-B209-403B408DA248}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -91,10 +89,6 @@ Global
{C06C4082-638F-2996-5FED-7784475766C1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C06C4082-638F-2996-5FED-7784475766C1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C06C4082-638F-2996-5FED-7784475766C1}.Release|Any CPU.Build.0 = Release|Any CPU
{919F4CDB-5C82-4371-B209-403B408DA248}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{919F4CDB-5C82-4371-B209-403B408DA248}.Debug|Any CPU.Build.0 = Debug|Any CPU
{919F4CDB-5C82-4371-B209-403B408DA248}.Release|Any CPU.ActiveCfg = Release|Any CPU
{919F4CDB-5C82-4371-B209-403B408DA248}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -113,7 +107,6 @@ Global
{8BA01C3D-297D-42DF-BD63-EF07202A0A67} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{FE0457D9-4038-4A17-8808-DCAD06CFC0A0} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{C06C4082-638F-2996-5FED-7784475766C1} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{919F4CDB-5C82-4371-B209-403B408DA248} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}

View File

@@ -1,6 +1,18 @@
namespace JiShe.CollectBus.Samples;
using JiShe.CollectBus.Common.Attributes;
using Volo.Abp.EventBus;
namespace JiShe.CollectBus.Samples;
[EventName("Sample.Kafka.Test")]
[TopicName("Test1")]
public class SampleDto
{
public int Value { get; set; }
}
[EventName("Sample.Kafka.Test2")]
[TopicName("Test2")]
public class SampleDto2
{
public int Value { get; set; }
}

View File

@@ -16,8 +16,11 @@ using JiShe.CollectBus.Workers;
using Volo.Abp.BackgroundWorkers.Hangfire;
using JiShe.CollectBus.MongoDB;
using JiShe.CollectBus.ScheduledMeterReading;
using AutoMapper.Configuration.Annotations;
using JiShe.CollectBus.Common.Attributes;
using Volo.Abp.EventBus.Kafka;
using Volo.Abp.Kafka;
using Microsoft.Extensions.Configuration;
using Volo.Abp.EventBus;
using Confluent.Kafka;
namespace JiShe.CollectBus;
@@ -28,35 +31,65 @@ namespace JiShe.CollectBus;
typeof(AbpAutoMapperModule),
typeof(AbpBackgroundWorkersHangfireModule),
typeof(CollectBusFreeRedisModule),
typeof(CollectBusFreeSqlModule)
typeof(CollectBusFreeSqlModule),
typeof(AbpEventBusModule),
typeof(AbpKafkaModule)
)]
public class CollectBusApplicationModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
var configuration = context.Services.GetConfiguration();
context.Services.AddAutoMapperObjectMapper<CollectBusApplicationModule>();
Configure<AbpAutoMapperOptions>(options =>
{
options.AddMaps<CollectBusApplicationModule>(validate: true);
});
//Configure<AbpAntiForgeryOptions>(options =>
//{
// options.TokenCookie.Expiration = TimeSpan.FromDays(365);
// options.AutoValidateIgnoredHttpMethods.Add("POST");
//});
Configure<AbpKafkaOptions>(configuration.GetSection("Kafka"));
Configure<AbpKafkaEventBusOptions>(configuration.GetSection("Kafka:EventBus"));
Configure<AbpKafkaOptions>(options =>
{
options.ConfigureConsumer = config =>
{
config.GroupId = configuration.GetValue<string>("Kafka:Consumer:GroupId");
config.EnableAutoCommit = configuration.GetValue<bool>("Kafka:Consumer:EnableAutoCommit");
};
});
Configure<AbpKafkaOptions>(options =>
{
options.ConfigureProducer = config =>
{
config.MessageTimeoutMs = configuration.GetValue<int>("Kafka:Producer:MessageTimeoutMs");
config.Acks = (Acks)configuration.GetValue<int>("Kafka:Producer:Acks");
};
});
Configure<AbpKafkaOptions>(options =>
{
options.ConfigureTopic = specification =>
{
specification.ReplicationFactor = configuration.GetValue<short>("Kafka:Topic:ReplicationFactor");
specification.NumPartitions = configuration.GetValue<int>("Kafka:Topic:NumPartitions");
};
});
}
public override void OnApplicationInitialization(
ApplicationInitializationContext context)
{
context.ServiceProvider.GetRequiredService<CustomKafkaDistributedEventBus>().Initialize();
var assembly = Assembly.GetExecutingAssembly();
var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface).ToList();
foreach (var type in types)
{
context.AddBackgroundWorkerAsync(type);
}
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
// Initialize default meter information

View File

@@ -0,0 +1,250 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Confluent.Kafka;
using JiShe.CollectBus.Common.Attributes;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Volo.Abp;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.EventBus.Kafka;
using Volo.Abp.EventBus.Local;
using Volo.Abp.Guids;
using Volo.Abp.Kafka;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Timing;
using Volo.Abp.Tracing;
using Volo.Abp.Uow;
using Volo.Abp.Threading;
namespace JiShe.CollectBus;
[Dependency(ReplaceServices = true)]
[ExposeServices(typeof(IDistributedEventBus), typeof(CustomKafkaDistributedEventBus))]
public class CustomKafkaDistributedEventBus : DistributedEventBusBase, ISingletonDependency
{
protected AbpKafkaEventBusOptions AbpKafkaEventBusOptions { get; }
protected IKafkaMessageConsumerFactory MessageConsumerFactory { get; }
protected IKafkaSerializer Serializer { get; }
protected IProducerPool ProducerPool { get; }
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected ConcurrentDictionary<string, Type> EventTypes { get; }
protected IKafkaMessageConsumer Consumer { get; private set; } = default!;
public CustomKafkaDistributedEventBus(IServiceScopeFactory serviceScopeFactory,
ICurrentTenant currentTenant,
IUnitOfWorkManager unitOfWorkManager,
IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
IGuidGenerator guidGenerator,
IClock clock,
IEventHandlerInvoker eventHandlerInvoker,
ILocalEventBus localEventBus,
ICorrelationIdProvider correlationIdProvider,
IOptions<AbpKafkaEventBusOptions> abpKafkaEventBusOptions,
IKafkaMessageConsumerFactory messageConsumerFactory,
IKafkaSerializer serializer,
IProducerPool producerPool)
: base(serviceScopeFactory, currentTenant, unitOfWorkManager, abpDistributedEventBusOptions, guidGenerator, clock, eventHandlerInvoker, localEventBus, correlationIdProvider)
{
AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value;
MessageConsumerFactory = messageConsumerFactory;
Serializer = serializer;
ProducerPool = producerPool;
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
EventTypes = new ConcurrentDictionary<string, Type>();
}
public void Initialize()
{
SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
}
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType);
if (factory.IsInFactories(handlerFactories))
{
return NullDisposable.Instance;
}
handlerFactories.Add(factory);
return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}
public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
{
Check.NotNull(action, nameof(action));
GetOrCreateHandlerFactories(typeof(TEvent))
.Locking(factories =>
{
factories.RemoveAll(
factory =>
{
var singleInstanceFactory = factory as SingleInstanceHandlerFactory;
if (singleInstanceFactory == null)
{
return false;
}
var actionHandler = singleInstanceFactory.HandlerInstance as ActionEventHandler<TEvent>;
if (actionHandler == null)
{
return false;
}
return actionHandler.Action == action;
});
});
}
public override void Unsubscribe(Type eventType, IEventHandler handler)
{
GetOrCreateHandlerFactories(eventType)
.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory handlerFactory &&
handlerFactory.HandlerInstance == handler
);
});
}
public override void Unsubscribe(Type eventType, IEventHandlerFactory factory)
{
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory));
}
public override void UnsubscribeAll(Type eventType)
{
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear());
}
protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
{
var headers = new Headers
{
{ "messageId", System.Text.Encoding.UTF8.GetBytes(Guid.NewGuid().ToString("N")) }
};
if (CorrelationIdProvider.Get() != null)
{
headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(CorrelationIdProvider.Get()!));
}
var topicAttribute = eventType.GetCustomAttribute<TopicNameAttribute>();
await PublishAsync(
topicAttribute == null ? AbpKafkaEventBusOptions.TopicName : topicAttribute.Name,
eventType,
eventData,
headers
);
}
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
{
unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
}
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
{
var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>();
foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)))
{
handlerFactoryList.Add(
new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
}
return handlerFactoryList.ToArray();
}
public override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig)
{
throw new NotImplementedException();
}
public override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
{
throw new NotImplementedException();
}
public override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig)
{
throw new NotImplementedException();
}
protected override byte[] Serialize(object eventData)
{
return Serializer.Serialize(eventData);
}
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{
return HandlerFactories.GetOrAdd(
eventType,
type =>
{
var eventName = EventNameAttribute.GetNameOrDefault(type);
EventTypes.GetOrAdd(eventName, eventType);
return new List<IEventHandlerFactory>();
}
);
}
private Task PublishAsync(string topicName, Type eventType, object eventData, Headers headers)
{
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
var body = Serializer.Serialize(eventData);
return PublishAsync(topicName, eventName, body, headers);
}
private Task<DeliveryResult<string, byte[]>> PublishAsync(
string topicName,
string eventName,
byte[] body,
Headers headers)
{
var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName);
return producer.ProduceAsync(
topicName,
new Message<string, byte[]>
{
Key = eventName,
Value = body,
Headers = headers
});
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
{
//Should trigger same type
if (handlerEventType == targetEventType)
{
return true;
}
//Should trigger for inherited types
if (handlerEventType.IsAssignableFrom(targetEventType))
{
return true;
}
return false;
}
}
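A minimal usage sketch of the custom bus (OrderCreatedEto and OrderPublisher are illustrative names, not part of this commit): PublishToEventBusAsync reads [TopicName] from the event type and falls back to AbpKafkaEventBusOptions.TopicName when the attribute is missing, so a publisher only needs the regular IDistributedEventBus.

using System.Threading.Tasks;
using JiShe.CollectBus.Common.Attributes;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus;
using Volo.Abp.EventBus.Distributed;

namespace JiShe.CollectBus.Samples;

// Illustrative event: routed to the "Orders" topic because of [TopicName];
// without the attribute it would go to AbpKafkaEventBusOptions.TopicName.
[EventName("Sample.Kafka.OrderCreated")]
[TopicName("Orders")]
public class OrderCreatedEto
{
    public int OrderId { get; set; }
}

public class OrderPublisher : ITransientDependency
{
    private readonly IDistributedEventBus _distributedEventBus;

    public OrderPublisher(IDistributedEventBus distributedEventBus)
    {
        _distributedEventBus = distributedEventBus;
    }

    // CustomKafkaDistributedEventBus replaces IDistributedEventBus, so this call
    // lands in PublishToEventBusAsync above.
    public Task PublishOrderCreatedAsync(int orderId) =>
        _distributedEventBus.PublishAsync(new OrderCreatedEto { OrderId = orderId });
}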

View File

@@ -0,0 +1,46 @@
using JiShe.CollectBus.Samples;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
namespace JiShe.CollectBus.Handlers
{
public class SampleHandler : IDistributedEventHandler<SampleDto>,
ITransientDependency
{
private readonly ILogger<SampleHandler> _logger;
public SampleHandler(ILogger<SampleHandler> logger)
{
_logger = logger;
}
public Task HandleEventAsync(SampleDto eventData)
{
_logger.LogWarning($"topic Test1 message: {eventData.Value}");
return Task.CompletedTask;
}
}
public class SampleHandler2 : IDistributedEventHandler<SampleDto2>,
ITransientDependency
{
private readonly ILogger<SampleHandler2> _logger;
public SampleHandler2(ILogger<SampleHandler2> logger)
{
_logger = logger;
}
public Task HandleEventAsync(SampleDto2 eventData)
{
_logger.LogWarning($"topic Test2 message: {eventData.Value}");
return Task.CompletedTask;
}
}
}

View File

@@ -23,6 +23,7 @@
<PackageReference Include="TouchSocket" Version="2.1.9" />
<PackageReference Include="TouchSocket.Hosting" Version="2.1.9" />
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
<PackageReference Include="Volo.Abp.EventBus.Kafka" Version="8.3.3" />
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />

View File

@@ -3,11 +3,21 @@ using System.Threading.Tasks;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.IotSystems.PrepayModel;
using Microsoft.AspNetCore.Authorization;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.EventBus.Kafka;
namespace JiShe.CollectBus.Samples;
public class SampleAppService : CollectBusAppService, ISampleAppService
{
private readonly IDistributedEventBus _distributedEventBus;
public SampleAppService(IDistributedEventBus distributedEventBus)
{
_distributedEventBus = distributedEventBus;
}
public Task<SampleDto> GetAsync()
{
return Task.FromResult(
@@ -36,4 +46,22 @@ public class SampleAppService : CollectBusAppService, ISampleAppService
var ammeterList = await SqlProvider.Instance.Change(DbEnum.PrepayDB).Select<Vi_BaseAmmeterInfo>().Where(d => d.TB_CustomerID == 5).Take(10).ToListAsync();
return ammeterList;
}
[AllowAnonymous]
public async Task KafkaSendTest()
{
await _distributedEventBus.PublishAsync(
new SampleDto
{
Value = 123456,
}
);
await _distributedEventBus.PublishAsync(
new SampleDto2
{
Value = 456789,
}
);
}
}

View File

@@ -0,0 +1,23 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.EventBus;
using Volo.Abp;
namespace JiShe.CollectBus.Common.Attributes
{
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public class TopicNameAttribute : Attribute
{
public virtual string Name { get; }
public TopicNameAttribute(string name)
{
this.Name = Check.NotNullOrWhiteSpace(name, nameof(name));
}
public string GetName(Type eventType) => this.Name;
}
}

View File

@@ -41,7 +41,8 @@
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
},
"Redis": {
"Configuration": "118.190.144.92:6379,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
//"Configuration": "118.190.144.92:6379,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
"Configuration": "127.0.0.1:6379,password=123456@qwer,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
"DefaultDB": "14",
"HangfireDB": "15"
@@ -83,11 +84,41 @@
"Port": 5672
}
},
//"Kafka": {
// "BootstrapServers": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092",
// "EnableAuthorization": false,
// "SecurityProtocol": "SASL_PLAINTEXT",
// "SaslMechanism": "PLAIN",
// "SaslUserName": "lixiao",
// "SaslPassword": "lixiao1980",
// "Topic": {
// "ReplicationFactor": 3,
// "NumPartitions": 1000
// }
//},
"Kafka": {
"EnableAuthorization": false,
"SecurityProtocol": "SASL_PLAINTEXT",
"SaslMechanism": "PLAIN",
"SaslUserName": "lixiao",
"SaslPassword": "lixiao1980"
"Connections": {
"Default": {
"BootstrapServers": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092"
// "SecurityProtocol": "SASL_PLAINTEXT",
// "SaslMechanism": "PLAIN",
// "SaslUserName": "lixiao",
// "SaslPassword": "lixiao1980",
}
},
"Consumer": {
"GroupId": "JiShe.CollectBus"
},
"Producer": {
"MessageTimeoutMs": 6000,
"Acks": -1
},
"Topic": {
"ReplicationFactor": 3,
"NumPartitions": 1000
},
"EventBus": {
"GroupId": "JiShe.CollectBus",
"TopicName": "DefaultTopicName"
}
}
}

View File

@@ -0,0 +1,111 @@
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Logging;
using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.Kafka.AdminClient
{
public class AdminClientService : IAdminClientService, ISingletonDependency
{
private readonly ILogger<AdminClientService> _logger;
/// <summary>
/// Initializes a new instance of the <see cref="AdminClientService"/> class.
/// </summary>
/// <param name="configuration">The configuration.</param>
/// <param name="logger">The logger.</param>
public AdminClientService(IConfiguration configuration, ILogger<AdminClientService> logger)
{
_logger = logger;
GetInstance(configuration);
}
/// <summary>
/// Gets or sets the instance.
/// </summary>
/// <value>
/// The instance.
/// </value>
public IAdminClient Instance { get; set; } = default;
/// <summary>
/// Gets the instance.
/// </summary>
/// <param name="configuration">The configuration.</param>
/// <returns></returns>
public IAdminClient GetInstance(IConfiguration configuration)
{
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
var adminClientConfig = new AdminClientConfig()
{
BootstrapServers = configuration["Kafka:BootstrapServers"],
};
if (enableAuthorization)
{
adminClientConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
adminClientConfig.SaslMechanism = SaslMechanism.Plain;
adminClientConfig.SaslUsername = configuration["Kafka:SaslUserName"];
adminClientConfig.SaslPassword = configuration["Kafka:SaslPassword"];
}
Instance = new AdminClientBuilder(adminClientConfig).Build();
return Instance;
}
/// <summary>
/// Checks the topic asynchronous.
/// </summary>
/// <param name="topic">The topic.</param>
/// <returns></returns>
public async Task<bool> CheckTopicAsync(string topic)
{
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(5));
return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic));
}
/// <summary>
/// Creates the topic if not exist asynchronous.
/// </summary>
/// <param name="topicName">Name of the topic.</param>
/// <param name="factorNum">The factor number.</param>
/// <param name="partitionNum">The partition number.</param>
public async Task CreateTopicIfNotExistAsync(string topicName, short factorNum, int partitionNum)
{
try
{
if (await CheckTopicAsync(topicName)) return;
await Instance.CreateTopicsAsync(new TopicSpecification[]
{
new TopicSpecification { Name = topicName, ReplicationFactor = factorNum, NumPartitions = partitionNum }
});
}
catch (CreateTopicsException e)
{
_logger.LogError($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
}
}
/// <summary>
/// Deletes the topic asynchronous.
/// </summary>
/// <param name="topicName">Name of the topic.</param>
public async Task DeleteTopicAsync(List<string> topicName)
{
try
{
await Instance.DeleteTopicsAsync(topicName, null);
}
catch (DeleteTopicsException e)
{
_logger.LogError($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
}
}
}
}
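A brief usage sketch for the admin client (TopicBootstrapper, the topic name and the replica/partition counts are illustrative): a caller resolves IAdminClientService and makes sure a topic exists before anything produces to it.

using System.Threading.Tasks;
using JiShe.CollectBus.Kafka.AdminClient;
using Volo.Abp.DependencyInjection;

namespace JiShe.CollectBus.Kafka.Samples;

public class TopicBootstrapper : ITransientDependency
{
    private readonly IAdminClientService _adminClientService;

    public TopicBootstrapper(IAdminClientService adminClientService)
    {
        _adminClientService = adminClientService;
    }

    // Creates "Sample.Telemetry" with 1 replica and 3 partitions if it is missing;
    // CreateTopicIfNotExistAsync is a no-op when the topic already exists.
    public Task EnsureTelemetryTopicAsync() =>
        _adminClientService.CreateTopicIfNotExistAsync("Sample.Telemetry", factorNum: 1, partitionNum: 3);
}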

View File

@@ -0,0 +1,34 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka.AdminClient
{
public interface IAdminClientService
{
/// <summary>
/// Checks the topic asynchronous.
/// </summary>
/// <param name="topic">The topic.</param>
/// <returns></returns>
Task<bool> CheckTopicAsync(string topic);
/// <summary>
/// Creates the topic if not exist asynchronous.
/// </summary>
/// <param name="topicName">Name of the topic.</param>
/// <param name="factorNum">The factor number.</param>
/// <param name="partitionNum">The partition number.</param>
/// <returns></returns>
Task CreateTopicIfNotExistAsync(string topicName, short factorNum, int partitionNum);
/// <summary>
/// Deletes the topic asynchronous.
/// </summary>
/// <param name="topicName">Name of the topic.</param>
/// <returns></returns>
Task DeleteTopicAsync(List<string> topicName);
}
}

View File

@@ -0,0 +1,29 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka.Attributes
{
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public class TopicAttribute : Attribute
{
/// <summary>
/// Initializes a new instance of the <see cref="TopicAttribute"/> class.
/// </summary>
/// <param name="name">The name.</param>
public TopicAttribute(string name = "Default")
{
Name = name;
}
/// <summary>
/// Gets or sets the name.
/// </summary>
/// <value>
/// The name.
/// </value>
public string Name { get; set; }
}
}

View File

@@ -1,4 +1,8 @@
using Volo.Abp.Modularity;
using Confluent.Kafka;
using JiShe.CollectBus.Kafka.Consumer;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.Modularity;
namespace JiShe.CollectBus.Kafka
{
@@ -6,6 +10,16 @@ namespace JiShe.CollectBus.Kafka
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
var configuration = context.Services.GetConfiguration();
// Register the Kafka producer
context.Services.AddSingleton<IProducer<string, string>>(sp =>
new ProducerBuilder<string, string>(
configuration.GetSection("Kafka:ProducerConfig").Get<ProducerConfig>()
)
.Build());
// Register the consumer background service
context.Services.AddHostedService<ConsumerBackgroundService>();
}
}
}

View File

@@ -0,0 +1,54 @@
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka.Consumer
{
public class ConsumerBackgroundService : BackgroundService
{
private readonly ConsumerService _consumerService;
private readonly ILogger<ConsumerBackgroundService> _logger;
public ConsumerBackgroundService(
ConsumerService consumerService,
ILogger<ConsumerBackgroundService> logger)
{
_consumerService = consumerService;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_consumerService.Subscribe("abp-kafka-topic");
while (!stoppingToken.IsCancellationRequested)
{
try
{
var result = _consumerService.Consume(stoppingToken);
await ProcessMessageAsync(result.Message.Value);
_consumerService.Commit(result);
}
catch (ConsumeException ex)
{
_logger.LogError(ex, $"Message consume error: {ex.Error.Reason}");
}
}
}
private async Task ProcessMessageAsync(string message)
{
// Use ABP's asynchronous processing mechanism
await Task.Run(() =>
{
_logger.LogInformation($"Processing message: {message}");
// ABP domain events can be triggered here
});
}
}
}
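As the comment in ProcessMessageAsync hints, the consumed payload could be forwarded to ABP's event system instead of only being logged. A hedged sketch of one possible wiring (KafkaRawMessageEto and RawMessageDispatcher are assumptions, not part of this commit):

using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Local;

namespace JiShe.CollectBus.Kafka.Consumer;

// Illustrative local event carrying the raw Kafka payload.
public class KafkaRawMessageEto
{
    public string Payload { get; set; } = string.Empty;
}

public class RawMessageDispatcher : ITransientDependency
{
    private readonly ILocalEventBus _localEventBus;

    public RawMessageDispatcher(ILocalEventBus localEventBus)
    {
        _localEventBus = localEventBus;
    }

    // ProcessMessageAsync could delegate here to raise a local/domain event per message.
    public Task DispatchAsync(string message) =>
        _localEventBus.PublishAsync(new KafkaRawMessageEto { Payload = message });
}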

View File

@@ -0,0 +1,58 @@
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using JiShe.CollectBus.Kafka.Attributes;
using Volo.Abp.DependencyInjection;
using JiShe.CollectBus.Kafka.AdminClient;
using System;
using System.Threading;
namespace JiShe.CollectBus.Kafka.Consumer
{
public class ConsumerService : IConsumerService, IDisposable, ISingletonDependency
{
private readonly ILogger<ConsumerService> _logger;
private readonly IConsumer<string, string> _consumer;
public ConsumerService(
ILogger<ConsumerService> logger,
IConfiguration configuration)
{
_logger = logger;
var consumerConfig = configuration.GetSection("Kafka:ConsumerConfig")
.Get<ConsumerConfig>();
_consumer = new ConsumerBuilder<string, string>(consumerConfig)
.SetErrorHandler(OnConsumeError)
.Build();
}
public void Subscribe(string topic)
{
_consumer.Subscribe(topic);
_logger.LogInformation($"Subscribed to topic: {topic}");
}
public ConsumeResult<string, string> Consume(CancellationToken cancellationToken)
{
return _consumer.Consume(cancellationToken);
}
public void Commit(ConsumeResult<string, string> result)
{
_consumer.Commit(result);
_logger.LogDebug($"Committed offset: {result.TopicPartitionOffset}");
}
private void OnConsumeError(IConsumer<string, string> consumer, Error error)
{
_logger.LogError($"Kafka consumer error: {error.Reason}");
}
public void Dispose()
{
_consumer?.Close();
_consumer?.Dispose();
}
}
}

View File

@@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka.Consumer
{
public interface IConsumerService
{
}
}

View File

@@ -1,58 +0,0 @@
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka
{
public abstract class ConsumerService<T> : BackgroundService
{
private readonly IConsumer<Ignore, T> _consumer;
private readonly ILogger<ConsumerService<T>> _logger;
/// <summary>
/// Initializes a new instance of the <see cref="ConsumerService{T}"/> class.
/// </summary>
/// <param name="configuration">The configuration.</param>
/// <param name="consumer">The consumer.</param>
/// <param name="logger">The logger.</param>
protected ConsumerService(IConfiguration configuration, IConsumer<Ignore, T> consumer, ILogger<ConsumerService<T>> logger)
{
_consumer = consumer;
_logger = logger;
var consumerConfig = new ConsumerConfig
{
BootstrapServers = configuration["Kafka:BootstrapServers"],
GroupId = "InventoryConsumerGroup",
AutoOffsetReset = AutoOffsetReset.Earliest
};
_consumer = new ConsumerBuilder<Ignore, T>(consumerConfig).Build();
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_consumer.Subscribe("InventoryUpdates");
while (!stoppingToken.IsCancellationRequested)
{
var consumeResult = _consumer.Consume(stoppingToken);
var message = consumeResult.Message.Value;
await ProcessMessageAsync(consumeResult);
}
_consumer.Close();
await Task.CompletedTask;
}
protected abstract Task ProcessMessageAsync(ConsumeResult<Ignore, T> consumer);
}
}

View File

@@ -8,7 +8,7 @@
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.9.0" />
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
<PackageReference Include="Volo.Abp" Version="8.3.3" />
</ItemGroup>
</Project>

View File

@@ -0,0 +1,14 @@
using Confluent.Kafka;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka.Producer
{
public interface IProducerService
{
Task ProduceAsync(string topic, string message);
}
}

View File

@@ -0,0 +1,64 @@
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.Kafka.Producer
{
public class ProducerBaseService<TKey, TValue>
{
/// <summary>
/// Initializes a new instance of the <see cref="ProducerBaseService{TKey, TValue}"/> class.
/// </summary>
/// <param name="configuration">The configuration.</param>
public ProducerBaseService(IConfiguration configuration)
{
GetInstance(configuration);
}
/// <summary>
/// Gets or sets the instance.
/// </summary>
/// <value>
/// The instance.
/// </value>
public IProducer<TKey, TValue> Instance { get; set; } = default;
/// <summary>
/// Gets the instance.
/// </summary>
/// <param name="configuration">The configuration.</param>
/// <returns></returns>
public IProducer<TKey, TValue> GetInstance(IConfiguration configuration)
{
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
var producerConfig = new ProducerConfig
{
BootstrapServers = configuration["Kafka:BootstrapServers"],
AllowAutoCreateTopics = true,
};
if (enableAuthorization)
{
producerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
producerConfig.SaslMechanism = SaslMechanism.Plain;
producerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
producerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
}
Instance = new ProducerBuilder<TKey, TValue>(producerConfig).Build();
return Instance;
}
/// <summary>
/// Produces the asynchronous.
/// </summary>
/// <param name="topic">The topic.</param>
/// <param name="message">The message.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public async Task ProduceAsync(string topic, Message<TKey, TValue> message, CancellationToken cancellationToken = default)
{
await Instance.ProduceAsync(topic, message, cancellationToken);
}
}
}
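A usage sketch for the generic producer base (the topic, key and payload are example data): the instance is built straight from IConfiguration and publishes Confluent Message objects.

using System.Threading.Tasks;
using Confluent.Kafka;
using JiShe.CollectBus.Kafka.Producer;
using Microsoft.Extensions.Configuration;

namespace JiShe.CollectBus.Kafka.Samples;

public static class ProducerBaseServiceSample
{
    // Sends one string/string message; "Sample.Telemetry" and the key are illustrative.
    public static async Task SendAsync(IConfiguration configuration)
    {
        var producer = new ProducerBaseService<string, string>(configuration);
        await producer.ProduceAsync(
            "Sample.Telemetry",
            new Message<string, string> { Key = "meter-001", Value = "{\"value\":42}" });
    }
}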

View File

@@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.Kafka.Producer
{
public class ProducerService : IProducerService, ITransientDependency
{
private readonly IProducer<string, string> _producer;
public ProducerService(IProducer<string, string> producer)
{
_producer = producer;
}
public async Task ProduceAsync(string topic, string message)
{
await _producer.ProduceAsync(topic, new Message<string, string>
{
Key = null,
Value = message
});
}
}
}
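And a sketch of the higher-level IProducerService in use (TelemetryPublisher and the topic name are illustrative): the service is registered transiently by convention and only needs a topic and a string payload.

using System.Threading.Tasks;
using JiShe.CollectBus.Kafka.Producer;
using Volo.Abp.DependencyInjection;

namespace JiShe.CollectBus.Kafka.Samples;

public class TelemetryPublisher : ITransientDependency
{
    private readonly IProducerService _producerService;

    public TelemetryPublisher(IProducerService producerService)
    {
        _producerService = producerService;
    }

    // Publishes a raw string payload to the illustrative "Sample.Telemetry" topic.
    public Task PublishAsync(string payload) =>
        _producerService.ProduceAsync("Sample.Telemetry", payload);
}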

View File

@@ -1,28 +0,0 @@
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.Kafka
{
public class ProducerService<T> : ITransientDependency
{
private readonly IProducer<Ignore, T> _producer;
public ProducerService(IConfiguration configuration)
{
var producerConfig = new ProducerConfig
{
BootstrapServers = configuration["Kafka:BootstrapServers"]
};
_producer = new ProducerBuilder<Ignore, T>(producerConfig).Build();
}
public async Task ProduceAsync(string topic, T message)
{
var msg = new Message<Ignore, T> { Value = message };
await _producer.ProduceAsync(topic, msg);
}
}
}