Compare commits

...

9 Commits

Author SHA1 Message Date
3790f3918e Add header-based message filter switch 2025-04-16 20:41:52 +08:00
78f9ef349a Improve the Kafka integration
Add default serialization and deserialization
Add batch consumption
Add header-based consumption filtering
2025-04-16 18:46:51 +08:00
eed68d0fe0 Add batch consumption to Kafka 2025-04-16 18:26:25 +08:00
0d4c780727 Keep the CAP component for now; decide later whether to remove it 2025-04-16 09:54:21 +08:00
e1106b3241 Remove the CAP component 2025-04-15 19:09:16 +08:00
11d3fcf162 Remove the CAP component 2025-04-15 18:58:38 +08:00
a8f79c56f4 Merge branch 'dev' into zhy_feat_dev_v1 2025-04-15 18:06:32 +08:00
72d1b9f623 Optimize subscription callbacks 2025-04-15 18:03:51 +08:00
7b1f04a452 Work-in-progress checkpoint 2025-04-15 16:45:10 +08:00
28 changed files with 999 additions and 136 deletions

View File

@ -39,6 +39,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.T
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "src\JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{FA762E8F-659A-DECF-83D6-5F364144450E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -113,6 +115,10 @@ Global
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU
{FA762E8F-659A-DECF-83D6-5F364144450E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FA762E8F-659A-DECF-83D6-5F364144450E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FA762E8F-659A-DECF-83D6-5F364144450E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FA762E8F-659A-DECF-83D6-5F364144450E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -135,6 +141,7 @@ Global
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{443B4549-0AC0-4493-8F3E-49C83225DD76} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{FA762E8F-659A-DECF-83D6-5F364144450E} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}

View File

@ -26,6 +26,7 @@
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
</ItemGroup>
</Project>

View File

@ -1,16 +1,17 @@
using System.Threading.Tasks;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Kafka;
using Volo.Abp.Application.Services;
namespace JiShe.CollectBus.Subscribers
{
public interface ISubscriberAppService : IApplicationService
{
Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage);
Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage);
Task ReceivedEvent(MessageReceived receivedMessage);
Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage);
Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage);
Task<ISubscribeAck> LoginIssuedEvent(IssuedEventMessage issuedEventMessage);
Task<ISubscribeAck> HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage);
Task<ISubscribeAck> ReceivedEvent(MessageReceived receivedMessage);
Task<ISubscribeAck> ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage);
Task<ISubscribeAck> ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage);
}
}
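Every subscriber method now returns an acknowledgement instead of a bare Task, so the Kafka consumer can decide whether to commit the offset. A minimal sketch of how an implementation adapts, assuming the SubscribeAck.Success()/SubscribeAck.Fail() factories used later in this change set:

// Sketch only: the ack presumably maps to the bool the consumer uses to commit offsets (see ConsumerService further down).
public async Task<ISubscribeAck> ReceivedEvent(MessageReceived receivedMessage)
{
    try
    {
        // ... handle the message exactly as before ...
        await Task.CompletedTask;
        return SubscribeAck.Success();   // acknowledged: the offset can be committed
    }
    catch (Exception)
    {
        return SubscribeAck.Fail();      // not acknowledged: the message stays uncommitted for redelivery
    }
}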

View File

@ -1,6 +1,7 @@
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka;
using System.Collections.Generic;
using System.Threading.Tasks;
using Volo.Abp.Application.Services;
@ -19,19 +20,19 @@ namespace JiShe.CollectBus.Subscribers
/// Consumer subscription for 1-minute electricity-meter reading downlink messages
/// </summary>
/// <returns></returns>
Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
/// <summary>
/// Consumer subscription for 5-minute electricity-meter reading downlink messages
/// </summary>
/// <returns></returns>
Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
/// <summary>
/// Consumer subscription for 15-minute electricity-meter reading downlink messages
/// </summary>
/// <returns></returns>
Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
#endregion
#region
@ -39,7 +40,7 @@ namespace JiShe.CollectBus.Subscribers
/// Consumer subscription for 1-minute water-meter reading downlink messages
/// </summary>
/// <returns></returns>
Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
#endregion
}

View File

@ -9,11 +9,13 @@ using DotNetCore.CAP;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.EnergySystem.Dto;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.IotSystems.PrepayModel;
using JiShe.CollectBus.IotSystems.Records;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts;
using MassTransit;
using Microsoft.AspNetCore.Mvc;
@ -28,14 +30,16 @@ namespace JiShe.CollectBus.EnergySystem
private readonly IRepository<FocusRecord, Guid> _focusRecordRepository;
private readonly IRepository<CsqRecord,Guid> _csqRecordRepository;
private readonly IRepository<ConrOnlineRecord,Guid> _conrOnlineRecordRepository;
private readonly IProducerService _producerService;
private readonly ICapPublisher _capBus;
public EnergySystemAppService(IRepository<FocusRecord, Guid> focusRecordRepository, IRepository<CsqRecord, Guid> csqRecordRepository,
IRepository<ConrOnlineRecord, Guid> conrOnlineRecordRepository, ICapPublisher capBus)
IRepository<ConrOnlineRecord, Guid> conrOnlineRecordRepository, IProducerService producerService, ICapPublisher capBus)
{
_focusRecordRepository = focusRecordRepository;
_csqRecordRepository = csqRecordRepository;
_conrOnlineRecordRepository = conrOnlineRecordRepository;
_producerService = producerService;
_capBus = capBus;
}
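The remainder of this service repeats a single change: each _capBus.PublishAsync call is kept as a comment and replaced with the injected Kafka producer. Condensed, the pattern (using the same topic constant and message type as the code below) is:

// Before (CAP, retained above each call as a comment):
// await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, issuedEventMessage);

// After (Kafka producer introduced by this change):
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
{
    DeviceNo = address,
    Message = bytes,
    Type = IssuedEventType.Data,
    MessageId = NewId.NextGuid().ToString()
});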
@ -70,8 +74,16 @@ namespace JiShe.CollectBus.EnergySystem
if (bytes == null)
return result;
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//{
// //ClientId = messageReceived.ClientId,
// DeviceNo = address,
// Message = bytes,
// Type = IssuedEventType.Data,
// MessageId = NewId.NextGuid().ToString()
//});
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
{
//ClientId = messageReceived.ClientId,
DeviceNo = address,
@ -108,7 +120,16 @@ namespace JiShe.CollectBus.EnergySystem
foreach (var bytes in bytesList)
{
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//{
// //ClientId = messageReceived.ClientId,
// DeviceNo = address,
// Message = bytes,
// Type = IssuedEventType.Data,
// MessageId = NewId.NextGuid().ToString()
//});
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
{
//ClientId = messageReceived.ClientId,
DeviceNo = address,
@ -116,6 +137,7 @@ namespace JiShe.CollectBus.EnergySystem
Type = IssuedEventType.Data,
MessageId = NewId.NextGuid().ToString()
});
}
return result;
@ -149,7 +171,15 @@ namespace JiShe.CollectBus.EnergySystem
}).ToList();
var bytes = Build3761SendData.BuildAmmeterParameterSetSendCmd(address, meterParameters);
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//{
// //ClientId = messageReceived.ClientId,
// DeviceNo = address,
// Message = bytes,
// Type = IssuedEventType.Data,
// MessageId = NewId.NextGuid().ToString()
//});
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
{
//ClientId = messageReceived.ClientId,
DeviceNo = address,
@ -178,7 +208,16 @@ namespace JiShe.CollectBus.EnergySystem
{
var dataUnit = Build645SendData.BuildReadMeterAddressSendDataUnit(detail.MeterAddress);
var bytes =Build3761SendData.BuildTransparentForwardingSendCmd(address, detail.Port, detail.BaudRate.ToString(), dataUnit, StopBit.Stop1, Parity.None);
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//{
// //ClientId = messageReceived.ClientId,
// DeviceNo = address,
// Message = bytes,
// Type = IssuedEventType.Data,
// MessageId = NewId.NextGuid().ToString()
//});
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
{
//ClientId = messageReceived.ClientId,
DeviceNo = address,
@ -261,7 +300,16 @@ namespace JiShe.CollectBus.EnergySystem
if (bytes != null)
{
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//{
// //ClientId = messageReceived.ClientId,
// DeviceNo = address,
// Message = bytes,
// Type = IssuedEventType.Data,
// MessageId = NewId.NextGuid().ToString()
//});
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
{
//ClientId = messageReceived.ClientId,
DeviceNo = address,
@ -320,7 +368,16 @@ namespace JiShe.CollectBus.EnergySystem
var bytes = Build3761SendData.BuildCommunicationParametersSetSendCmd(address, masterIP, materPort,
backupIP, backupPort, input.Data.APN);
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//{
// //ClientId = messageReceived.ClientId,
// DeviceNo = address,
// Message = bytes,
// Type = IssuedEventType.Data,
// MessageId = NewId.NextGuid().ToString()
//});
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
{
//ClientId = messageReceived.ClientId,
DeviceNo = address,
@ -347,7 +404,16 @@ namespace JiShe.CollectBus.EnergySystem
var address = $"{input.AreaCode}{input.Address}";
var bytes = Build3761SendData.BuildTerminalCalendarClockSendCmd(address);
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//{
// //ClientId = messageReceived.ClientId,
// DeviceNo = address,
// Message = bytes,
// Type = IssuedEventType.Data,
// MessageId = NewId.NextGuid().ToString()
//});
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
{
//ClientId = messageReceived.ClientId,
DeviceNo = address,
@ -375,7 +441,15 @@ namespace JiShe.CollectBus.EnergySystem
bool isManual = !input.AreaCode.Equals("5110"); // Low-power concentrators are not long-lived connections; send at the moment they connect
var bytes = Build3761SendData.BuildConrCheckTimeSendCmd(address,DateTime.Now, isManual);
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//{
// //ClientId = messageReceived.ClientId,
// DeviceNo = address,
// Message = bytes,
// Type = IssuedEventType.Data,
// MessageId = NewId.NextGuid().ToString()
//});
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
{
//ClientId = messageReceived.ClientId,
DeviceNo = address,
@ -402,7 +476,15 @@ namespace JiShe.CollectBus.EnergySystem
var address = $"{input.AreaCode}{input.Address}";
var bytes = Build3761SendData.BuildConrRebootSendCmd(address);
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//{
// //ClientId = messageReceived.ClientId,
// DeviceNo = address,
// Message = bytes,
// Type = IssuedEventType.Data,
// MessageId = NewId.NextGuid().ToString()
//});
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
{
//ClientId = messageReceived.ClientId,
DeviceNo = address,
@ -430,7 +512,15 @@ namespace JiShe.CollectBus.EnergySystem
var address = $"{input.AreaCode}{input.Address}";
var pnList = input.Data.Split(',').Select(it => int.Parse(it)).ToList();
var bytes = Build3761SendData.BuildAmmeterParameterReadingSendCmd(address, pnList);
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//{
// //ClientId = messageReceived.ClientId,
// DeviceNo = address,
// Message = bytes,
// Type = IssuedEventType.Data,
// MessageId = NewId.NextGuid().ToString()
//});
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
{
//ClientId = messageReceived.ClientId,
DeviceNo = address,
@ -479,7 +569,16 @@ namespace JiShe.CollectBus.EnergySystem
foreach (var bytes in bytesList)
{
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//{
// //ClientId = messageReceived.ClientId,
// DeviceNo = address,
// Message = bytes,
// Type = IssuedEventType.Data,
// MessageId = NewId.NextGuid().ToString()
//});
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
{
//ClientId = messageReceived.ClientId,
DeviceNo = address,
@ -548,7 +647,15 @@ namespace JiShe.CollectBus.EnergySystem
foreach (var bytes in bytesList)
{
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//{
// //ClientId = messageReceived.ClientId,
// DeviceNo = address,
// Message = bytes,
// Type = IssuedEventType.Data,
// MessageId = NewId.NextGuid().ToString()
//});
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
{
//ClientId = messageReceived.ClientId,
DeviceNo = address,
@ -577,7 +684,15 @@ namespace JiShe.CollectBus.EnergySystem
var address = $"{code.AreaCode}{code.Address}";
var bytes = Build3761SendData.BuildAmmeterReportCollectionItemsSetSendCmd(address,input.Detail.Pn, input.Detail.Unit,input.Detail.Cycle,input.Detail.BaseTime,
input.Detail.CurveRatio,input.Detail.Details.Select(it => new PnFn(it.Pn,it.Fn)).ToList());
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//{
// //ClientId = messageReceived.ClientId,
// DeviceNo = address,
// Message = bytes,
// Type = IssuedEventType.Data,
// MessageId = NewId.NextGuid().ToString()
//});
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
{
//ClientId = messageReceived.ClientId,
DeviceNo = address,
@ -605,7 +720,15 @@ namespace JiShe.CollectBus.EnergySystem
{
var address = $"{code.AreaCode}{code.Address}";
var bytes = Build3761SendData.BuildAmmeterAutoUpSwitchSetSendCmd(address, input.Detail.Pn,input.Detail.IsOpen);
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//{
// //ClientId = messageReceived.ClientId,
// DeviceNo = address,
// Message = bytes,
// Type = IssuedEventType.Data,
// MessageId = NewId.NextGuid().ToString()
//});
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
{
//ClientId = messageReceived.ClientId,
DeviceNo = address,
@ -631,7 +754,15 @@ namespace JiShe.CollectBus.EnergySystem
var result = new BaseResultDto();
var address = $"{input.AreaCode}{input.Address}";
var bytes = Build3761SendData.BuildAmmeterReadAutoUpSwitchSendCmd(address, input.Detail.Pn);
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//{
// //ClientId = messageReceived.ClientId,
// DeviceNo = address,
// Message = bytes,
// Type = IssuedEventType.Data,
// MessageId = NewId.NextGuid().ToString()
//});
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
{
//ClientId = messageReceived.ClientId,
DeviceNo = address,
@ -658,7 +789,15 @@ namespace JiShe.CollectBus.EnergySystem
{
var address = $"{data.AreaCode}{data.Address}";
var bytes = Build3761SendData.BuildTerminalVersionInfoReadingSendCmd(address);
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//{
// //ClientId = messageReceived.ClientId,
// DeviceNo = address,
// Message = bytes,
// Type = IssuedEventType.Data,
// MessageId = NewId.NextGuid().ToString()
//});
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
{
//ClientId = messageReceived.ClientId,
DeviceNo = address,
@ -713,7 +852,15 @@ namespace JiShe.CollectBus.EnergySystem
foreach (var bytes in bytesList)
{
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
//{
// //ClientId = messageReceived.ClientId,
// DeviceNo = address,
// Message = bytes,
// Type = IssuedEventType.Data,
// MessageId = NewId.NextGuid().ToString()
//});
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
{
//ClientId = messageReceived.ClientId,
DeviceNo = address,

View File

@ -15,6 +15,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
<PackageReference Include="MassTransit.Kafka" Version="8.4.0" />
<PackageReference Include="Volo.Abp.AspNetCore.Mvc" Version="8.3.3" />
@ -23,7 +24,6 @@
<PackageReference Include="Volo.Abp.Ddd.Application" Version="8.3.3" />
<PackageReference Include="TouchSocket" Version="3.0.19" />
<PackageReference Include="TouchSocket.Hosting" Version="3.0.19" />
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
<PackageReference Include="Volo.Abp.EventBus.Kafka" Version="8.3.3" />
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />

View File

@ -7,10 +7,12 @@ using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Enums;
using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts;
using MassTransit;
using Microsoft.Extensions.Logging;
@ -27,6 +29,7 @@ namespace JiShe.CollectBus.Plugins
public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin
{
private readonly ICapPublisher _producerBus;
private readonly IProducerService _producerService;
private readonly ILogger<TcpMonitor> _logger;
private readonly IRepository<Device, Guid> _deviceRepository;
private readonly IDistributedCache<AmmeterInfo> _ammeterInfoCache;
@ -34,16 +37,17 @@ namespace JiShe.CollectBus.Plugins
/// <summary>
///
/// </summary>
/// <param name="producerBus"></param>
/// <param name="producerService"></param>
/// <param name="logger"></param>
/// <param name="deviceRepository"></param>
/// <param name="ammeterInfoCache"></param>
public TcpMonitor(ICapPublisher producerBus,
public TcpMonitor(ICapPublisher producerBus, IProducerService producerService,
ILogger<TcpMonitor> logger,
IRepository<Device, Guid> deviceRepository,
IDistributedCache<AmmeterInfo> ammeterInfoCache)
{
_producerBus = producerBus;
_producerService = producerService;
_logger = logger;
_deviceRepository = deviceRepository;
_ammeterInfoCache = ammeterInfoCache;
@ -170,7 +174,11 @@ namespace JiShe.CollectBus.Plugins
DeviceNo = deviceNo,
MessageId = NewId.NextGuid().ToString()
};
await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
//await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
//await _producerBus.Publish( messageReceivedLoginEvent);
}
@ -217,7 +225,9 @@ namespace JiShe.CollectBus.Plugins
DeviceNo = deviceNo,
MessageId = NewId.NextGuid().ToString()
};
await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
//await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
//await _producerBus.Publish(messageReceivedHeartbeatEvent);
}
@ -244,8 +254,16 @@ namespace JiShe.CollectBus.Plugins
//string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn);
//todo How should the time tag be determined? The concentrators' collection frequencies are currently fixed, so when data is reported the expected collection time is inferred from the current time and used as the time tag. But how should it be calculated when data has not been reported for a long time because of network problems?
await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
//await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
//{
// ClientId = client.Id,
// ClientIp = client.IP,
// ClientPort = client.Port,
// MessageHexString = messageHexString,
// DeviceNo = deviceNo,
// MessageId = NewId.NextGuid().ToString()
//});
await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
{
ClientId = client.Id,
ClientIp = client.IP,

View File

@ -215,11 +215,12 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
return aa == null;
}
[KafkaSubscribe(["test-topic"])]
[KafkaSubscribe("test-topic1")]
public async Task<bool> KafkaSubscribeAsync(string obj)
public async Task<ISubscribeAck> KafkaSubscribeAsync(object obj)
{
_logger.LogWarning($"Subscription message received: {obj}");
return await Task.FromResult(true);
return SubscribeAck.Success();
}
}

View File

@ -29,11 +29,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
{
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
private readonly ICapPublisher _producerBus;
private readonly IIoTDBProvider _dbProvider;
private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
private readonly IProducerService _producerService;
private readonly ICapPublisher _producerBus;
public BasicScheduledMeterReadingService(
ILogger<BasicScheduledMeterReadingService> logger,
@ -381,7 +380,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
//_= _producerBus.Publish(tempMsg);
@ -445,7 +446,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
//_ = _producerBus.Publish(tempMsg);
@ -509,8 +512,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
//_ = _producerBus.Publish(tempMsg);
@ -845,8 +849,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
//_ = _producerBus.Publish(tempMsg);
@ -1153,6 +1158,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
//await _producerBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempMsg);
//_ = _producerBus.Publish(tempMsg);

View File

@ -6,6 +6,8 @@ using JiShe.CollectBus.IoTDBProvider;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
@ -20,7 +22,7 @@ using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Subscribers
{
public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, ICapSubscribe
public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, ICapSubscribe, IKafkaSubscribe
{
private readonly ILogger<SubscriberAppService> _logger;
private readonly ITcpService _tcpService;
@ -63,9 +65,11 @@ namespace JiShe.CollectBus.Subscribers
_dbProvider = dbProvider;
}
[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
public async Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage)
[KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
//[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
public async Task<ISubscribeAck> LoginIssuedEvent(IssuedEventMessage issuedEventMessage)
{
bool isAck = false;
switch (issuedEventMessage.Type)
{
case IssuedEventType.Heartbeat:
@ -76,6 +80,7 @@ namespace JiShe.CollectBus.Subscribers
loginEntity.AckTime = Clock.Now;
loginEntity.IsAck = true;
await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
isAck = true;
break;
case IssuedEventType.Data:
break;
@ -90,11 +95,14 @@ namespace JiShe.CollectBus.Subscribers
//}
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
return isAck? SubscribeAck.Success(): SubscribeAck.Fail();
}
[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
public async Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage)
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
//[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
public async Task<ISubscribeAck> HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage)
{
bool isAck = false;
switch (issuedEventMessage.Type)
{
case IssuedEventType.Heartbeat:
@ -103,6 +111,7 @@ namespace JiShe.CollectBus.Subscribers
heartbeatEntity.AckTime = Clock.Now;
heartbeatEntity.IsAck = true;
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
isAck = true;
break;
case IssuedEventType.Data:
break;
@ -117,10 +126,12 @@ namespace JiShe.CollectBus.Subscribers
//}
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
}
[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)]
public async Task ReceivedEvent(MessageReceived receivedMessage)
[KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)]
//[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)]
public async Task<ISubscribeAck> ReceivedEvent(MessageReceived receivedMessage)
{
var currentTime = Clock.Now;
@ -137,13 +148,13 @@ namespace JiShe.CollectBus.Subscribers
if(fN == null)
{
Logger.LogError($"Failed to parse data: {receivedMessage.Serialize()}");
return;
return SubscribeAck.Success();
}
var tb3761FN = fN.FnList.FirstOrDefault();
if (tb3761FN == null)
{
Logger.LogError($"Failed to parse data: {receivedMessage.Serialize()}");
return;
return SubscribeAck.Success();
}
//Persist the raw message
@ -169,11 +180,15 @@ namespace JiShe.CollectBus.Subscribers
//todo check whether there is a pending downlink task
//await _messageReceivedEventRepository.InsertAsync(receivedMessage);
}
return SubscribeAck.Success();
}
[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
public async Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage)
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
//[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
public async Task<ISubscribeAck> ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage)
{
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null)
@ -185,10 +200,12 @@ namespace JiShe.CollectBus.Subscribers
await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage);
await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage);
}
return SubscribeAck.Success();
}
[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
public async Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage)
[KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
//[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
public async Task<ISubscribeAck> ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage)
{
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null)
@ -200,6 +217,7 @@ namespace JiShe.CollectBus.Subscribers
await protocolPlugin.LoginAsync(receivedLoginMessage);
await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage);
}
return SubscribeAck.Success();
}
}
}

View File

@ -8,6 +8,8 @@ using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Repository.MeterReadingRecord;
@ -24,7 +26,7 @@ namespace JiShe.CollectBus.Subscribers
/// Consumer subscriptions for scheduled meter-reading task messages
/// </summary>
[Route($"/worker/app/subscriber")]
public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService,ICapSubscribe
public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService, ICapSubscribe, IKafkaSubscribe
{
private readonly ILogger<WorkerSubscriberAppService> _logger;
private readonly ITcpService _tcpService;
@ -63,8 +65,9 @@ namespace JiShe.CollectBus.Subscribers
/// <returns></returns>
[HttpPost]
[Route("ammeter/oneminute/issued-event")]
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
//[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
_logger.LogInformation("Processing of the 1-minute electricity-meter reading downlink message queue started");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
@ -81,6 +84,7 @@ namespace JiShe.CollectBus.Subscribers
}
}
return SubscribeAck.Success();
}
/// <summary>
@ -90,8 +94,9 @@ namespace JiShe.CollectBus.Subscribers
/// <returns></returns>
[HttpPost]
[Route("ammeter/fiveminute/issued-event")]
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
public async Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
//[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
public async Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
_logger.LogInformation("Processing of the 5-minute electricity-meter reading downlink message queue started");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
@ -108,6 +113,7 @@ namespace JiShe.CollectBus.Subscribers
}
}
return SubscribeAck.Success();
}
/// <summary>
@ -117,8 +123,9 @@ namespace JiShe.CollectBus.Subscribers
/// <returns></returns>
[HttpPost]
[Route("ammeter/fifteenminute/issued-event")]
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
//[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
public async Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
_logger.LogInformation("Processing of the 15-minute electricity-meter reading downlink message queue started");
try
@ -137,6 +144,7 @@ namespace JiShe.CollectBus.Subscribers
}
}
return SubscribeAck.Success();
}
catch (Exception ex)
{
@ -155,8 +163,9 @@ namespace JiShe.CollectBus.Subscribers
/// <returns></returns>
[HttpPost]
[Route("watermeter/fifteenminute/issued-event")]
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
public async Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
[KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
//[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
public async Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
_logger.LogInformation("Processing of the 15-minute water-meter reading downlink message queue started");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
@ -172,6 +181,7 @@ namespace JiShe.CollectBus.Subscribers
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
}
}
return SubscribeAck.Success();
}
#endregion
}

View File

@ -83,6 +83,7 @@
},
"Kafka": {
"BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092",
"EnableFilter": true,
"EnableAuthorization": false,
"SecurityProtocol": "SASL_PLAINTEXT",
"SaslMechanism": "PLAIN",

View File

@ -183,6 +183,19 @@ namespace JiShe.CollectBus.Kafka.AdminClient
return partitions.Any(p => p.PartitionId == targetPartition);
}
/// <summary>
/// Gets the number of partitions of a topic
/// </summary>
/// <param name="topic"></param>
/// <returns></returns>
public int GetTopicPartitionsNum(string topic)
{
var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10));
if (metadata.Topics.Count == 0)
return 0;
return metadata.Topics[0].Partitions.Count;
}
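The module startup (shown further down) uses this to decide how many consumer tasks to start per subscription. A small usage sketch; the topic name is illustrative and adminClientService is the injected IAdminClientService:

// Sketch only: size the consumer tasks by partition count, falling back to 1 for missing topics.
int partitions = adminClientService.GetTopicPartitionsNum("test-topic1");   // 0 if the topic has no metadata yet
int taskCount = partitions <= 0 ? 1 : partitions;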
public void Dispose()
{
Instance?.Dispose();

View File

@ -52,5 +52,12 @@ namespace JiShe.CollectBus.Kafka.AdminClient
/// <param name="targetPartition"></param>
/// <returns></returns>
bool CheckPartitionsExist(string topic, int targetPartition);
/// <summary>
/// Gets the number of partitions of a topic
/// </summary>
/// <param name="topic"></param>
/// <returns></returns>
int GetTopicPartitionsNum(string topic);
}
}

View File

@ -12,7 +12,7 @@ namespace JiShe.CollectBus.Kafka.Attributes
/// <summary>
/// The topic to subscribe to
/// </summary>
public string[] Topics { get; set; }
public string Topic { get; set; }
/// <summary>
/// Partition
@ -24,30 +24,53 @@ namespace JiShe.CollectBus.Kafka.Attributes
/// </summary>
public string GroupId { get; set; }
public KafkaSubscribeAttribute(string[] topics, string groupId = "default")
/// <summary>
/// Number of consumer tasks (by default, one task per partition)
/// If a specific partition is configured, the task count is always 1
/// </summary>
public int TaskCount { get; set; } = -1;
/// <summary>
/// Number of messages to process per batch
/// </summary>
public int BatchSize { get; set; } = 100;
/// <summary>
/// Whether batch processing is enabled
/// </summary>
public bool EnableBatch { get; set; } = false;
/// <summary>
/// Batch timeout
/// </summary>
public TimeSpan? BatchTimeout { get; set; } = null;
/// <summary>
/// Subscribe to a topic
/// </summary>
/// <param name="batchTimeout">batchTimeout format: ("00:05:00")</param>
public KafkaSubscribeAttribute(string topic, string groupId = "default", bool enableBatch = false, int batchSize = 100, string? batchTimeout = null)
{
this.Topics = topics;
this.Topic = topic;
this.GroupId = groupId;
this.EnableBatch = enableBatch;
this.BatchSize = batchSize;
this.BatchTimeout = batchTimeout != null ? TimeSpan.Parse(batchTimeout) : null;
}
public KafkaSubscribeAttribute(string topic, string groupId = "default")
/// <summary>
/// Subscribe to a topic, pinned to a specific partition
/// </summary>
/// <param name="batchTimeout">batchTimeout format: ("00:05:00")</param>
public KafkaSubscribeAttribute(string topic, int partition, string groupId = "default", bool enableBatch = false, int batchSize = 100, string? batchTimeout = null)
{
this.Topics = new string[] { topic };
this.GroupId = groupId;
}
public KafkaSubscribeAttribute(string[] topics, int partition, string groupId = "default")
{
this.Topics = topics;
this.Partition = partition;
this.GroupId = groupId;
}
public KafkaSubscribeAttribute(string topic, int partition, string groupId = "default")
{
this.Topics = new string[] { topic };
this.Topic = topic;
this.Partition = partition;
this.GroupId = groupId;
this.TaskCount = 1;
this.EnableBatch = enableBatch;
this.BatchSize = batchSize;
this.BatchTimeout = batchTimeout != null ? TimeSpan.Parse(batchTimeout) : null;
}
}
}
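A usage sketch for the reworked attribute. Topic and group names are illustrative, and the handler shape follows SampleAppService above; the alternative attribute forms are shown as comments because the batch dispatch path is not fully visible in this diff:

public class DemoSubscriber : IKafkaSubscribe
{
    // Default: one consumer task per partition of the topic (TaskCount = -1).
    [KafkaSubscribe("test-topic1", groupId: "collectbus")]
    // [KafkaSubscribe("test-topic1", partition: 0, groupId: "collectbus")]   // pin to partition 0; TaskCount is forced to 1
    // [KafkaSubscribe("test-topic1", groupId: "collectbus", enableBatch: true, batchSize: 200, batchTimeout: "00:00:05")]   // batch mode
    public async Task<ISubscribeAck> HandleAsync(object message)
    {
        // ... process the message ...
        await Task.CompletedTask;
        return SubscribeAck.Success();
    }
}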

View File

@ -17,9 +17,9 @@ namespace JiShe.CollectBus.Kafka
public override void ConfigureServices(ServiceConfigurationContext context)
{
// Register the producer service
context.Services.AddTransient<IProducerService, ProducerService>();
context.Services.AddSingleton<IProducerService, ProducerService>();
// Register the consumer service
context.Services.AddTransient<IConsumerService, ConsumerService>();
context.Services.AddSingleton<IConsumerService, ConsumerService>();
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)

View File

@ -9,6 +9,9 @@ using static Confluent.Kafka.ConfigPropertyNames;
using System.Collections.Concurrent;
using System.Text.RegularExpressions;
using NUglify.Html;
using Serilog;
using System;
using System.Text;
namespace JiShe.CollectBus.Kafka.Consumer
{
@ -18,6 +21,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
private readonly IConfiguration _configuration;
private readonly ConcurrentDictionary<Type, (object Consumer, CancellationTokenSource CTS)>
_consumerStore = new();
private class KafkaConsumer<TKey, TValue> where TKey : notnull where TValue : class { }
public ConsumerService(IConfiguration configuration, ILogger<ConsumerService> logger)
{
@ -37,6 +41,8 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
var config = BuildConsumerConfig(groupId);
return new ConsumerBuilder<TKey, TValue>(config)
.SetValueDeserializer(new JsonSerializer<TValue>())
.SetLogHandler((_, log) => _logger.LogInformation($"Consumer log: {log.Message}"))
.SetErrorHandler((_, e) => _logger.LogError($"Consumer error: {e.Reason}"))
.Build();
}
@ -50,7 +56,10 @@ namespace JiShe.CollectBus.Kafka.Consumer
BootstrapServers = _configuration["Kafka:BootstrapServers"],
GroupId = groupId ?? "default",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false // Disable auto-commit
EnableAutoCommit = false, // Disable auto-commit
EnablePartitionEof = true, // Emit partition-EOF markers
AllowAutoCreateTopics = true, // Allow automatic topic creation
FetchMaxBytes = 1024 * 1024 * 50 // Raise the fetch size to 50 MB
};
if (enableAuth)
@ -101,7 +110,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <returns></returns>
public async Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
{
var consumerKey = typeof((TKey, TValue));
var consumerKey = typeof(KafkaConsumer<TKey, TValue>);
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
@ -119,6 +128,29 @@ namespace JiShe.CollectBus.Kafka.Consumer
try
{
var result = consumer.Consume(cts.Token);
if (result == null || result.Message==null || result.Message.Value == null)
{
_logger.LogWarning($"Kafka consume: topic {result?.Topic} partition {result?.Partition} returned a null value");
//consumer.Commit(result); // manual commit
continue;
}
if (result.IsPartitionEOF)
{
_logger.LogInformation("Kafka consume: topic {Topic} partition {Partition} reached end of partition", result.Topic, result.Partition);
await Task.Delay(TimeSpan.FromSeconds(1),cts.Token);
continue;
}
if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!))
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } };
// Check whether the headers match the filter
if (!headersFilter.Match(result.Message.Headers))
{
//consumer.Commit(result); // commit the offset
// Skip this message
continue;
}
}
bool success = await messageHandler(result.Message.Key, result.Message.Value);
if (success)
{
@ -146,14 +178,14 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <returns></returns>
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
{
var consumerKey = typeof((Null, TValue));
var consumerKey = typeof(KafkaConsumer<Ignore, TValue>);
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _=>
(
CreateConsumer<Null, TValue>(groupId),
CreateConsumer<Ignore, TValue>(groupId),
cts
)).Consumer as IConsumer<Null, TValue>;
)).Consumer as IConsumer<Ignore, TValue>;
consumer!.Subscribe(topics);
@ -164,6 +196,29 @@ namespace JiShe.CollectBus.Kafka.Consumer
try
{
var result = consumer.Consume(cts.Token);
if (result == null || result.Message==null || result.Message.Value == null)
{
_logger.LogWarning($"Kafka consume: topic {result?.Topic} partition {result?.Partition} returned a null value");
//consumer.Commit(result); // manual commit
continue;
}
if (result.IsPartitionEOF)
{
_logger.LogInformation("Kafka consume: topic {Topic} partition {Partition} reached end of partition", result.Topic, result.Partition);
await Task.Delay(100, cts.Token);
continue;
}
if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!))
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } };
// Check whether the headers match the filter
if (!headersFilter.Match(result.Message.Headers))
{
//consumer.Commit(result); // commit the offset
// Skip this message
continue;
}
}
bool success = await messageHandler(result.Message.Value);
if (success)
consumer.Commit(result); // manual commit
@ -177,6 +232,272 @@ namespace JiShe.CollectBus.Kafka.Consumer
await Task.CompletedTask;
}
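The attribute-driven path is not the only entry point; IConsumerService can also be used directly. A sketch against the single-message SubscribeAsync overload above (topic and group names are illustrative, _consumerService is assumed injected):

// Sketch only: returning true from the handler commits the offset, false leaves it for redelivery.
await _consumerService.SubscribeAsync<IssuedEventMessage>(
    new[] { "test-topic1" },
    async message =>
    {
        // ... process the message ...
        await Task.CompletedTask;
        return true;
    },
    groupId: "collectbus");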
/// <summary>
/// Subscribe to messages in batches
/// </summary>
/// <typeparam name="TKey">Message key type</typeparam>
/// <typeparam name="TValue">Message value type</typeparam>
/// <param name="topic">Topic</param>
/// <param name="messageBatchHandler">Batch message handler</param>
/// <param name="groupId">Consumer group ID</param>
/// <param name="batchSize">Batch size</param>
/// <param name="batchTimeout">Batch timeout</param>
public async Task SubscribeBatchAsync<TKey, TValue>(string topic, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
{
await SubscribeBatchAsync<TKey, TValue>(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout);
}
/// <summary>
/// Subscribe to messages in batches
/// </summary>
/// <typeparam name="TKey">Message key type</typeparam>
/// <typeparam name="TValue">Message value type</typeparam>
/// <param name="topics">Topic list</param>
/// <param name="messageBatchHandler">Batch message handler</param>
/// <param name="groupId">Consumer group ID</param>
/// <param name="batchSize">Batch size</param>
/// <param name="batchTimeout">Batch timeout</param>
public async Task SubscribeBatchAsync<TKey, TValue>(string[] topics,Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null,int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
{
var consumerKey = typeof(KafkaConsumer<TKey, TValue>);
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<TKey, TValue>(groupId),
cts
)).Consumer as IConsumer<TKey, TValue>;
consumer!.Subscribe(topics);
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // default timeout of 5 seconds
_ = Task.Run(async () =>
{
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
var startTime = DateTime.UtcNow;
while (!cts.IsCancellationRequested)
{
try
{
// Accumulate messages quickly without blocking
while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
{
var result = consumer.Consume(TimeSpan.Zero); // non-blocking call
if (result != null)
{
if (result.IsPartitionEOF)
{
_logger.LogInformation("Kafka consume: topic {Topic} partition {Partition} reached end of partition", result.Topic, result.Partition);
await Task.Delay(TimeSpan.FromSeconds(1), cts.Token);
}
else if (result.Message.Value != null)
{
if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!))
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } };
// Check whether the headers match the filter
if (!headersFilter.Match(result.Message.Headers))
{
//consumer.Commit(result); // commit the offset
// Skip this message
continue;
}
}
messages.Add((result.Message.Value, result.TopicPartitionOffset));
//messages.Add(result.Message.Value);
}
}
else
{
// Wait briefly when there is no message
await Task.Delay(10, cts.Token);
}
}
// Process the batch
if (messages.Count > 0)
{
bool success = await messageBatchHandler(messages.Select(m => m.Value));
if (success)
{
var offsetsByPartition = new Dictionary<TopicPartition, long>();
foreach (var msg in messages)
{
var tp = msg.Offset.TopicPartition;
var offset = msg.Offset.Offset;
if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
{
offsetsByPartition[tp] = offset;
}
}
var offsetsToCommit = offsetsByPartition
.Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
.ToList();
consumer.Commit(offsetsToCommit);
}
messages.Clear();
}
startTime = DateTime.UtcNow;
}
catch (ConsumeException ex)
{
_logger.LogError(ex, $"Message consumption failed: {ex.Error.Reason}");
}
catch (OperationCanceledException)
{
// Task cancelled; exit normally
}
catch (Exception ex)
{
_logger.LogError(ex, "Unknown error while processing a message batch");
}
}
}, cts.Token);
await Task.CompletedTask;
}
/// <summary>
/// Subscribe to messages in batches
/// </summary>
/// <typeparam name="TValue">Message value type</typeparam>
/// <param name="topic">Topic</param>
/// <param name="messageBatchHandler">Batch message handler</param>
/// <param name="groupId">Consumer group ID</param>
/// <param name="batchSize">Batch size</param>
/// <param name="batchTimeout">Batch timeout</param>
/// <param name="consumeTimeout">Consume wait timeout</param>
public async Task SubscribeBatchAsync<TValue>(string topic, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class
{
await SubscribeBatchAsync<TValue>(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout);
}
/// <summary>
/// Subscribe to messages in batches
/// </summary>
/// <typeparam name="TValue">Message value type</typeparam>
/// <param name="topics">Topic list</param>
/// <param name="messageBatchHandler">Batch message handler</param>
/// <param name="groupId">Consumer group ID</param>
/// <param name="batchSize">Batch size</param>
/// <param name="batchTimeout">Batch timeout</param>
/// <param name="consumeTimeout">Consume wait timeout</param>
public async Task SubscribeBatchAsync<TValue>(string[] topics,Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class
{
var consumerKey = typeof(KafkaConsumer<Ignore, TValue>);
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<Ignore, TValue>(groupId),
cts
)).Consumer as IConsumer<Ignore, TValue>;
consumer!.Subscribe(topics);
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // default timeout of 5 seconds
_ = Task.Run(async () =>
{
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
//var messages = new List<ConsumeResult<TKey, TValue>>();
var startTime = DateTime.UtcNow;
while (!cts.IsCancellationRequested)
{
try
{
// Accumulate messages quickly without blocking
while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
{
var result = consumer.Consume(TimeSpan.Zero); // non-blocking call
if (result != null)
{
if (result.IsPartitionEOF)
{
_logger.LogInformation("Kafka consume: topic {Topic} partition {Partition} reached end of partition", result.Topic, result.Partition);
await Task.Delay(TimeSpan.FromSeconds(1), cts.Token);
}
else if (result.Message.Value != null)
{
if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!))
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } };
// Check whether the headers match the filter
if (!headersFilter.Match(result.Message.Headers))
{
//consumer.Commit(result); // commit the offset
// Skip this message
continue;
}
}
messages.Add((result.Message.Value, result.TopicPartitionOffset));
//messages.Add(result.Message.Value);
}
}
else
{
// Wait briefly when there is no message
await Task.Delay(10, cts.Token);
}
}
// Process the batch
if (messages.Count > 0)
{
bool success = await messageBatchHandler(messages.Select(m => m.Value));
if (success)
{
var offsetsByPartition = new Dictionary<TopicPartition, long>();
foreach (var msg in messages)
{
var tp = msg.Offset.TopicPartition;
var offset = msg.Offset.Offset;
if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
{
offsetsByPartition[tp] = offset;
}
}
var offsetsToCommit = offsetsByPartition
.Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
.ToList();
consumer.Commit(offsetsToCommit);
}
messages.Clear();
}
startTime = DateTime.UtcNow;
}
catch (ConsumeException ex)
{
_logger.LogError(ex, $"Message consumption failed: {ex.Error.Reason}");
}
catch (OperationCanceledException)
{
// Task cancelled; exit normally
}
catch (Exception ex)
{
_logger.LogError(ex, "Unknown error while processing a message batch");
}
}
}, cts.Token);
await Task.CompletedTask;
}
/// <summary>
/// Cancel the message subscription
/// </summary>

View File

@ -1,4 +1,5 @@
using System;
using Confluent.Kafka;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@ -32,6 +33,14 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <returns></returns>
Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class;
Task SubscribeBatchAsync<TKey, TValue>(string[] topics, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class;
Task SubscribeBatchAsync<TKey, TValue>(string topic, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class;
Task SubscribeBatchAsync<TValue>(string topic, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class;
Task SubscribeBatchAsync<TValue>(string[] topics, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class;
void Unsubscribe<TKey, TValue>() where TKey : notnull where TValue : class;
}
}
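A batch-subscription sketch against the overloads declared above (names are illustrative; _consumerService is assumed injected):

// Sketch only: returning true commits the highest offset per partition, as implemented in ConsumerService.
await _consumerService.SubscribeBatchAsync<MessageReceived>(
    "test-topic1",
    async batch =>
    {
        foreach (var message in batch)
        {
            // ... persist or process each message ...
        }
        await Task.CompletedTask;
        return true;
    },
    groupId: "collectbus",
    batchSize: 200,
    batchTimeout: TimeSpan.FromSeconds(5));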

View File

@ -0,0 +1,30 @@
using Confluent.Kafka;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka
{
/// <summary>
/// Message header filter
/// </summary>
public class HeadersFilter : Dictionary<string, byte[]>
{
/// <summary>
/// Determines whether the given headers match this filter
/// </summary>
/// <param name="headers"></param>
/// <returns></returns>
public bool Match(Headers headers)
{
foreach (var kvp in this)
{
if (!headers.TryGetLastBytes(kvp.Key, out var value) || !value.SequenceEqual(kvp.Value))
return false;
}
return true;
}
}
}
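A small usage sketch of the filter as ConsumerService applies it; the expected header value comes from the ServerTagName setting, and the literal tag here is illustrative:

// Sketch only: build the filter once, then test each message's headers.
using System.Text;
using Confluent.Kafka;

var filter = new HeadersFilter
{
    { "route-key", Encoding.UTF8.GetBytes("server-a") }   // "server-a" stands in for configuration["ServerTagName"]
};

var headers = new Headers();
headers.Add("route-key", Encoding.UTF8.GetBytes("server-a"));

bool matches = filter.Match(headers);   // true only when every expected key exists and is byte-for-byte equal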

View File

@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka
{
public interface ISubscribeAck
{
/// <summary>
/// Whether the message was handled successfully (acknowledgement flag)
/// </summary>
bool Ack { get; set; }
/// <summary>
/// Message (optional detail text)
/// </summary>
string? Msg { get; set; }
}
}
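The concrete SubscribeAck class is not included in this diff; judging from the SubscribeAck.Success() and SubscribeAck.Fail() calls in the subscribers above, a plausible shape is:

// Sketch only; the real implementation may differ.
public class SubscribeAck : ISubscribeAck
{
    public bool Ack { get; set; }
    public string? Msg { get; set; }

    public static ISubscribeAck Success(string? msg = null) => new SubscribeAck { Ack = true, Msg = msg };
    public static ISubscribeAck Fail(string? msg = null) => new SubscribeAck { Ack = false, Msg = msg };
}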

View File

@ -0,0 +1,88 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Text.Json;
using Confluent.Kafka;
using System.Text.Json.Serialization;
using System.Text.Encodings.Web;
namespace JiShe.CollectBus.Kafka
{
/// <summary>
/// JSON serializer/deserializer (generic)
/// </summary>
public class JsonSerializer<T> : ISerializer<T>, IDeserializer<T>
{
private static readonly JsonSerializerOptions _options = new JsonSerializerOptions
{
DefaultIgnoreCondition = JsonIgnoreCondition.Never,
WriteIndented = false, // do not pretty-print output
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping, // allow special characters
IgnoreReadOnlyFields = true,
IgnoreReadOnlyProperties = true,
NumberHandling = JsonNumberHandling.AllowReadingFromString, // allow numbers given as strings
AllowTrailingCommas = true, // ignore trailing commas
ReadCommentHandling = JsonCommentHandling.Skip, // skip comments
PropertyNameCaseInsensitive = true, // property names are case-insensitive
PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // use camelCase property names
Converters = { new DateTimeJsonConverter() } // register the custom DateTime converter
};
public byte[] Serialize(T data, SerializationContext context)
{
if (data == null)
return null;
try
{
return JsonSerializer.SerializeToUtf8Bytes(data, _options);
}
catch (Exception ex)
{
throw new InvalidOperationException("Kafka序列化失败", ex);
}
}
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
if (isNull)
return default;
try
{
return JsonSerializer.Deserialize<T>(data, _options);
}
catch (Exception ex)
{
throw new InvalidOperationException("Kafka反序列化失败", ex);
}
}
}
public class DateTimeJsonConverter : JsonConverter<DateTime>
{
private readonly string _dateFormatString;
public DateTimeJsonConverter()
{
_dateFormatString = "yyyy-MM-dd HH:mm:ss";
}
public DateTimeJsonConverter(string dateFormatString)
{
_dateFormatString = dateFormatString;
}
public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
return DateTime.Parse(reader.GetString());
}
public override void Write(Utf8JsonWriter writer, DateTime value, JsonSerializerOptions options)
{
writer.WriteStringValue(value.ToString(_dateFormatString));
}
}
}
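A brief sketch of attaching JsonSerializer&lt;T&gt; to Confluent builders as the value (de)serializer, mirroring how the producer service below registers it; the broker address, group id and payload type are placeholders.

using Confluent.Kafka;
using JiShe.CollectBus.Kafka;

public static class JsonKafkaClients
{
    public static IProducer<string, TValue> CreateProducer<TValue>(string bootstrapServers)
        where TValue : class
    {
        return new ProducerBuilder<string, TValue>(new ProducerConfig { BootstrapServers = bootstrapServers })
            .SetValueSerializer(new JsonSerializer<TValue>())   // serialize values as UTF-8 JSON
            .Build();
    }

    public static IConsumer<string, TValue> CreateConsumer<TValue>(string bootstrapServers, string groupId)
        where TValue : class
    {
        return new ConsumerBuilder<string, TValue>(new ConsumerConfig
            {
                BootstrapServers = bootstrapServers,
                GroupId = groupId
            })
            .SetValueDeserializer(new JsonSerializer<TValue>()) // deserialize JSON payloads back to TValue
            .Build();
    }
}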

View File

@ -1,4 +1,8 @@
using Confluent.Kafka;
using DeviceDetectorNET;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Consumer;
using Microsoft.AspNetCore.Builder;
@ -8,6 +12,7 @@ using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
@ -37,6 +42,9 @@ namespace JiShe.CollectBus.Kafka
lifetime.ApplicationStarted.Register(() =>
{
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
int threadCount = 0;
int topicCount = 0;
foreach (var subscribeType in subscribeTypes)
{
var subscribes = provider.GetServices(subscribeType).ToList();
@ -44,10 +52,13 @@ namespace JiShe.CollectBus.Kafka
if(subscribe is IKafkaSubscribe)
{
BuildKafkaSubscriber(subscribe, provider);
Tuple<int, int> tuple= BuildKafkaSubscriber(subscribe, provider, logger);
threadCount+= tuple.Item1;
topicCount+= tuple.Item2;
}
});
}
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
});
}
@ -56,17 +67,28 @@ namespace JiShe.CollectBus.Kafka
/// </summary>
/// <param name="subscribe"></param>
/// <param name="provider"></param>
private static void BuildKafkaSubscriber(object subscribe, IServiceProvider provider)
private static Tuple<int,int> BuildKafkaSubscriber(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger)
{
var subscribedMethods = subscribe.GetType().GetMethods()
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
.Where(x => x.Attribute != null)
.ToArray();
int threadCount = 0;
foreach (var sub in subscribedMethods)
{
Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe));
var adminClientService = provider.GetRequiredService<IAdminClientService>();
int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
if (partitionCount <= 0)
partitionCount = 1;
for (int i = 0; i < partitionCount; i++)
{
Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger));
threadCount++;
}
}
return Tuple.Create(threadCount, subscribedMethods.Length);
}
/// <summary>
/// Start a background consumer thread
@ -76,11 +98,11 @@ namespace JiShe.CollectBus.Kafka
/// <param name="method"></param>
/// <param name="consumerInstance"></param>
/// <returns></returns>
private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe)
private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe, ILogger<CollectBusKafkaModule> logger)
{
var consumerService = provider.GetRequiredService<IConsumerService>();
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
await consumerService.SubscribeAsync<string>(attr.Topics, async (message) =>
await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) =>
{
try
{
@ -96,6 +118,7 @@ namespace JiShe.CollectBus.Kafka
});
}
/// <summary>
/// Process a message
/// </summary>
@ -103,28 +126,34 @@ namespace JiShe.CollectBus.Kafka
/// <param name="method"></param>
/// <param name="subscribe"></param>
/// <returns></returns>
private static async Task<bool> ProcessMessageAsync(string message, MethodInfo method, object subscribe)
private static async Task<bool> ProcessMessageAsync(dynamic message, MethodInfo method, object subscribe)
{
var parameters = method.GetParameters();
if (parameters.Length != 1)
return true;
var paramType = parameters[0].ParameterType;
var messageObj = paramType == typeof(string)? message: JsonConvert.DeserializeObject(message, paramType);
if (method.ReturnType == typeof(Task))
bool isGenericTask = method.ReturnType.IsGenericType
&& method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>);
bool existParameters = parameters.Length > 0;
//dynamic? messageObj= null;
//if (existParameters)
//{
// var paramType = parameters[0].ParameterType;
// messageObj = paramType == typeof(string) ? message : message.Deserialize(paramType);
//}
if (isGenericTask)
{
object? result = await (Task<bool>)method.Invoke(subscribe, new[] { messageObj })!;
if (result is bool success)
return success;
object? result = await (Task<ISubscribeAck>)method.Invoke(subscribe, existParameters? new[] { message } :null)!;
if (result is ISubscribeAck ackResult)
{
return ackResult.Ack;
}
}
else
{
object? result = method.Invoke(subscribe, new[] { messageObj });
if (result is bool success)
return success;
object? result = method.Invoke(subscribe, existParameters ? new[] { message } : null);
if (result is ISubscribeAck ackResult)
{
return ackResult.Ack;
}
}
return false;
}
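For orientation, a hedged sketch of the subscriber shape ProcessMessageAsync can invoke: at most one parameter and a Task&lt;ISubscribeAck&gt; return value. The exact KafkaSubscribe attribute usage (Topic and TaskCount as settable properties) and the assumption that IKafkaSubscribe is a plain marker interface are illustrative guesses; the topic name and message handling are placeholders.

using System;
using System.Threading.Tasks;
using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Kafka.Attributes;

public class DeviceEventSubscriber : IKafkaSubscribe // assumed to be a marker interface used for discovery
{
    // Attribute usage is an assumption: Topic and TaskCount are the members read by
    // BuildKafkaSubscriber; TaskCount = -1 means "one consumer task per topic partition".
    [KafkaSubscribe(Topic = "device-events", TaskCount = -1)]
    public async Task<ISubscribeAck> HandleAsync(dynamic message)
    {
        try
        {
            // process the message here
            await Task.CompletedTask;
            return SubscribeAck.Success();
        }
        catch (Exception ex)
        {
            return SubscribeAck.Fail(ex.Message); // Ack = false => message treated as unhandled
        }
    }
}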

View File

@ -15,6 +15,6 @@ namespace JiShe.CollectBus.Kafka.Producer
Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value, int? partition, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) where TKey : notnull where TValue : class;
Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class;
Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<string, TValue>>? deliveryHandler = null) where TValue : class;
}
}
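A usage sketch of the string-keyed delivery-report overload declared above; the topic name and payload type are placeholders.

using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using JiShe.CollectBus.Kafka.Producer;

public class AlarmPublisher
{
    private readonly IProducerService _producerService;

    public AlarmPublisher(IProducerService producerService)
    {
        _producerService = producerService;
    }

    public Task PublishAsync(AlarmMessage alarm)
    {
        return _producerService.ProduceAsync(
            "device-alarms",          // illustrative topic name
            alarm,
            partition: null,          // let the partitioner pick the partition
            deliveryHandler: report =>
            {
                if (report.Error.IsError)
                    Console.WriteLine($"Delivery failed: {report.Error.Reason}");
            });
    }
}

public class AlarmMessage
{
    public string? DeviceNo { get; set; }
    public string? Content { get; set; }
}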

View File

@ -9,6 +9,7 @@ using JiShe.CollectBus.Kafka.Consumer;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Volo.Abp.DependencyInjection;
using YamlDotNet.Serialization;
namespace JiShe.CollectBus.Kafka.Producer
{
@ -16,7 +17,8 @@ namespace JiShe.CollectBus.Kafka.Producer
{
private readonly ILogger<ProducerService> _logger;
private readonly IConfiguration _configuration;
private readonly ConcurrentDictionary<Tuple<Type, Type>, object> _producerCache = new();
private readonly ConcurrentDictionary<Type, object> _producerCache = new();
private class KafkaProducer<TKey, TValue> where TKey : notnull where TValue : class { }
public ProducerService(IConfiguration configuration,ILogger<ProducerService> logger)
{
@ -31,14 +33,13 @@ namespace JiShe.CollectBus.Kafka.Producer
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <returns></returns>
private IProducer<TKey, TValue> GetProducer<TKey, TValue>()
private IProducer<TKey, TValue> GetProducer<TKey, TValue>(Type typeKey)
{
var typeKey = Tuple.Create(typeof(TKey), typeof(TValue))!;
return (IProducer<TKey, TValue>)_producerCache.GetOrAdd(typeKey, _ =>
{
var config = BuildProducerConfig();
return new ProducerBuilder<TKey, TValue>(config)
.SetValueSerializer(new JsonSerializer<TValue>()) // use the custom JSON serializer for message values
.SetLogHandler((_, msg) => _logger.Log(ConvertLogLevel(msg.Level), msg.Message))
.Build();
});
@ -62,6 +63,7 @@ namespace JiShe.CollectBus.Kafka.Producer
LingerMs = 20, // wait up to 20 ms to batch outgoing messages
Acks = Acks.All, // only treat a send as committed once all replica brokers have it; Acks.Leader is an alternative
MessageSendMaxRetries = 50, // retry a failed send up to 50 times
MessageTimeoutMs = 120000, // 2-minute send timeout; MessageTimeoutMs must be greater than LingerMs
};
if (enableAuth)
@ -101,8 +103,17 @@ namespace JiShe.CollectBus.Kafka.Producer
/// <returns></returns>
public async Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value)where TKey : notnull where TValue : class
{
var producer = GetProducer<TKey, TValue>();
await producer.ProduceAsync(topic, new Message<TKey, TValue> { Key = key, Value = value });
var typeKey = typeof(KafkaProducer<TKey, TValue>);
var producer = GetProducer<TKey, TValue>(typeKey);
var message = new Message<TKey, TValue>
{
Key = key,
Value = value,
Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) }
}
};
await producer.ProduceAsync(topic, message);
}
/// <summary>
@ -114,8 +125,16 @@ namespace JiShe.CollectBus.Kafka.Producer
/// <returns></returns>
public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class
{
var producer = GetProducer<Null, TValue>();
await producer.ProduceAsync(topic, new Message<Null, TValue> { Value = value });
var typeKey = typeof(KafkaProducer<string, TValue>);
var producer = GetProducer<string, TValue>(typeKey);
var message = new Message<string, TValue>
{
Value = value,
Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) }
}
};
await producer.ProduceAsync(topic, message);
}
/// <summary>
@ -134,9 +153,13 @@ namespace JiShe.CollectBus.Kafka.Producer
var message = new Message<TKey, TValue>
{
Key = key,
Value = value
Value = value,
Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) }
}
};
var producer = GetProducer<TKey, TValue>();
var typeKey = typeof(KafkaProducer<TKey, TValue>);
var producer = GetProducer<TKey, TValue>(typeKey);
if (partition.HasValue)
{
var topicPartition = new TopicPartition(topic, partition.Value);
@ -160,13 +183,17 @@ namespace JiShe.CollectBus.Kafka.Producer
/// <param name="partition"></param>
/// <param name="deliveryHandler"></param>
/// <returns></returns>
public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class
public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<string, TValue>>? deliveryHandler = null) where TValue : class
{
var message = new Message<Null, TValue>
var message = new Message<string, TValue>
{
Value = value
Value = value,
Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) }
}
};
var producer = GetProducer<Null, TValue>();
var typeKey = typeof(KafkaProducer<string, TValue>);
var producer = GetProducer<string, TValue>(typeKey);
if (partition.HasValue)
{
var topicPartition = new TopicPartition(topic, partition.Value);

View File

@ -0,0 +1,75 @@
using Confluent.Kafka;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka
{
public class SubscribeResult: ISubscribeAck
{
/// <summary>
/// Whether the handling succeeded
/// </summary>
public bool Ack { get; set; }
/// <summary>
/// Result message
/// </summary>
public string? Msg { get; set; }
/// <summary>
/// Mark the result as successful
/// </summary>
/// <param name="msg">Optional result message</param>
public SubscribeResult Success(string? msg = null)
{
Ack = true;
Msg = msg;
return this;
}
/// <summary>
/// Mark the result as failed
/// </summary>
/// <param name="msg">Optional result message</param>
/// <returns></returns>
public SubscribeResult Fail(string? msg = null)
{
Msg = msg;
Ack = false;
return this;
}
}
public static partial class SubscribeAck
{
/// <summary>
/// Create a successful acknowledgement
/// </summary>
/// <param name="msg">Optional result message</param>
/// <returns></returns>
public static ISubscribeAck Success(string? msg = null)
{
return new SubscribeResult().Success(msg);
}
/// <summary>
/// Create a failed acknowledgement
/// </summary>
/// <param name="msg">Optional result message</param>
/// <returns></returns>
public static ISubscribeAck Fail(string? msg = null)
{
return new SubscribeResult().Fail(msg);
}
}
}

View File

@ -1,5 +1,4 @@
using DotNetCore.CAP;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
@ -12,12 +11,16 @@ using Microsoft.Extensions.DependencyInjection;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.Protocols;
using MassTransit;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Common.Helpers;
using DotNetCore.CAP;
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
{
public abstract class BaseProtocolPlugin : IProtocolPlugin
{
private readonly ICapPublisher _producerBus;
private readonly IProducerService _producerService;
private readonly ILogger<BaseProtocolPlugin> _logger;
private readonly IRepository<ProtocolInfo, Guid> _protocolInfoRepository;
@ -37,6 +40,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
_logger = serviceProvider.GetRequiredService<ILogger<BaseProtocolPlugin>>();
_protocolInfoRepository = serviceProvider.GetRequiredService<IRepository<ProtocolInfo, Guid>>();
_producerService = serviceProvider.GetRequiredService<IProducerService>();
_producerBus = serviceProvider.GetRequiredService<ICapPublisher>();
}
@ -86,8 +90,9 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
Fn = 1
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
//await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
}
@ -126,7 +131,9 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
Fn = 1
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
//await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
}

View File

@ -7,6 +7,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.9.0" />
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
<PackageReference Include="MassTransit.Abstractions" Version="8.3.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="8.0.0" />
@ -17,6 +18,7 @@
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
</ItemGroup>
</Project>