Compare commits

...

2 Commits

Author SHA1 Message Date
0328aa08d9 提交代码 2025-05-07 17:10:02 +08:00
cdcf078e5a 优化消费流程 2025-05-07 15:17:54 +08:00
8 changed files with 70 additions and 63 deletions

View File

@ -1,7 +1,9 @@
using Confluent.Kafka; using Confluent.Kafka;
using Confluent.Kafka.Admin; using Confluent.Kafka.Admin;
using JiShe.CollectBus.Kafka.Internal;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.Kafka.AdminClient; namespace JiShe.CollectBus.Kafka.AdminClient;
@ -9,16 +11,17 @@ namespace JiShe.CollectBus.Kafka.AdminClient;
public class AdminClientService : IAdminClientService, IDisposable, ISingletonDependency public class AdminClientService : IAdminClientService, IDisposable, ISingletonDependency
{ {
private readonly ILogger<AdminClientService> _logger; private readonly ILogger<AdminClientService> _logger;
private readonly KafkaOptionConfig _kafkaOptionConfig;
/// <summary> /// <summary>
/// Initializes a new instance of the <see cref="AdminClientService" /> class. /// Initializes a new instance of the <see cref="AdminClientService" /> class.
/// </summary> /// </summary>
/// <param name="configuration"></param> /// <param name="configuration"></param>
/// <param name="logger"></param> /// <param name="logger"></param>
public AdminClientService(IConfiguration configuration, ILogger<AdminClientService> logger) public AdminClientService(IConfiguration configuration, ILogger<AdminClientService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig)
{ {
_logger = logger; _logger = logger;
Instance = GetInstance(configuration); _kafkaOptionConfig = kafkaOptionConfig.Value;
Instance = GetInstance();
} }
/// <summary> /// <summary>
@ -142,22 +145,19 @@ public class AdminClientService : IAdminClientService, IDisposable, ISingletonDe
/// <summary> /// <summary>
/// Gets the instance. /// Gets the instance.
/// </summary> /// </summary>
/// <param name="configuration">The configuration.</param>
/// <returns></returns> /// <returns></returns>
public IAdminClient GetInstance(IConfiguration configuration) public IAdminClient GetInstance()
{ {
ArgumentException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]);
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!);
var adminClientConfig = new AdminClientConfig var adminClientConfig = new AdminClientConfig
{ {
BootstrapServers = configuration["Kafka:BootstrapServers"] BootstrapServers = _kafkaOptionConfig.BootstrapServers
}; };
if (enableAuthorization) if (_kafkaOptionConfig.EnableAuthorization)
{ {
adminClientConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext; adminClientConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
adminClientConfig.SaslMechanism = SaslMechanism.Plain; adminClientConfig.SaslMechanism = SaslMechanism.Plain;
adminClientConfig.SaslUsername = configuration["Kafka:SaslUserName"]; adminClientConfig.SaslUsername = _kafkaOptionConfig.SaslUserName;
adminClientConfig.SaslPassword = configuration["Kafka:SaslPassword"]; adminClientConfig.SaslPassword = _kafkaOptionConfig.SaslUserName;
} }
return new AdminClientBuilder(adminClientConfig).Build(); return new AdminClientBuilder(adminClientConfig).Build();
} }

View File

@ -34,7 +34,8 @@ namespace JiShe.CollectBus.Kafka.Internal
ErrorCode.NotCoordinatorForGroup, ErrorCode.NotCoordinatorForGroup,
ErrorCode.NetworkException, ErrorCode.NetworkException,
ErrorCode.GroupCoordinatorNotAvailable, ErrorCode.GroupCoordinatorNotAvailable,
ErrorCode.InvalidGroupId ErrorCode.InvalidGroupId,
ErrorCode.IllegalGeneration
}; };
return ex switch return ex switch
{ {

View File

@ -95,7 +95,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
DeviceId = $"{data.DeviceId}", DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType.ToString()}.{IOTDBDataType.Data}", DeviceType = $"{data.DeviceType.ToString()}.{IOTDBDataType.Data}",
ProjectId = $"{data.ProjectId}", ProjectId = $"{data.ProjectId}",
Timestamps = data.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeMilliseconds(), Timestamps = data.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(),
SingleMeasuring = new Tuple<string, T>(data.FiledName ?? string.Empty, data.DataValue ?? default) SingleMeasuring = new Tuple<string, T>(data.FiledName ?? string.Empty, data.DataValue ?? default)
}; };
_runtimeContext.UseTableSessionPool = true; // 使用表模型池 _runtimeContext.UseTableSessionPool = true; // 使用表模型池
@ -198,7 +198,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
DeviceId = $"{item.DeviceId}", DeviceId = $"{item.DeviceId}",
DeviceType = $"{item.DeviceType}.{IOTDBDataType.Data}", DeviceType = $"{item.DeviceType}.{IOTDBDataType.Data}",
ProjectId = $"{item.ProjectId}", ProjectId = $"{item.ProjectId}",
Timestamps = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeMilliseconds(), // TODO:这里暂时格式化15分钟数据需要进行调整 Timestamps = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(), // TODO:这里暂时格式化15分钟数据需要进行调整
SingleMeasuring = new Tuple<string, T>(item.FiledName ?? string.Empty, item.DataValue ?? default) SingleMeasuring = new Tuple<string, T>(item.FiledName ?? string.Empty, item.DataValue ?? default)
}; };
_runtimeContext.UseTableSessionPool = true; // 使用表模型池 _runtimeContext.UseTableSessionPool = true; // 使用表模型池
@ -275,7 +275,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
if (!data.TimeSpan.HasValue) if (!data.TimeSpan.HasValue)
data.TimeSpan = analysisBaseDto.ReceivedTime; data.TimeSpan = analysisBaseDto.ReceivedTime;
// 类型(心跳,登录,上电,掉电) // 类型(心跳,登录,上电,掉电)
long timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeMilliseconds(); long timestamps = DateTime.Now.GetDateTimeOffset().ToUnixTimeNanoseconds();
var treeData = new TreeModelSingleMeasuringEntity<T>() var treeData = new TreeModelSingleMeasuringEntity<T>()
{ {
SystemName = _applicationOptions.SystemType, SystemName = _applicationOptions.SystemType,
@ -287,6 +287,20 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
}; };
_runtimeContext.UseTableSessionPool = false; // 使树模型池 _runtimeContext.UseTableSessionPool = false; // 使树模型池
await _dbProvider.InsertAsync(treeData); await _dbProvider.InsertAsync(treeData);
// 数据帧
var treeFrameData = new TreeModelSingleMeasuringEntity<string>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}.{IOTDBDataType.Status}",
ProjectId = $"{data.ProjectId}",
Timestamps = timestamps,
SingleMeasuring = new Tuple<string, string>(ConcentratorStatusFieldConst.FrameData, analysisBaseDto.HexMessage ?? string.Empty)
};
_runtimeContext.UseTableSessionPool = false; // 使树模型池
await _dbProvider.InsertAsync(treeFrameData);
// 时间 // 时间
var treeRecordingTimeData = new TreeModelSingleMeasuringEntity<long>() var treeRecordingTimeData = new TreeModelSingleMeasuringEntity<long>()
{ {
@ -295,7 +309,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
DeviceType = $"{data.DeviceType}.{IOTDBDataType.Status}", DeviceType = $"{data.DeviceType}.{IOTDBDataType.Status}",
ProjectId = $"{data.ProjectId}", ProjectId = $"{data.ProjectId}",
Timestamps = timestamps, Timestamps = timestamps,
SingleMeasuring = new Tuple<string, long>(ConcentratorStatusFieldConst.RecordingTime, data.TimeSpan!.Value.GetDateTimeOffset().ToUnixTimeNanoseconds()) SingleMeasuring = new Tuple<string, long>(ConcentratorStatusFieldConst.RecordingTime, (data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now).GetDateTimeOffset().ToUnixTimeNanoseconds())
}; };
_runtimeContext.UseTableSessionPool = false; // 使树模型池 _runtimeContext.UseTableSessionPool = false; // 使树模型池
await _dbProvider.InsertAsync(treeRecordingTimeData); await _dbProvider.InsertAsync(treeRecordingTimeData);
@ -311,19 +325,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
}; };
_runtimeContext.UseTableSessionPool = false; // 使树模型池 _runtimeContext.UseTableSessionPool = false; // 使树模型池
await _dbProvider.InsertAsync(treeRemarkData); await _dbProvider.InsertAsync(treeRemarkData);
// 数据帧
var treeFrameData = new TreeModelSingleMeasuringEntity<string>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}.{IOTDBDataType.Status}",
ProjectId = $"{data.ProjectId}",
Timestamps = timestamps,
SingleMeasuring = new Tuple<string, string>(ConcentratorStatusFieldConst.FrameData, analysisBaseDto.HexMessage ?? string.Empty )
};
_runtimeContext.UseTableSessionPool = false; // 使树模型池
await _dbProvider.InsertAsync(treeFrameData);
return await Task.FromResult(true); return await Task.FromResult(true);
} }

View File

@ -170,6 +170,7 @@ namespace JiShe.CollectBus.Protocol.T37612012
// await _deviceRepository.UpdateAsync(entity); // await _deviceRepository.UpdateAsync(entity);
//} //}
var messageReceivedLoginEvent = new MessageReceivedLogin var messageReceivedLoginEvent = new MessageReceivedLogin
{ {
ClientId = code, ClientId = code,
@ -180,7 +181,8 @@ namespace JiShe.CollectBus.Protocol.T37612012
MessageId = Guid.NewGuid().ToString(), MessageId = Guid.NewGuid().ToString(),
ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
}; };
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent); //await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
var reqParam = new ReqParameter2 var reqParam = new ReqParameter2
{ {
AFN = AFN., AFN = AFN.,
@ -211,7 +213,7 @@ namespace JiShe.CollectBus.Protocol.T37612012
{ {
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
_logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{Convert.ToHexString(bytes)}"); _logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{Convert.ToHexString(bytes)}");
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, issuedEventMessage); // await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, issuedEventMessage);
} }
} }
@ -266,7 +268,7 @@ namespace JiShe.CollectBus.Protocol.T37612012
MessageId = Guid.NewGuid().ToString(), MessageId = Guid.NewGuid().ToString(),
ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
}; };
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent); // await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
var reqParam = new ReqParameter2() var reqParam = new ReqParameter2()
{ {
@ -299,7 +301,7 @@ namespace JiShe.CollectBus.Protocol.T37612012
{ {
await _tcpService.SendAsync(issuedEventMessage.ClientId, bytes); await _tcpService.SendAsync(issuedEventMessage.ClientId, bytes);
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{Convert.ToHexString(bytes)}"); _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{Convert.ToHexString(bytes)}");
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, issuedEventMessage); // await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, issuedEventMessage);
} }

View File

@ -38,7 +38,7 @@ namespace JiShe.CollectBus.Subscribers
/// 电表自动阀控下行消息消费订阅 /// 电表自动阀控下行消息消费订阅
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
Task<ISubscribeAck> AmmeterScheduledAutoValveControl(MeterReadingTelemetryPacketInfo issuedEventMessage); Task<ISubscribeAck> AmmeterScheduledAutoValveControl(List<MeterReadingTelemetryPacketInfo> issuedEventMessage);
#endregion #endregion
#region #region

View File

@ -62,21 +62,19 @@ namespace JiShe.CollectBus.Subscribers
[KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName, EnableBatch = true)] [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName, EnableBatch = true)]
public async Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessages) public async Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
{ {
bool isAck = false; bool isAck = true;
foreach (var issuedEventMessage in issuedEventMessages) foreach (var issuedEventMessage in issuedEventMessages)
{ {
var loginEntity = await _messageReceivedLoginEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId); //var loginEntity = await _messageReceivedLoginEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId);
if (loginEntity == null) //if (loginEntity == null)
{ //{
isAck=false; // isAck=false;
break; // break;
} //}
loginEntity.AckTime = Clock.Now; //loginEntity.AckTime = Clock.Now;
loginEntity.IsAck = true; //loginEntity.IsAck = true;
await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity); //await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
isAck = true;
} }
// TODO:暂时ACK等后续处理是否放到私信队列中 // TODO:暂时ACK等后续处理是否放到私信队列中
return isAck? SubscribeAck.Success(): SubscribeAck.Fail(); return isAck? SubscribeAck.Success(): SubscribeAck.Fail();
@ -85,19 +83,19 @@ namespace JiShe.CollectBus.Subscribers
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)] [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)]
public async Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessages) public async Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
{ {
bool isAck = false; bool isAck = true;
foreach (var issuedEventMessage in issuedEventMessages) //foreach (var issuedEventMessage in issuedEventMessages)
{ //{
var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId); // var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId);
if (heartbeatEntity == null) // if (heartbeatEntity == null)
{ // {
isAck = false; // isAck = false;
break; // break;
} // }
heartbeatEntity.AckTime = Clock.Now; // heartbeatEntity.AckTime = Clock.Now;
heartbeatEntity.IsAck = true; // heartbeatEntity.IsAck = true;
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity); // await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
} // }
// TODO:暂时ACK等后续处理是否放到私信队列中 // TODO:暂时ACK等后续处理是否放到私信队列中
return isAck ? SubscribeAck.Success() : SubscribeAck.Fail(); return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
} }

View File

@ -5,6 +5,7 @@ using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Internal;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System; using System;
using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using TouchSocket.Sockets; using TouchSocket.Sockets;
@ -79,12 +80,14 @@ namespace JiShe.CollectBus.Subscribers
/// </summary> /// </summary>
/// <param name="receivedMessage"></param> /// <param name="receivedMessage"></param>
/// <returns></returns> /// <returns></returns>
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName)] [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName,EnableBatch =true,TaskCount=30,BatchSize =500)]
public async Task<ISubscribeAck> AmmeterScheduledAutoValveControl(MeterReadingTelemetryPacketInfo receivedMessage) public async Task<ISubscribeAck> AmmeterScheduledAutoValveControl(List<MeterReadingTelemetryPacketInfo> receivedMessage)
{ {
//todo 如果是时段自动阀控,需要检查当前的时间,如果时间在自动阀控时间段内,则发送自动阀控报文,否则不发送,尤其是消息队列阻塞或者延时过长的时候。以免造成生产事故。 //todo 如果是时段自动阀控,需要检查当前的时间,如果时间在自动阀控时间段内,则发送自动阀控报文,否则不发送,尤其是消息队列阻塞或者延时过长的时候。以免造成生产事故。
_logger.LogInformation("电表自动阀控下行消息消费队列开始处理"); //_logger.LogInformation("电表自动阀控下行消息消费队列开始处理");
return await SendMessagesAsync(receivedMessage); _logger.LogWarning($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
return SubscribeAck.Success();
//return await SendMessagesAsync(receivedMessage);
} }
/// <summary> /// <summary>

View File

@ -16,6 +16,7 @@
<meta name="viewport" content="width=device-width, initial-scale=1.0" /> <meta name="viewport" content="width=device-width, initial-scale=1.0" />
<link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet" /> <link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet" />
<title>后端服务</title> <title>后端服务</title>
</head> </head>
<body> <body>