代码修改
This commit is contained in:
parent
48b91183c2
commit
b064ff3d14
@ -304,8 +304,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
|
||||
timer1.Stop();
|
||||
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
|
||||
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
|
||||
//return;
|
||||
DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
|
||||
return;
|
||||
#else
|
||||
var meterInfos = await GetAmmeterInfoList(gatherCode);
|
||||
#endif
|
||||
|
||||
@ -12,11 +12,11 @@ namespace JiShe.CollectBus.Common.Consts
|
||||
/// <summary>
|
||||
/// 心跳下行消息主题
|
||||
/// </summary>
|
||||
public const string SubscriberHeartbeatIssuedEventName = "issued.heartbeat.event";
|
||||
public const string SubscriberHeartbeatIssuedEventName = "issued.heartbeat.event2";
|
||||
/// <summary>
|
||||
/// 登录下行消息主题
|
||||
/// </summary>
|
||||
public const string SubscriberLoginIssuedEventName = "issued.login.event";
|
||||
public const string SubscriberLoginIssuedEventName = "issued.login.event2";
|
||||
|
||||
/// <summary>
|
||||
/// 上行消息主题,测试使用
|
||||
@ -26,11 +26,11 @@ namespace JiShe.CollectBus.Common.Consts
|
||||
/// <summary>
|
||||
/// 心跳上行消息主题
|
||||
/// </summary>
|
||||
public const string SubscriberHeartbeatReceivedEventName = "received.heartbeat.event";
|
||||
public const string SubscriberHeartbeatReceivedEventName = "received.heartbeat.event2";
|
||||
/// <summary>
|
||||
/// 登录上行消息主题
|
||||
/// </summary>
|
||||
public const string SubscriberLoginReceivedEventName = "received.login.event";
|
||||
public const string SubscriberLoginReceivedEventName = "received.login.event2";
|
||||
|
||||
#region 电表消息主题
|
||||
/// <summary>
|
||||
|
||||
@ -91,8 +91,8 @@
|
||||
"SaslUserName": "lixiao",
|
||||
"SaslPassword": "lixiao1980",
|
||||
"KafkaReplicationFactor": 3,
|
||||
"NumPartitions": 30,
|
||||
"ServerTagName": "JiSheCollectBus3"
|
||||
"NumPartitions": 1,
|
||||
"ServerTagName": "JiSheCollectBus2"
|
||||
//"Topic": {
|
||||
// "ReplicationFactor": 3,
|
||||
// "NumPartitions": 1000
|
||||
|
||||
@ -46,7 +46,7 @@ namespace JiShe.CollectBus.Kafka
|
||||
var app = context.GetApplicationBuilder();
|
||||
|
||||
// 注册Subscriber
|
||||
app.ApplicationServices.UseKafkaSubscribers();
|
||||
app.ApplicationServices.UseKafkaSubscribe();
|
||||
|
||||
// 获取程序集
|
||||
//app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application"));
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
using Confluent.Kafka;
|
||||
using JiShe.CollectBus.Common.Consts;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
@ -122,7 +123,9 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
||||
var result = consumer.Consume(cts.Token);
|
||||
if (result == null || result.Message==null || result.Message.Value == null)
|
||||
{
|
||||
#if DEBUG
|
||||
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
|
||||
#endif
|
||||
//consumer.Commit(result); // 手动提交
|
||||
continue;
|
||||
}
|
||||
@ -151,7 +154,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
||||
}
|
||||
catch (ConsumeException ex)
|
||||
{
|
||||
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
|
||||
_logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}");
|
||||
}
|
||||
}
|
||||
});
|
||||
@ -170,14 +173,17 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
||||
/// <returns></returns>
|
||||
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
|
||||
{
|
||||
var consumerKey = typeof(KafkaConsumer<Ignore, TValue>);
|
||||
var consumerKey = typeof(KafkaConsumer<string, TValue>);
|
||||
var cts = new CancellationTokenSource();
|
||||
|
||||
if (topics.Contains(ProtocolConst.SubscriberLoginReceivedEventName))
|
||||
{
|
||||
string ssss = "";
|
||||
}
|
||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _=>
|
||||
(
|
||||
CreateConsumer<Ignore, TValue>(groupId),
|
||||
CreateConsumer<string, TValue>(groupId),
|
||||
cts
|
||||
)).Consumer as IConsumer<Ignore, TValue>;
|
||||
)).Consumer as IConsumer<string, TValue>;
|
||||
|
||||
consumer!.Subscribe(topics);
|
||||
|
||||
@ -190,8 +196,11 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
||||
var result = consumer.Consume(cts.Token);
|
||||
if (result == null || result.Message==null || result.Message.Value == null)
|
||||
{
|
||||
#if DEBUG
|
||||
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
|
||||
#endif
|
||||
//consumer.Commit(result); // 手动提交
|
||||
consumer.StoreOffset(result);
|
||||
continue;
|
||||
}
|
||||
if (result.IsPartitionEOF)
|
||||
@ -214,10 +223,12 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
||||
bool sucess = await messageHandler(result.Message.Value);
|
||||
if (sucess)
|
||||
consumer.Commit(result); // 手动提交
|
||||
else
|
||||
consumer.StoreOffset(result);
|
||||
}
|
||||
catch (ConsumeException ex)
|
||||
{
|
||||
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
|
||||
_logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}");
|
||||
}
|
||||
}
|
||||
});
|
||||
@ -339,7 +350,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
||||
}
|
||||
catch (ConsumeException ex)
|
||||
{
|
||||
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
|
||||
_logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}");
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
@ -385,14 +396,14 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
||||
/// <param name="consumeTimeout">消费等待时间</param>
|
||||
public async Task SubscribeBatchAsync<TValue>(string[] topics,Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class
|
||||
{
|
||||
var consumerKey = typeof(KafkaConsumer<Ignore, TValue>);
|
||||
var consumerKey = typeof(KafkaConsumer<string, TValue>);
|
||||
var cts = new CancellationTokenSource();
|
||||
|
||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
||||
(
|
||||
CreateConsumer<Ignore, TValue>(groupId),
|
||||
CreateConsumer<string, TValue>(groupId),
|
||||
cts
|
||||
)).Consumer as IConsumer<Ignore, TValue>;
|
||||
)).Consumer as IConsumer<string, TValue>;
|
||||
|
||||
consumer!.Subscribe(topics);
|
||||
|
||||
|
||||
@ -10,6 +10,7 @@ using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Newtonsoft.Json;
|
||||
using System.Reflection;
|
||||
|
||||
namespace JiShe.CollectBus.Kafka
|
||||
@ -21,7 +22,7 @@ namespace JiShe.CollectBus.Kafka
|
||||
/// </summary>
|
||||
/// <param name="app"></param>
|
||||
/// <param name="assembly"></param>
|
||||
public static void UseKafkaSubscribers(this IServiceProvider provider)
|
||||
public static void UseKafkaSubscribe(this IServiceProvider provider)
|
||||
{
|
||||
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
|
||||
|
||||
@ -69,7 +70,7 @@ namespace JiShe.CollectBus.Kafka
|
||||
{
|
||||
if (subscribe!=null)
|
||||
{
|
||||
Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger, kafkaOptions.Value);
|
||||
Tuple<int, int> tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
|
||||
threadCount += tuple.Item1;
|
||||
topicCount += tuple.Item2;
|
||||
}
|
||||
@ -77,6 +78,7 @@ namespace JiShe.CollectBus.Kafka
|
||||
}
|
||||
}
|
||||
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
@ -113,7 +115,7 @@ namespace JiShe.CollectBus.Kafka
|
||||
|
||||
if (subscribe != null)
|
||||
{
|
||||
Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger, kafkaOptions.Value);
|
||||
Tuple<int, int> tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
|
||||
threadCount += tuple.Item1;
|
||||
topicCount += tuple.Item2;
|
||||
}
|
||||
@ -128,7 +130,7 @@ namespace JiShe.CollectBus.Kafka
|
||||
/// </summary>
|
||||
/// <param name="subscribe"></param>
|
||||
/// <param name="provider"></param>
|
||||
private static Tuple<int,int> BuildKafkaSubscriber(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig)
|
||||
private static Tuple<int,int> BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig)
|
||||
{
|
||||
var subscribedMethods = subscribe.GetType().GetMethods()
|
||||
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
|
||||
@ -166,7 +168,7 @@ namespace JiShe.CollectBus.Kafka
|
||||
|
||||
if (attr.EnableBatch)
|
||||
{
|
||||
await consumerService.SubscribeBatchAsync<dynamic>(attr.Topic, async (message) =>
|
||||
await consumerService.SubscribeBatchAsync<object>(attr.Topic, async (message) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
@ -183,7 +185,7 @@ namespace JiShe.CollectBus.Kafka
|
||||
}
|
||||
else
|
||||
{
|
||||
await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) =>
|
||||
await consumerService.SubscribeAsync<object>(attr.Topic, async (message) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
@ -209,7 +211,7 @@ namespace JiShe.CollectBus.Kafka
|
||||
/// <param name="method"></param>
|
||||
/// <param name="subscribe"></param>
|
||||
/// <returns></returns>
|
||||
private static async Task<bool> ProcessMessageAsync(dynamic message, MethodInfo method, object subscribe)
|
||||
private static async Task<bool> ProcessMessageAsync(object message, MethodInfo method, object subscribe)
|
||||
{
|
||||
var parameters = method.GetParameters();
|
||||
bool isGenericTask = method.ReturnType.IsGenericType
|
||||
@ -219,26 +221,66 @@ namespace JiShe.CollectBus.Kafka
|
||||
//if (existParameters)
|
||||
//{
|
||||
// var paramType = parameters[0].ParameterType;
|
||||
// messageObj = paramType == typeof(string) ? message : message.Deserialize(paramType);
|
||||
// if (paramType.IsInstanceOfType(message))
|
||||
// return message; // 类型兼容则直接使用
|
||||
|
||||
// var json = message.ToString();
|
||||
// messageObj= message.de JsonConvert.DeserializeObject(json, targetType);
|
||||
//}
|
||||
if (isGenericTask)
|
||||
|
||||
object[] args = null;
|
||||
if (existParameters)
|
||||
{
|
||||
object? result = await (Task<ISubscribeAck>)method.Invoke(subscribe, existParameters? new[] { message } :null)!;
|
||||
if (result is ISubscribeAck ackResult)
|
||||
var paramType = parameters[0].ParameterType;
|
||||
// 类型转换逻辑
|
||||
object convertedMessage = paramType.IsInstanceOfType(message)
|
||||
? message
|
||||
: ConvertMessage(message, paramType);
|
||||
|
||||
args = new object[] { convertedMessage };
|
||||
}
|
||||
var result = method.Invoke(subscribe, args);
|
||||
if (result is Task<ISubscribeAck> genericTask)
|
||||
{
|
||||
await genericTask.ConfigureAwait(false);
|
||||
return genericTask.Result.Ack;
|
||||
}
|
||||
else if (result is Task nonGenericTask)
|
||||
{
|
||||
await nonGenericTask.ConfigureAwait(false);
|
||||
return true;
|
||||
}
|
||||
else if (result is ISubscribeAck ackResult)
|
||||
{
|
||||
return ackResult.Ack;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
object? result = method.Invoke(subscribe, existParameters ? new[] { message } : null);
|
||||
if (result is ISubscribeAck ackResult)
|
||||
{
|
||||
return ackResult.Ack;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
//if (isGenericTask)
|
||||
//{
|
||||
// object? result = await (Task<ISubscribeAck>)method.Invoke(subscribe, existParameters? new[] { message } :null)!;
|
||||
// if (result is ISubscribeAck ackResult)
|
||||
// {
|
||||
// return ackResult.Ack;
|
||||
// }
|
||||
//}
|
||||
//else
|
||||
//{
|
||||
// object? result = method.Invoke(subscribe, existParameters ? new[] { message } : null);
|
||||
// if (result is ISubscribeAck ackResult)
|
||||
// {
|
||||
// return ackResult.Ack;
|
||||
// }
|
||||
//}
|
||||
return false;
|
||||
}
|
||||
// 可扩展的类型转换器
|
||||
private static object ConvertMessage(object rawMessage, Type targetType)
|
||||
{
|
||||
if (targetType.IsInstanceOfType(rawMessage))
|
||||
return rawMessage; // 类型兼容则直接使用
|
||||
|
||||
var json = rawMessage.ToString();
|
||||
return JsonConvert.DeserializeObject(json, targetType);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -129,6 +129,7 @@ namespace JiShe.CollectBus.Kafka.Producer
|
||||
var producer = GetProducer<string, TValue>(typeKey);
|
||||
var message = new Message<string, TValue>
|
||||
{
|
||||
Key= _kafkaOptionConfig.ServerTagName,
|
||||
Value = value,
|
||||
Headers = new Headers{
|
||||
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
|
||||
@ -187,6 +188,7 @@ namespace JiShe.CollectBus.Kafka.Producer
|
||||
{
|
||||
var message = new Message<string, TValue>
|
||||
{
|
||||
Key = _kafkaOptionConfig.ServerTagName,
|
||||
Value = value,
|
||||
Headers = new Headers{
|
||||
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user