合并代码

This commit is contained in:
ChenYi 2025-04-21 09:48:12 +08:00
commit cb6cfea4dd
26 changed files with 520 additions and 225 deletions

View File

@ -47,6 +47,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "2.Services", "2.Services",
EndProject EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "5.Shared", "5.Shared", "{EBF7C01F-9B4F-48E6-8418-2CBFDA51EB0B}" Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "5.Shared", "5.Shared", "{EBF7C01F-9B4F-48E6-8418-2CBFDA51EB0B}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "modules\JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}"
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU
@ -121,6 +123,10 @@ Global
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU {443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU {443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU {443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU
{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE
@ -143,6 +149,7 @@ Global
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59} {A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
{A377955E-7EA1-6F29-8CF7-774569E93925} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC} {A377955E-7EA1-6F29-8CF7-774569E93925} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}
{443B4549-0AC0-4493-8F3E-49C83225DD76} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59} {443B4549-0AC0-4493-8F3E-49C83225DD76} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
EndGlobalSection EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD} SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}

View File

@ -3,6 +3,7 @@ using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Internal;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
@ -16,53 +17,54 @@ namespace JiShe.CollectBus.Kafka.Test
{ {
public class KafkaSubscribeTest: IKafkaSubscribe public class KafkaSubscribeTest: IKafkaSubscribe
{ {
[KafkaSubscribe(ProtocolConst.TESTTOPIC, EnableBatch=false,BatchSize=1000)] [KafkaSubscribe(ProtocolConst.TESTTOPIC, EnableBatch = false, BatchSize = 10)]
public async Task<ISubscribeAck> KafkaSubscribeAsync(object obj) public async Task<ISubscribeAck> KafkaSubscribeAsync(TestTopic obj)
//public async Task<ISubscribeAck> KafkaSubscribeAsync(IEnumerable<int> obj)
{ {
Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(obj)}"); Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(obj)}");
return SubscribeAck.Success(); return SubscribeAck.Success();
} }
[KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] //[KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
//[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] ////[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
public async Task<ISubscribeAck> LoginIssuedEvent(IssuedEventMessage issuedEventMessage) //public async Task<ISubscribeAck> LoginIssuedEvent(IssuedEventMessage issuedEventMessage)
{ //{
Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(issuedEventMessage)}"); // Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(issuedEventMessage)}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} //}
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] //[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
//[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] ////[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
public async Task<ISubscribeAck> HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage) //public async Task<ISubscribeAck> HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage)
{ //{
Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(issuedEventMessage)}"); // Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(issuedEventMessage)}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} //}
[KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)] //[KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)]
//[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)] ////[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)]
public async Task<ISubscribeAck> ReceivedEvent(MessageReceived receivedMessage) //public async Task<ISubscribeAck> ReceivedEvent(MessageReceived receivedMessage)
{ //{
Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(receivedMessage)}"); // Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(receivedMessage)}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} //}
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] //[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
//[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] ////[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
public async Task<ISubscribeAck> ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage) //public async Task<ISubscribeAck> ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage)
{ //{
Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(receivedHeartbeatMessage)}"); // Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(receivedHeartbeatMessage)}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} //}
[KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] //[KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
//[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] ////[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
public async Task<ISubscribeAck> ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage) //public async Task<ISubscribeAck> ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage)
{ //{
Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(receivedLoginMessage)}"); // Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(receivedLoginMessage)}");
return SubscribeAck.Success(); // return SubscribeAck.Success();
} //}
} }
} }

View File

@ -7,6 +7,7 @@ using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Kafka.AdminClient; using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Kafka.Test; using JiShe.CollectBus.Kafka.Test;
using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Builder;
@ -86,12 +87,13 @@ var logger = loggerFactory.CreateLogger<Program>();
logger.LogInformation("程序启动"); logger.LogInformation("程序启动");
var adminClientService = host.Services.GetRequiredService<IAdminClientService>(); var adminClientService = host.Services.GetRequiredService<IAdminClientService>();
var configuration = host.Services.GetRequiredService<IConfiguration>(); var configuration = host.Services.GetRequiredService<IConfiguration>();
string topic = "test-topic"; string topic = ProtocolConst.TESTTOPIC;
//await adminClientService.DeleteTopicAsync(topic); //await adminClientService.DeleteTopicAsync(topic);
// 创建 topic // 创建 topic
//await adminClientService.CreateTopicAsync(topic, configuration.GetValue<int>(CommonConst.NumPartitions), 3); //await adminClientService.CreateTopicAsync(topic, configuration.GetValue<int>(CommonConst.NumPartitions), 3);
var consumerService = host.Services.GetRequiredService<IConsumerService>(); var consumerService = host.Services.GetRequiredService<IConsumerService>();
var producerService = host.Services.GetRequiredService<IProducerService>();
//var kafkaOptions = host.Services.GetRequiredService<IOptions<KafkaOptionConfig>>(); //var kafkaOptions = host.Services.GetRequiredService<IOptions<KafkaOptionConfig>>();
//await consumerService.SubscribeAsync<object>(topic, (message) => //await consumerService.SubscribeAsync<object>(topic, (message) =>
//{ //{
@ -113,43 +115,49 @@ var consumerService = host.Services.GetRequiredService<IConsumerService>();
//for (int i = 0; i < 3; i++) //for (int i = 0; i < 3; i++)
//{ //{
// await consumerService.SubscribeBatchAsync<dynamic>(topic, (message) => //await consumerService.SubscribeBatchAsync<dynamic>(topic, (message) =>
//{
// try
// { // {
// try // int index = 0;
// { // logger.LogInformation($"消费消息_{index}消费总数:{message.Count()}{JsonSerializer.Serialize(message)}");
// int index = 0; // return Task.FromResult(true);
// logger.LogInformation($"消费消息_{index}消费总数:{message.Count()}{JsonSerializer.Serialize(message)}");
// return Task.FromResult(true);
// } // }
// catch (ConsumeException ex) // catch (ConsumeException ex)
// { // {
// // 处理消费错误 // // 处理消费错误
// logger.LogError($"kafka消费异常:{ex.Message}"); // logger.LogError($"kafka消费异常:{ex.Message}");
// } // }
// return Task.FromResult(false); // return Task.FromResult(false);
// }); //});
//} //}
//stopwatch.Stop(); //stopwatch.Stop();
//Console.WriteLine($"耗时: {stopwatch.ElapsedMilliseconds} 毫秒,{stopwatch.ElapsedMilliseconds/1000} 秒"); //Console.WriteLine($"耗时: {stopwatch.ElapsedMilliseconds} 毫秒,{stopwatch.ElapsedMilliseconds/1000} 秒");
var producerService = host.Services.GetRequiredService<IProducerService>();
//int num = 840; int num = 1;
//while (num <= 900) while (num <= 6)
{
await producerService.ProduceAsync<TestTopic>(topic, new TestTopic { Topic = topic, Val = num });
num++;
}
//int num = 2;
//while (num <= 4)
//{ //{
// //await producerService.ProduceAsync(topic, new TestTopic { Topic = topic, Val = i });
// await producerService.ProduceAsync<string>(topic, num.ToString()); // await producerService.ProduceAsync<string>(topic, num.ToString());
// num++; // num++;
//} //}
await Task.Factory.StartNew(async() => { //await Task.Factory.StartNew(async() => {
int num = 0; // int num = 0;
while (true) // while (true)
{ // {
//await producerService.ProduceAsync(topic, new TestTopic { Topic = topic, Val = i }); // //await producerService.ProduceAsync(topic, new TestTopic { Topic = topic, Val = i });
await producerService.ProduceAsync<string>(topic, num.ToString()); // await producerService.ProduceAsync<string>(topic, num.ToString());
num++; // num++;
} // }
}); //});
Console.WriteLine("\n按Esc键退出"); Console.WriteLine("\n按Esc键退出");
while (true) while (true)
{ {

View File

@ -91,8 +91,8 @@
"SaslUserName": "lixiao", "SaslUserName": "lixiao",
"SaslPassword": "lixiao1980", "SaslPassword": "lixiao1980",
"KafkaReplicationFactor": 3, "KafkaReplicationFactor": 3,
"NumPartitions": 1, "NumPartitions": 30,
"ServerTagName": "JiSheCollectBus2" "ServerTagName": "JiSheCollectBus99"
//"Topic": { //"Topic": {
// "ReplicationFactor": 3, // "ReplicationFactor": 3,
// "NumPartitions": 1000 // "NumPartitions": 1000

View File

@ -22,7 +22,7 @@ namespace JiShe.CollectBus.Kafka.Attributes
/// <summary> /// <summary>
/// 消费者组 /// 消费者组
/// </summary> /// </summary>
public string GroupId { get; set; } = "default"; public string? GroupId { get; set; } = null;//"default"
/// <summary> /// <summary>
/// 任务数(默认是多少个分区多少个任务) /// 任务数(默认是多少个分区多少个任务)

View File

@ -1,6 +1,7 @@
using Confluent.Kafka; using Confluent.Kafka;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Kafka.Producer;
using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
@ -43,10 +44,19 @@ namespace JiShe.CollectBus.Kafka
//context.Services.AddHostedService<HostedService>(); //context.Services.AddHostedService<HostedService>();
} }
/// <summary>
/// 在初始化之前,初始化Kafka Topic
/// </summary>
/// <param name="context"></param>
public override void OnPreApplicationInitialization(ApplicationInitializationContext context)
{
var app = context.GetApplicationBuilder();
app.ApplicationServices.UseInitKafkaTopic();
}
public override void OnApplicationInitialization(ApplicationInitializationContext context) public override void OnApplicationInitialization(ApplicationInitializationContext context)
{ {
var app = context.GetApplicationBuilder(); var app = context.GetApplicationBuilder();
// 注册Subscriber // 注册Subscriber
app.ApplicationServices.UseKafkaSubscribe(); app.ApplicationServices.UseKafkaSubscribe();

View File

@ -1,5 +1,7 @@
using Confluent.Kafka; using Confluent.Kafka;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Serialization;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
@ -47,11 +49,11 @@ namespace JiShe.CollectBus.Kafka.Consumer
var config = new ConsumerConfig var config = new ConsumerConfig
{ {
BootstrapServers = _kafkaOptionConfig.BootstrapServers, BootstrapServers = _kafkaOptionConfig.BootstrapServers,
GroupId = groupId ?? "default", GroupId = groupId ?? _kafkaOptionConfig.ServerTagName,
AutoOffsetReset = AutoOffsetReset.Earliest, AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false, // 禁止AutoCommit EnableAutoCommit = false, // 禁止AutoCommit
EnablePartitionEof = true, // 启用分区末尾标记 EnablePartitionEof = true, // 启用分区末尾标记
AllowAutoCreateTopics = true, // 启用自动创建 //AllowAutoCreateTopics = true, // 启用自动创建
FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小50MB FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小50MB
}; };
@ -252,7 +254,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <param name="groupId">消费组ID</param> /// <param name="groupId">消费组ID</param>
/// <param name="batchSize">批次大小</param> /// <param name="batchSize">批次大小</param>
/// <param name="batchTimeout">批次超时时间</param> /// <param name="batchTimeout">批次超时时间</param>
public async Task SubscribeBatchAsync<TKey, TValue>(string topic, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class public async Task SubscribeBatchAsync<TKey, TValue>(string topic, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
{ {
await SubscribeBatchAsync<TKey, TValue>(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout); await SubscribeBatchAsync<TKey, TValue>(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout);
} }
@ -267,17 +269,17 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <param name="groupId">消费组ID</param> /// <param name="groupId">消费组ID</param>
/// <param name="batchSize">批次大小</param> /// <param name="batchSize">批次大小</param>
/// <param name="batchTimeout">批次超时时间</param> /// <param name="batchTimeout">批次超时时间</param>
public async Task SubscribeBatchAsync<TKey, TValue>(string[] topics,Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null,int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class public async Task SubscribeBatchAsync<TKey, TValue>(string[] topics,Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null,int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
{ {
var consumerKey = typeof(KafkaConsumer<TKey, TValue>); var consumerKey = typeof(KafkaConsumer<TKey, TValue>);
var cts = new CancellationTokenSource(); var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ => //var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
( // (
CreateConsumer<TKey, TValue>(groupId), // CreateConsumer<TKey, TValue>(groupId),
cts // cts
)).Consumer as IConsumer<TKey, TValue>; // )).Consumer as IConsumer<TKey, TValue>;
var consumer = CreateConsumer<string, TValue>(groupId);
consumer!.Subscribe(topics); consumer!.Subscribe(topics);
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
@ -300,8 +302,8 @@ namespace JiShe.CollectBus.Kafka.Consumer
{ {
if (result.IsPartitionEOF) if (result.IsPartitionEOF)
{ {
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); //_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
await Task.Delay(TimeSpan.FromSeconds(1), cts.Token); await Task.Delay(10, cts.Token);
} }
else if (result.Message.Value != null) else if (result.Message.Value != null)
{ {
@ -330,7 +332,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
// 处理批次 // 处理批次
if (messages.Count > 0) if (messages.Count > 0)
{ {
bool success = await messageBatchHandler(messages.Select(m => m.Value)); bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
if (success) if (success)
{ {
var offsetsByPartition = new Dictionary<TopicPartition, long>(); var offsetsByPartition = new Dictionary<TopicPartition, long>();
@ -383,7 +385,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <param name="batchSize">批次大小</param> /// <param name="batchSize">批次大小</param>
/// <param name="batchTimeout">批次超时时间</param> /// <param name="batchTimeout">批次超时时间</param>
/// <param name="consumeTimeout">消费等待时间</param> /// <param name="consumeTimeout">消费等待时间</param>
public async Task SubscribeBatchAsync<TValue>(string topic, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class public async Task SubscribeBatchAsync<TValue>(string topic, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class
{ {
await SubscribeBatchAsync<TValue>(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout); await SubscribeBatchAsync<TValue>(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout);
@ -400,17 +402,18 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <param name="batchSize">批次大小</param> /// <param name="batchSize">批次大小</param>
/// <param name="batchTimeout">批次超时时间</param> /// <param name="batchTimeout">批次超时时间</param>
/// <param name="consumeTimeout">消费等待时间</param> /// <param name="consumeTimeout">消费等待时间</param>
public async Task SubscribeBatchAsync<TValue>(string[] topics,Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class public async Task SubscribeBatchAsync<TValue>(string[] topics,Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class
{ {
var consumerKey = typeof(KafkaConsumer<string, TValue>); var consumerKey = typeof(KafkaConsumer<string, TValue>);
var cts = new CancellationTokenSource(); var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ => //var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
( // (
CreateConsumer<string, TValue>(groupId), // CreateConsumer<string, TValue>(groupId),
cts // cts
)).Consumer as IConsumer<string, TValue>; // )).Consumer as IConsumer<string, TValue>;
var consumer= CreateConsumer<string, TValue> (groupId);
consumer!.Subscribe(topics); consumer!.Subscribe(topics);
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
@ -434,8 +437,8 @@ namespace JiShe.CollectBus.Kafka.Consumer
{ {
if (result.IsPartitionEOF) if (result.IsPartitionEOF)
{ {
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); //_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
await Task.Delay(TimeSpan.FromSeconds(1), cts.Token); await Task.Delay(10, cts.Token);
} }
else if (result.Message.Value != null) else if (result.Message.Value != null)
{ {
@ -464,7 +467,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
// 处理批次 // 处理批次
if (messages.Count > 0) if (messages.Count > 0)
{ {
bool success = await messageBatchHandler(messages.Select(m => m.Value)); bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
if (success) if (success)
{ {
var offsetsByPartition = new Dictionary<TopicPartition, long>(); var offsetsByPartition = new Dictionary<TopicPartition, long>();

View File

@ -33,13 +33,13 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <returns></returns> /// <returns></returns>
Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class; Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class;
Task SubscribeBatchAsync<TKey, TValue>(string[] topics, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class; Task SubscribeBatchAsync<TKey, TValue>(string[] topics, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class;
Task SubscribeBatchAsync<TKey, TValue>(string topic, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class; Task SubscribeBatchAsync<TKey, TValue>(string topic, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class;
Task SubscribeBatchAsync<TValue>(string topic, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class; Task SubscribeBatchAsync<TValue>(string topic, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class;
Task SubscribeBatchAsync<TValue>(string[] topics, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class; Task SubscribeBatchAsync<TValue>(string[] topics, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class;
void Unsubscribe<TKey, TValue>() where TKey : notnull where TValue : class; void Unsubscribe<TKey, TValue>() where TKey : notnull where TValue : class;
} }

View File

@ -5,7 +5,7 @@ using System.Linq;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka namespace JiShe.CollectBus.Kafka.Internal
{ {
/// <summary> /// <summary>
/// 消息头过滤器 /// 消息头过滤器

View File

@ -4,7 +4,7 @@ using System.Linq;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka namespace JiShe.CollectBus.Kafka.Internal
{ {
/// <summary> /// <summary>
/// Kafka订阅者 /// Kafka订阅者

View File

@ -4,7 +4,7 @@ using System.Linq;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka namespace JiShe.CollectBus.Kafka.Internal
{ {
public interface ISubscribeAck public interface ISubscribeAck
{ {

View File

@ -5,7 +5,7 @@ using System.Linq;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka namespace JiShe.CollectBus.Kafka.Internal
{ {
public class KafkaOptionConfig public class KafkaOptionConfig
{ {

View File

@ -0,0 +1,113 @@
using Newtonsoft.Json;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka.Internal
{
/// <summary>
/// 反射辅助类
/// </summary>
public static class ReflectionHelper
{
/// <summary>
///集合类型
///Item1参数类型
///Item2集合元素类型
/// </summary>
public static Tuple<Type,Type?> GetParameterTypeInfo(this MethodInfo method, int parameterIndex=0)
{
// 参数校验
if (method == null) throw new ArgumentNullException(nameof(method));
var parameters = method.GetParameters();
if (parameterIndex < 0 || parameterIndex >= parameters.Length)
throw new ArgumentOutOfRangeException(nameof(parameterIndex));
ParameterInfo param = parameters[parameterIndex];
Type paramType = param.ParameterType;
Type? elementType = null;
// 判断是否是集合类型(排除字符串)
if (paramType != typeof(string) && IsEnumerableType(paramType))
{
elementType = GetEnumerableElementType(paramType);
}
return Tuple.Create(paramType, elementType);
}
/// <summary>
/// 判断是否是集合类型(排除字符串)
/// </summary>
public static bool IsEnumerableType(this Type type)
{
return type.IsArray
|| (type.IsGenericType && type.GetInterfaces()
.Any(t => t.IsGenericType
&& t.GetGenericTypeDefinition() == typeof(IEnumerable<>)))
|| type.GetInterfaces().Any(t => t == typeof(System.Collections.IEnumerable));
}
/// <summary>
/// 获取集合元素的类型
/// </summary>
public static Type? GetEnumerableElementType(this Type type)
{
// 处理数组类型
if (type.IsArray)
return type.GetElementType();
// 处理直接实现IEnumerable<T>的类型如IEnumerable<int>本身)
if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(IEnumerable<>))
return type.GetGenericArguments()[0];
// 处理通过接口实现IEnumerable<T>的泛型集合如List<T>
var genericEnumerable = type.GetInterfaces()
.FirstOrDefault(t => t.IsGenericType
&& t.GetGenericTypeDefinition() == typeof(IEnumerable<>));
if (genericEnumerable != null)
return genericEnumerable.GetGenericArguments()[0];
// 处理非泛型集合类型(如 ArrayList
if (typeof(IEnumerable).IsAssignableFrom(type) && type == typeof(ArrayList))
return typeof(ArrayList);
// 返回null表示无法确定元素类型
return null;
}
// <summary>
/// 判断是否使用强转换
/// </summary>
/// <param name="targetType">目标类型</param>
/// <returns></returns>
public static bool IsConvertType(this Type targetType)
{
// 处理可空类型
Type underlyingType = Nullable.GetUnderlyingType(targetType) ?? targetType;
// 情况1值类型或基元类型如 int、DateTime
if (underlyingType.IsValueType || underlyingType.IsPrimitive)
return true;
// 情况2字符串类型直接赋值
else if (underlyingType == typeof(string))
return true;
// 情况3枚举类型处理
//else if (underlyingType.IsEnum)
//{
// if (Enum.IsDefined(underlyingType, msg))
// {
// convertedValue = Enum.Parse(underlyingType, msg.ToString());
// return true;
// }
// return false;
//}
return false;
}
}
}

View File

@ -6,7 +6,7 @@ using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using static System.Runtime.InteropServices.JavaScript.JSType; using static System.Runtime.InteropServices.JavaScript.JSType;
namespace JiShe.CollectBus.Kafka namespace JiShe.CollectBus.Kafka.Internal
{ {
public class SubscribeResult: ISubscribeAck public class SubscribeResult: ISubscribeAck
{ {

View File

@ -5,30 +5,33 @@ using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Kafka.AdminClient; using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Serialization;
using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Mvc.Abstractions;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using Newtonsoft.Json; using System;
using System.Collections;
using System.Collections.Generic; using System.Collections.Generic;
using System.ComponentModel;
using System.Linq.Expressions;
using System.Reflection; using System.Reflection;
using System.Text.Json;
using System.Threading.Tasks; using System.Threading.Tasks;
using YamlDotNet.Core.Tokens;
using static System.Runtime.InteropServices.JavaScript.JSType;
namespace JiShe.CollectBus.Kafka namespace JiShe.CollectBus.Kafka
{ {
public static class KafkaSubcribesExtensions public static class KafkaSubcribesExtensions
{ {
/// <summary>
/// 添加Kafka订阅
/// </summary>
/// <param name="app"></param>
/// <param name="assembly"></param>
public static void UseKafkaSubscribe(this IServiceProvider provider)
{
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
public static void UseInitKafkaTopic(this IServiceProvider provider)
{
//初始化主题信息 //初始化主题信息
var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>(); var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>(); var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();
@ -40,6 +43,17 @@ namespace JiShe.CollectBus.Kafka
{ {
kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult(); kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
} }
}
/// <summary>
/// 添加Kafka订阅
/// </summary>
/// <param name="app"></param>
/// <param name="assembly"></param>
public static void UseKafkaSubscribe(this IServiceProvider provider)
{
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();
lifetime.ApplicationStarted.Register(() => lifetime.ApplicationStarted.Register(() =>
{ {
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>(); var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
@ -88,17 +102,7 @@ namespace JiShe.CollectBus.Kafka
{ {
var provider = app.ApplicationServices; var provider = app.ApplicationServices;
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>(); var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
//初始化主题信息
var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>(); var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
foreach (var item in topics)
{
kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
}
lifetime.ApplicationStarted.Register(() => lifetime.ApplicationStarted.Register(() =>
{ {
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>(); var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
@ -132,7 +136,7 @@ namespace JiShe.CollectBus.Kafka
/// </summary> /// </summary>
/// <param name="subscribe"></param> /// <param name="subscribe"></param>
/// <param name="provider"></param> /// <param name="provider"></param>
private static Tuple<int,int> BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig) private static Tuple<int, int> BuildKafkaSubscribe(object subscribe, IServiceProvider provider, ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig)
{ {
var subscribedMethods = subscribe.GetType().GetMethods() var subscribedMethods = subscribe.GetType().GetMethods()
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() }) .Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
@ -147,7 +151,7 @@ namespace JiShe.CollectBus.Kafka
#if DEBUG #if DEBUG
var adminClientService = provider.GetRequiredService<IAdminClientService>(); var adminClientService = provider.GetRequiredService<IAdminClientService>();
int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic); int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic);
partitionCount= partitionCount> topicCount ? topicCount: partitionCount; partitionCount = partitionCount > topicCount ? topicCount : partitionCount;
#endif #endif
//int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount; //int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
if (partitionCount <= 0) if (partitionCount <= 0)
@ -170,18 +174,18 @@ namespace JiShe.CollectBus.Kafka
/// <param name="method"></param> /// <param name="method"></param>
/// <param name="consumerInstance"></param> /// <param name="consumerInstance"></param>
/// <returns></returns> /// <returns></returns>
private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe, ILogger<CollectBusKafkaModule> logger) private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr, MethodInfo method, object subscribe, ILogger<CollectBusKafkaModule> logger)
{ {
var consumerService = provider.GetRequiredService<IConsumerService>(); var consumerService = provider.GetRequiredService<IConsumerService>();
if (attr.EnableBatch) if (attr.EnableBatch)
{ {
await consumerService.SubscribeBatchAsync<object>(attr.Topic, async (message) => await consumerService.SubscribeBatchAsync<dynamic>(attr.Topic, async (message) =>
{ {
try try
{ {
#if DEBUG #if DEBUG
logger.LogInformation($"kafka批量消费消息:{message}"); logger.LogInformation($"kafka批量消费消息:{message.Serialize()}");
#endif #endif
// 处理消息 // 处理消息
return await ProcessMessageAsync(message.ToList(), method, subscribe); return await ProcessMessageAsync(message.ToList(), method, subscribe);
@ -196,7 +200,7 @@ namespace JiShe.CollectBus.Kafka
} }
else else
{ {
await consumerService.SubscribeAsync<object>(attr.Topic, async (message) => await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) =>
{ {
try try
{ {
@ -225,26 +229,112 @@ namespace JiShe.CollectBus.Kafka
/// <param name="method"></param> /// <param name="method"></param>
/// <param name="subscribe"></param> /// <param name="subscribe"></param>
/// <returns></returns> /// <returns></returns>
private static async Task<bool> ProcessMessageAsync(List<object> messages, MethodInfo method, object subscribe) private static async Task<bool> ProcessMessageAsync(List<dynamic> messages, MethodInfo method, object subscribe)
{ {
var parameters = method.GetParameters(); var parameters = method.GetParameters();
bool isGenericTask = method.ReturnType.IsGenericType bool isGenericTask = method.ReturnType.IsGenericType
&& method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>); && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>);
bool existParameters = parameters.Length > 0; bool existParameters = parameters.Length > 0;
List<object>? messageObj = null; object[]? executeParameters = null;
if (existParameters) if (existParameters)
{ {
messageObj = new List<object>(); IList? list = null;
var paramType = parameters[0].ParameterType; Tuple<Type, Type?> tuple = method.GetParameterTypeInfo();
foreach (var msg in messages) bool isEnumerable = false;
if (tuple.Item2 != null)
{ {
var data = paramType != typeof(string) ? msg?.ToString()?.Deserialize(paramType) : msg; Type listType = typeof(List<>).MakeGenericType(tuple.Item2);
if (data != null) list = (IList)Activator.CreateInstance(listType)!;
messageObj.Add(data); isEnumerable = tuple.Item2.IsConvertType();
}
else
{
isEnumerable = tuple.Item1.IsConvertType();
}
#region
//foreach (var msg in messages)
//{
// if (tuple.Item2 != null)
// {
// if (isEnumerable)
// {
// var parameterType = parameters[0].ParameterType;
// var data=messages?.Serialize().Deserialize(parameterType);
// messageObj = data!=null? new[] { data }:null;
// break;
// }
// else
// {
// // 集合类型
// var data = msg?.Serialize().Deserialize(tuple.Item2) /*isEnumerable ? Convert.ChangeType(msg, tuple.Item2) : msg?.Serialize().Deserialize(tuple.Item2)*/;
// if (data != null)
// list?.Add(data);
// }
// }
// else
// {
// // (dynamic)Convert.ChangeType(msg, tuple.Item1)
// using (var stream = new MemoryStream(msg))
// {
// var data1= System.Text.Json.JsonSerializer.Deserialize(stream, tuple.Item1);
// }
// var data = isEnumerable ? System.Text.Json.JsonSerializer.Deserialize(msg, tuple.Item1): msg?.ToString()?.Deserialize(tuple.Item1);
// if (data != null)
// messageObj = new[] { data };
// }
//}
//if (tuple.Item2 != null && list != null && list.Count > 0)
//{
// messageObj = new[] { list };
//}
#endregion
var parameterDescriptors = method.GetParameters();
executeParameters = new object?[parameterDescriptors.Length];
for (var i = 0; i < parameterDescriptors.Length; i++)
{
foreach (var item in messages)
{
object? tempParameter=null;
var parameterDescriptor = parameterDescriptors[i];
if (KafkaSerialization.IsJsonType(item))
{
tempParameter = KafkaSerialization.Deserialize(item, tuple.Item2 != null? tuple.Item2: parameterDescriptor.ParameterType);
}
else
{
var converter = TypeDescriptor.GetConverter(parameterDescriptor.ParameterType);
if (converter.CanConvertFrom(item.GetType()))
{
tempParameter = converter.ConvertFrom(item);
}
else
{
if (parameterDescriptor.ParameterType.IsInstanceOfType(item))
tempParameter = item;
else
tempParameter =Convert.ChangeType(item, parameterDescriptor.ParameterType);
}
}
if (tuple.Item2 == null)
{
executeParameters[i] = tempParameter;
}
else
{
list.Add(tempParameter);
}
}
if(list!=null && list.Count>0)
executeParameters[i] = list;
} }
} }
var result = method.Invoke(subscribe, messageObj?.ToArray()); var result = method.Invoke(subscribe, executeParameters);
if (result is Task<ISubscribeAck> genericTask) if (result is Task<ISubscribeAck> genericTask)
{ {
await genericTask.ConfigureAwait(false); await genericTask.ConfigureAwait(false);
@ -262,5 +352,9 @@ namespace JiShe.CollectBus.Kafka
return false; return false;
} }
} }
} }

View File

@ -6,6 +6,8 @@ using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using Confluent.Kafka; using Confluent.Kafka;
using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Serialization;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
@ -56,7 +58,7 @@ namespace JiShe.CollectBus.Kafka.Producer
var config = new ProducerConfig var config = new ProducerConfig
{ {
BootstrapServers = _kafkaOptionConfig.BootstrapServers, BootstrapServers = _kafkaOptionConfig.BootstrapServers,
AllowAutoCreateTopics = true, //AllowAutoCreateTopics = true,
QueueBufferingMaxKbytes = 2_097_151, // 修改缓冲区最大为2GB默认为1GB QueueBufferingMaxKbytes = 2_097_151, // 修改缓冲区最大为2GB默认为1GB
CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4其他gzip/snappy/zstd CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4其他gzip/snappy/zstd
BatchSize = 32_768, // 修改批次大小为32K BatchSize = 32_768, // 修改批次大小为32K
@ -108,7 +110,7 @@ namespace JiShe.CollectBus.Kafka.Producer
var message = new Message<TKey, TValue> var message = new Message<TKey, TValue>
{ {
Key = key, Key = key,
Value = value, Value = value,
Headers = new Headers{ Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
} }

View File

@ -8,7 +8,7 @@ using Confluent.Kafka;
using System.Text.Json.Serialization; using System.Text.Json.Serialization;
using System.Text.Encodings.Web; using System.Text.Encodings.Web;
namespace JiShe.CollectBus.Kafka namespace JiShe.CollectBus.Kafka.Serialization
{ {
/// <summary> /// <summary>
/// JSON 序列化器(支持泛型) /// JSON 序列化器(支持泛型)
@ -49,10 +49,11 @@ namespace JiShe.CollectBus.Kafka
{ {
if (isNull) if (isNull)
return default; return default;
try try
{ {
return JsonSerializer.Deserialize<T>(data, _options); if (data.IsEmpty)
return default;
return JsonSerializer.Deserialize<T>(data, _options)!;
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -85,4 +86,40 @@ namespace JiShe.CollectBus.Kafka
writer.WriteStringValue(value.ToString(_dateFormatString)); writer.WriteStringValue(value.ToString(_dateFormatString));
} }
} }
public static class KafkaSerialization
{
/// <summary>
/// 判断是否是json类型
/// </summary>
/// <param name="jsonObject"></param>
/// <returns></returns>
public static bool IsJsonType(this object jsonObject)
{
return jsonObject is JsonElement;
}
public static object? Deserialize(object value, Type valueType)
{
var _jsonSerializerOptions = new JsonSerializerOptions
{
DefaultIgnoreCondition = JsonIgnoreCondition.Never,
WriteIndented = false,// 设置格式化输出
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,// 允许特殊字符
IgnoreReadOnlyFields = true,
IgnoreReadOnlyProperties = true,
NumberHandling = JsonNumberHandling.AllowReadingFromString, // 允许数字字符串
AllowTrailingCommas = true, // 忽略尾随逗号
ReadCommentHandling = JsonCommentHandling.Skip, // 忽略注释
PropertyNameCaseInsensitive = true, // 属性名称大小写不敏感
PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // 属性名称使用驼峰命名规则
Converters = { new DateTimeJsonConverter() } // 注册你的自定义转换器,
};
if (value is JsonElement jsonElement) return jsonElement.Deserialize(valueType, _jsonSerializerOptions);
throw new NotSupportedException("Type is not of type JsonElement");
}
}
} }

View File

@ -1,17 +1,18 @@
using System.Threading.Tasks; using System.Collections.Generic;
using System.Threading.Tasks;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Kafka.Internal;
using Volo.Abp.Application.Services; using Volo.Abp.Application.Services;
namespace JiShe.CollectBus.Subscribers namespace JiShe.CollectBus.Subscribers
{ {
public interface ISubscriberAppService : IApplicationService public interface ISubscriberAppService : IApplicationService
{ {
Task<ISubscribeAck> LoginIssuedEvent(IssuedEventMessage issuedEventMessage); Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessage);
Task<ISubscribeAck> HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage); Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessage);
Task<ISubscribeAck> ReceivedEvent(MessageReceived receivedMessage); Task<ISubscribeAck> ReceivedEvent(MessageReceived receivedMessage);
Task<ISubscribeAck> ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage); Task<ISubscribeAck> ReceivedHeartbeatEvent(List<MessageReceivedHeartbeat> receivedHeartbeatMessage);
Task<ISubscribeAck> ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage); Task<ISubscribeAck> ReceivedLoginEvent(List<MessageReceivedLogin> receivedLoginMessage);
} }
} }

View File

@ -1,7 +1,7 @@
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Kafka.Internal;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp.Application.Services; using Volo.Abp.Application.Services;

View File

@ -23,6 +23,7 @@ using Volo.Abp.BackgroundWorkers.Hangfire;
using Volo.Abp.EventBus; using Volo.Abp.EventBus;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using JiShe.CollectBus.Kafka.Internal;
namespace JiShe.CollectBus; namespace JiShe.CollectBus;

View File

@ -20,13 +20,13 @@ using System.Diagnostics.Metrics;
using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Attributes;
using System.Text.Json; using System.Text.Json;
using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Application.Contracts; using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using System.Diagnostics; using System.Diagnostics;
using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Options; using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
namespace JiShe.CollectBus.Samples; namespace JiShe.CollectBus.Samples;

View File

@ -13,7 +13,7 @@ using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.RedisDataCache; using JiShe.CollectBus.RedisDataCache;
@ -27,6 +27,7 @@ using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using static FreeSql.Internal.GlobalFilter;
namespace JiShe.CollectBus.ScheduledMeterReading namespace JiShe.CollectBus.ScheduledMeterReading
{ {

View File

@ -13,7 +13,7 @@ using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Repository; using JiShe.CollectBus.Repository;
using JiShe.CollectBus.Repository.MeterReadingRecord; using JiShe.CollectBus.Repository.MeterReadingRecord;

View File

@ -5,7 +5,6 @@ using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Interfaces;
@ -19,6 +18,8 @@ using System.Threading.Tasks;
using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Interface;
using TouchSocket.Sockets; using TouchSocket.Sockets;
using Volo.Abp.Domain.Repositories; using Volo.Abp.Domain.Repositories;
using System.Collections.Generic;
using JiShe.CollectBus.Kafka.Internal;
namespace JiShe.CollectBus.Subscribers namespace JiShe.CollectBus.Subscribers
{ {
@ -65,67 +66,75 @@ namespace JiShe.CollectBus.Subscribers
_dbProvider = dbProvider; _dbProvider = dbProvider;
} }
[KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName, EnableBatch = true)]
//[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] //[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
public async Task<ISubscribeAck> LoginIssuedEvent(IssuedEventMessage issuedEventMessage) public async Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
{ {
bool isAck = false; bool isAck = false;
switch (issuedEventMessage.Type) foreach (var issuedEventMessage in issuedEventMessages)
{ {
case IssuedEventType.Heartbeat: switch (issuedEventMessage.Type)
break; {
case IssuedEventType.Login: case IssuedEventType.Heartbeat:
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}"); break;
var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId); case IssuedEventType.Login:
loginEntity.AckTime = Clock.Now; _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}");
loginEntity.IsAck = true; var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId);
await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity); loginEntity.AckTime = Clock.Now;
isAck = true; loginEntity.IsAck = true;
break; await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
case IssuedEventType.Data: isAck = true;
break; break;
default: case IssuedEventType.Data:
throw new ArgumentOutOfRangeException(); break;
default:
throw new ArgumentOutOfRangeException();
}
//var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo);
//if (device != null)
//{
// await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message);
//}
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
} }
//var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo);
//if (device != null)
//{
// await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message);
//}
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
return isAck? SubscribeAck.Success(): SubscribeAck.Fail(); return isAck? SubscribeAck.Success(): SubscribeAck.Fail();
} }
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)]
//[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] //[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
public async Task<ISubscribeAck> HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage) public async Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
{ {
bool isAck = false; bool isAck = false;
switch (issuedEventMessage.Type) foreach (var issuedEventMessage in issuedEventMessages)
{ {
case IssuedEventType.Heartbeat: switch (issuedEventMessage.Type)
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}"); {
var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId); case IssuedEventType.Heartbeat:
heartbeatEntity.AckTime = Clock.Now; _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}");
heartbeatEntity.IsAck = true; var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId);
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity); heartbeatEntity.AckTime = Clock.Now;
isAck = true; heartbeatEntity.IsAck = true;
break; await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
case IssuedEventType.Data: isAck = true;
break; break;
default: case IssuedEventType.Data:
throw new ArgumentOutOfRangeException(); break;
default:
throw new ArgumentOutOfRangeException();
}
//var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo);
//if (device != null)
//{
// await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message);
//}
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
} }
//var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo);
//if (device != null)
//{
// await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message);
//}
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
return isAck ? SubscribeAck.Success() : SubscribeAck.Fail(); return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
} }
@ -186,37 +195,44 @@ namespace JiShe.CollectBus.Subscribers
return SubscribeAck.Success(); return SubscribeAck.Success();
} }
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName, EnableBatch = true)]
//[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] //[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
public async Task<ISubscribeAck> ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage) public async Task<ISubscribeAck> ReceivedHeartbeatEvent(List<MessageReceivedHeartbeat> receivedHeartbeatMessages)
{ {
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin"); foreach (var receivedHeartbeatMessage in receivedHeartbeatMessages)
if (protocolPlugin == null)
{ {
_logger.LogError("协议不存在!"); var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
} if (protocolPlugin == null)
else {
{ _logger.LogError("协议不存在!");
await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage); }
await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage); else
{
await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage);
await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage);
}
} }
return SubscribeAck.Success(); return SubscribeAck.Success();
} }
[KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] [KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName,EnableBatch =true)]
//[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] //[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
public async Task<ISubscribeAck> ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage) public async Task<ISubscribeAck> ReceivedLoginEvent(List<MessageReceivedLogin> receivedLoginMessages)
{ {
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin"); foreach (var receivedLoginMessage in receivedLoginMessages)
if (protocolPlugin == null)
{ {
_logger.LogError("协议不存在!"); var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
} if (protocolPlugin == null)
else {
{ _logger.LogError("协议不存在!");
await protocolPlugin.LoginAsync(receivedLoginMessage); }
await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage); else
{
await protocolPlugin.LoginAsync(receivedLoginMessage);
await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage);
}
} }
return SubscribeAck.Success(); return SubscribeAck.Success();
} }
} }

View File

@ -8,8 +8,8 @@ using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Repository.MeterReadingRecord; using JiShe.CollectBus.Repository.MeterReadingRecord;

View File

@ -43,7 +43,7 @@
"Configuration": "192.168.1.9:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", "Configuration": "192.168.1.9:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
"MaxPoolSize": "50", "MaxPoolSize": "50",
"DefaultDB": "14", "DefaultDB": "14",
"HangfireDB": "15" "HangfireDB": "13"
}, },
"Jwt": { "Jwt": {
"Audience": "JiShe.CollectBus", "Audience": "JiShe.CollectBus",
@ -84,8 +84,7 @@
"SaslPassword": "lixiao1980", "SaslPassword": "lixiao1980",
"KafkaReplicationFactor": 3, "KafkaReplicationFactor": 3,
"NumPartitions": 30, "NumPartitions": 30,
"ServerTagName": "JiSheCollectBus3", "ServerTagName": "JiSheCollectBus2"
"FirstCollectionTime": "2025-04-18 00:00:00"
}, },
"IoTDBOptions": { "IoTDBOptions": {
"UserName": "root", "UserName": "root",
@ -96,6 +95,7 @@
"OpenDebugMode": true, "OpenDebugMode": true,
"UseTableSessionPoolByDefault": false "UseTableSessionPoolByDefault": false
}, },
"ServerTagName": "JiSheCollectBus3",
"Cassandra": { "Cassandra": {
"ReplicationStrategy": { "ReplicationStrategy": {
"Class": "NetworkTopologyStrategy", //NetworkTopologyStrategySimpleStrategy "Class": "NetworkTopologyStrategy", //NetworkTopologyStrategySimpleStrategy