Compare commits
No commits in common. "3790f3918e4bb9cc4511e1fb7ef1bf4875b17a31" and "27d3bad7fe10812b468d09885e2a6a9328894f28" have entirely different histories.
3790f3918e
...
27d3bad7fe
@ -39,8 +39,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.T
|
|||||||
EndProject
|
EndProject
|
||||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}"
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "src\JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{FA762E8F-659A-DECF-83D6-5F364144450E}"
|
|
||||||
EndProject
|
|
||||||
Global
|
Global
|
||||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||||
Debug|Any CPU = Debug|Any CPU
|
Debug|Any CPU = Debug|Any CPU
|
||||||
@ -115,10 +113,6 @@ Global
|
|||||||
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU
|
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
{FA762E8F-659A-DECF-83D6-5F364144450E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
|
||||||
{FA762E8F-659A-DECF-83D6-5F364144450E}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
|
||||||
{FA762E8F-659A-DECF-83D6-5F364144450E}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
|
||||||
{FA762E8F-659A-DECF-83D6-5F364144450E}.Release|Any CPU.Build.0 = Release|Any CPU
|
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
GlobalSection(SolutionProperties) = preSolution
|
GlobalSection(SolutionProperties) = preSolution
|
||||||
HideSolutionNode = FALSE
|
HideSolutionNode = FALSE
|
||||||
@ -141,7 +135,6 @@ Global
|
|||||||
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{443B4549-0AC0-4493-8F3E-49C83225DD76} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{443B4549-0AC0-4493-8F3E-49C83225DD76} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{FA762E8F-659A-DECF-83D6-5F364144450E} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
GlobalSection(ExtensibilityGlobals) = postSolution
|
GlobalSection(ExtensibilityGlobals) = postSolution
|
||||||
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}
|
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}
|
||||||
|
|||||||
@ -26,7 +26,6 @@
|
|||||||
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
|
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
|
|||||||
@ -1,17 +1,16 @@
|
|||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using JiShe.CollectBus.Common.Models;
|
using JiShe.CollectBus.Common.Models;
|
||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
using JiShe.CollectBus.Kafka;
|
|
||||||
using Volo.Abp.Application.Services;
|
using Volo.Abp.Application.Services;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Subscribers
|
namespace JiShe.CollectBus.Subscribers
|
||||||
{
|
{
|
||||||
public interface ISubscriberAppService : IApplicationService
|
public interface ISubscriberAppService : IApplicationService
|
||||||
{
|
{
|
||||||
Task<ISubscribeAck> LoginIssuedEvent(IssuedEventMessage issuedEventMessage);
|
Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage);
|
||||||
Task<ISubscribeAck> HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage);
|
Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage);
|
||||||
Task<ISubscribeAck> ReceivedEvent(MessageReceived receivedMessage);
|
Task ReceivedEvent(MessageReceived receivedMessage);
|
||||||
Task<ISubscribeAck> ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage);
|
Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage);
|
||||||
Task<ISubscribeAck> ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage);
|
Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,7 +1,6 @@
|
|||||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||||
using JiShe.CollectBus.Kafka;
|
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using Volo.Abp.Application.Services;
|
using Volo.Abp.Application.Services;
|
||||||
@ -20,19 +19,19 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// 1分钟采集电表数据下行消息消费订阅
|
/// 1分钟采集电表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 5分钟采集电表数据下行消息消费订阅
|
/// 5分钟采集电表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 15分钟采集电表数据下行消息消费订阅
|
/// 15分钟采集电表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
#region 水表消息采集
|
#region 水表消息采集
|
||||||
@ -40,7 +39,7 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// 1分钟采集水表数据下行消息消费订阅
|
/// 1分钟采集水表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
}
|
}
|
||||||
|
|||||||
@ -9,13 +9,11 @@ using DotNetCore.CAP;
|
|||||||
using JiShe.CollectBus.Common.BuildSendDatas;
|
using JiShe.CollectBus.Common.BuildSendDatas;
|
||||||
using JiShe.CollectBus.Common.Enums;
|
using JiShe.CollectBus.Common.Enums;
|
||||||
using JiShe.CollectBus.Common.Extensions;
|
using JiShe.CollectBus.Common.Extensions;
|
||||||
using JiShe.CollectBus.Common.Helpers;
|
|
||||||
using JiShe.CollectBus.Common.Models;
|
using JiShe.CollectBus.Common.Models;
|
||||||
using JiShe.CollectBus.EnergySystem.Dto;
|
using JiShe.CollectBus.EnergySystem.Dto;
|
||||||
using JiShe.CollectBus.FreeSql;
|
using JiShe.CollectBus.FreeSql;
|
||||||
using JiShe.CollectBus.IotSystems.PrepayModel;
|
using JiShe.CollectBus.IotSystems.PrepayModel;
|
||||||
using JiShe.CollectBus.IotSystems.Records;
|
using JiShe.CollectBus.IotSystems.Records;
|
||||||
using JiShe.CollectBus.Kafka.Producer;
|
|
||||||
using JiShe.CollectBus.Protocol.Contracts;
|
using JiShe.CollectBus.Protocol.Contracts;
|
||||||
using MassTransit;
|
using MassTransit;
|
||||||
using Microsoft.AspNetCore.Mvc;
|
using Microsoft.AspNetCore.Mvc;
|
||||||
@ -30,16 +28,14 @@ namespace JiShe.CollectBus.EnergySystem
|
|||||||
private readonly IRepository<FocusRecord, Guid> _focusRecordRepository;
|
private readonly IRepository<FocusRecord, Guid> _focusRecordRepository;
|
||||||
private readonly IRepository<CsqRecord,Guid> _csqRecordRepository;
|
private readonly IRepository<CsqRecord,Guid> _csqRecordRepository;
|
||||||
private readonly IRepository<ConrOnlineRecord,Guid> _conrOnlineRecordRepository;
|
private readonly IRepository<ConrOnlineRecord,Guid> _conrOnlineRecordRepository;
|
||||||
private readonly IProducerService _producerService;
|
|
||||||
private readonly ICapPublisher _capBus;
|
private readonly ICapPublisher _capBus;
|
||||||
|
|
||||||
public EnergySystemAppService(IRepository<FocusRecord, Guid> focusRecordRepository, IRepository<CsqRecord, Guid> csqRecordRepository,
|
public EnergySystemAppService(IRepository<FocusRecord, Guid> focusRecordRepository, IRepository<CsqRecord, Guid> csqRecordRepository,
|
||||||
IRepository<ConrOnlineRecord, Guid> conrOnlineRecordRepository, IProducerService producerService, ICapPublisher capBus)
|
IRepository<ConrOnlineRecord, Guid> conrOnlineRecordRepository, ICapPublisher capBus)
|
||||||
{
|
{
|
||||||
_focusRecordRepository = focusRecordRepository;
|
_focusRecordRepository = focusRecordRepository;
|
||||||
_csqRecordRepository = csqRecordRepository;
|
_csqRecordRepository = csqRecordRepository;
|
||||||
_conrOnlineRecordRepository = conrOnlineRecordRepository;
|
_conrOnlineRecordRepository = conrOnlineRecordRepository;
|
||||||
_producerService = producerService;
|
|
||||||
_capBus = capBus;
|
_capBus = capBus;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -74,16 +70,8 @@ namespace JiShe.CollectBus.EnergySystem
|
|||||||
if (bytes == null)
|
if (bytes == null)
|
||||||
return result;
|
return result;
|
||||||
|
|
||||||
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
|
||||||
//{
|
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||||
// //ClientId = messageReceived.ClientId,
|
|
||||||
// DeviceNo = address,
|
|
||||||
// Message = bytes,
|
|
||||||
// Type = IssuedEventType.Data,
|
|
||||||
// MessageId = NewId.NextGuid().ToString()
|
|
||||||
//});
|
|
||||||
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
|
||||||
{
|
{
|
||||||
//ClientId = messageReceived.ClientId,
|
//ClientId = messageReceived.ClientId,
|
||||||
DeviceNo = address,
|
DeviceNo = address,
|
||||||
@ -120,16 +108,7 @@ namespace JiShe.CollectBus.EnergySystem
|
|||||||
|
|
||||||
foreach (var bytes in bytesList)
|
foreach (var bytes in bytesList)
|
||||||
{
|
{
|
||||||
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||||
//{
|
|
||||||
// //ClientId = messageReceived.ClientId,
|
|
||||||
// DeviceNo = address,
|
|
||||||
// Message = bytes,
|
|
||||||
// Type = IssuedEventType.Data,
|
|
||||||
// MessageId = NewId.NextGuid().ToString()
|
|
||||||
//});
|
|
||||||
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
|
||||||
{
|
{
|
||||||
//ClientId = messageReceived.ClientId,
|
//ClientId = messageReceived.ClientId,
|
||||||
DeviceNo = address,
|
DeviceNo = address,
|
||||||
@ -137,7 +116,6 @@ namespace JiShe.CollectBus.EnergySystem
|
|||||||
Type = IssuedEventType.Data,
|
Type = IssuedEventType.Data,
|
||||||
MessageId = NewId.NextGuid().ToString()
|
MessageId = NewId.NextGuid().ToString()
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
@ -171,15 +149,7 @@ namespace JiShe.CollectBus.EnergySystem
|
|||||||
|
|
||||||
}).ToList();
|
}).ToList();
|
||||||
var bytes = Build3761SendData.BuildAmmeterParameterSetSendCmd(address, meterParameters);
|
var bytes = Build3761SendData.BuildAmmeterParameterSetSendCmd(address, meterParameters);
|
||||||
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||||
//{
|
|
||||||
// //ClientId = messageReceived.ClientId,
|
|
||||||
// DeviceNo = address,
|
|
||||||
// Message = bytes,
|
|
||||||
// Type = IssuedEventType.Data,
|
|
||||||
// MessageId = NewId.NextGuid().ToString()
|
|
||||||
//});
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
|
||||||
{
|
{
|
||||||
//ClientId = messageReceived.ClientId,
|
//ClientId = messageReceived.ClientId,
|
||||||
DeviceNo = address,
|
DeviceNo = address,
|
||||||
@ -208,16 +178,7 @@ namespace JiShe.CollectBus.EnergySystem
|
|||||||
{
|
{
|
||||||
var dataUnit = Build645SendData.BuildReadMeterAddressSendDataUnit(detail.MeterAddress);
|
var dataUnit = Build645SendData.BuildReadMeterAddressSendDataUnit(detail.MeterAddress);
|
||||||
var bytes =Build3761SendData.BuildTransparentForwardingSendCmd(address, detail.Port, detail.BaudRate.ToString(), dataUnit, StopBit.Stop1, Parity.None);
|
var bytes =Build3761SendData.BuildTransparentForwardingSendCmd(address, detail.Port, detail.BaudRate.ToString(), dataUnit, StopBit.Stop1, Parity.None);
|
||||||
|
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||||
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
|
||||||
//{
|
|
||||||
// //ClientId = messageReceived.ClientId,
|
|
||||||
// DeviceNo = address,
|
|
||||||
// Message = bytes,
|
|
||||||
// Type = IssuedEventType.Data,
|
|
||||||
// MessageId = NewId.NextGuid().ToString()
|
|
||||||
//});
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
|
||||||
{
|
{
|
||||||
//ClientId = messageReceived.ClientId,
|
//ClientId = messageReceived.ClientId,
|
||||||
DeviceNo = address,
|
DeviceNo = address,
|
||||||
@ -300,16 +261,7 @@ namespace JiShe.CollectBus.EnergySystem
|
|||||||
|
|
||||||
if (bytes != null)
|
if (bytes != null)
|
||||||
{
|
{
|
||||||
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||||
//{
|
|
||||||
// //ClientId = messageReceived.ClientId,
|
|
||||||
// DeviceNo = address,
|
|
||||||
// Message = bytes,
|
|
||||||
// Type = IssuedEventType.Data,
|
|
||||||
// MessageId = NewId.NextGuid().ToString()
|
|
||||||
//});
|
|
||||||
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
|
||||||
{
|
{
|
||||||
//ClientId = messageReceived.ClientId,
|
//ClientId = messageReceived.ClientId,
|
||||||
DeviceNo = address,
|
DeviceNo = address,
|
||||||
@ -368,16 +320,7 @@ namespace JiShe.CollectBus.EnergySystem
|
|||||||
|
|
||||||
var bytes = Build3761SendData.BuildCommunicationParametersSetSendCmd(address, masterIP, materPort,
|
var bytes = Build3761SendData.BuildCommunicationParametersSetSendCmd(address, masterIP, materPort,
|
||||||
backupIP, backupPort, input.Data.APN);
|
backupIP, backupPort, input.Data.APN);
|
||||||
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||||
//{
|
|
||||||
// //ClientId = messageReceived.ClientId,
|
|
||||||
// DeviceNo = address,
|
|
||||||
// Message = bytes,
|
|
||||||
// Type = IssuedEventType.Data,
|
|
||||||
// MessageId = NewId.NextGuid().ToString()
|
|
||||||
//});
|
|
||||||
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
|
||||||
{
|
{
|
||||||
//ClientId = messageReceived.ClientId,
|
//ClientId = messageReceived.ClientId,
|
||||||
DeviceNo = address,
|
DeviceNo = address,
|
||||||
@ -404,16 +347,7 @@ namespace JiShe.CollectBus.EnergySystem
|
|||||||
var address = $"{input.AreaCode}{input.Address}";
|
var address = $"{input.AreaCode}{input.Address}";
|
||||||
|
|
||||||
var bytes = Build3761SendData.BuildTerminalCalendarClockSendCmd(address);
|
var bytes = Build3761SendData.BuildTerminalCalendarClockSendCmd(address);
|
||||||
|
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||||
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
|
||||||
//{
|
|
||||||
// //ClientId = messageReceived.ClientId,
|
|
||||||
// DeviceNo = address,
|
|
||||||
// Message = bytes,
|
|
||||||
// Type = IssuedEventType.Data,
|
|
||||||
// MessageId = NewId.NextGuid().ToString()
|
|
||||||
//});
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
|
||||||
{
|
{
|
||||||
//ClientId = messageReceived.ClientId,
|
//ClientId = messageReceived.ClientId,
|
||||||
DeviceNo = address,
|
DeviceNo = address,
|
||||||
@ -441,15 +375,7 @@ namespace JiShe.CollectBus.EnergySystem
|
|||||||
bool isManual = !input.AreaCode.Equals("5110");//低功耗集中器不是长连接,在连接的那一刻再发送
|
bool isManual = !input.AreaCode.Equals("5110");//低功耗集中器不是长连接,在连接的那一刻再发送
|
||||||
|
|
||||||
var bytes = Build3761SendData.BuildConrCheckTimeSendCmd(address,DateTime.Now, isManual);
|
var bytes = Build3761SendData.BuildConrCheckTimeSendCmd(address,DateTime.Now, isManual);
|
||||||
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||||
//{
|
|
||||||
// //ClientId = messageReceived.ClientId,
|
|
||||||
// DeviceNo = address,
|
|
||||||
// Message = bytes,
|
|
||||||
// Type = IssuedEventType.Data,
|
|
||||||
// MessageId = NewId.NextGuid().ToString()
|
|
||||||
//});
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
|
||||||
{
|
{
|
||||||
//ClientId = messageReceived.ClientId,
|
//ClientId = messageReceived.ClientId,
|
||||||
DeviceNo = address,
|
DeviceNo = address,
|
||||||
@ -476,15 +402,7 @@ namespace JiShe.CollectBus.EnergySystem
|
|||||||
var address = $"{input.AreaCode}{input.Address}";
|
var address = $"{input.AreaCode}{input.Address}";
|
||||||
|
|
||||||
var bytes = Build3761SendData.BuildConrRebootSendCmd(address);
|
var bytes = Build3761SendData.BuildConrRebootSendCmd(address);
|
||||||
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||||
//{
|
|
||||||
// //ClientId = messageReceived.ClientId,
|
|
||||||
// DeviceNo = address,
|
|
||||||
// Message = bytes,
|
|
||||||
// Type = IssuedEventType.Data,
|
|
||||||
// MessageId = NewId.NextGuid().ToString()
|
|
||||||
//});
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
|
||||||
{
|
{
|
||||||
//ClientId = messageReceived.ClientId,
|
//ClientId = messageReceived.ClientId,
|
||||||
DeviceNo = address,
|
DeviceNo = address,
|
||||||
@ -512,15 +430,7 @@ namespace JiShe.CollectBus.EnergySystem
|
|||||||
var address = $"{input.AreaCode}{input.Address}";
|
var address = $"{input.AreaCode}{input.Address}";
|
||||||
var pnList = input.Data.Split(',').Select(it => int.Parse(it)).ToList();
|
var pnList = input.Data.Split(',').Select(it => int.Parse(it)).ToList();
|
||||||
var bytes = Build3761SendData.BuildAmmeterParameterReadingSendCmd(address, pnList);
|
var bytes = Build3761SendData.BuildAmmeterParameterReadingSendCmd(address, pnList);
|
||||||
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||||
//{
|
|
||||||
// //ClientId = messageReceived.ClientId,
|
|
||||||
// DeviceNo = address,
|
|
||||||
// Message = bytes,
|
|
||||||
// Type = IssuedEventType.Data,
|
|
||||||
// MessageId = NewId.NextGuid().ToString()
|
|
||||||
//});
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
|
||||||
{
|
{
|
||||||
//ClientId = messageReceived.ClientId,
|
//ClientId = messageReceived.ClientId,
|
||||||
DeviceNo = address,
|
DeviceNo = address,
|
||||||
@ -569,16 +479,7 @@ namespace JiShe.CollectBus.EnergySystem
|
|||||||
|
|
||||||
foreach (var bytes in bytesList)
|
foreach (var bytes in bytesList)
|
||||||
{
|
{
|
||||||
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||||
//{
|
|
||||||
// //ClientId = messageReceived.ClientId,
|
|
||||||
// DeviceNo = address,
|
|
||||||
// Message = bytes,
|
|
||||||
// Type = IssuedEventType.Data,
|
|
||||||
// MessageId = NewId.NextGuid().ToString()
|
|
||||||
//});
|
|
||||||
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
|
||||||
{
|
{
|
||||||
//ClientId = messageReceived.ClientId,
|
//ClientId = messageReceived.ClientId,
|
||||||
DeviceNo = address,
|
DeviceNo = address,
|
||||||
@ -586,7 +487,7 @@ namespace JiShe.CollectBus.EnergySystem
|
|||||||
Type = IssuedEventType.Data,
|
Type = IssuedEventType.Data,
|
||||||
MessageId = NewId.NextGuid().ToString()
|
MessageId = NewId.NextGuid().ToString()
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
result.Status = true;
|
result.Status = true;
|
||||||
result.Msg = "操作成功";
|
result.Msg = "操作成功";
|
||||||
@ -647,15 +548,7 @@ namespace JiShe.CollectBus.EnergySystem
|
|||||||
|
|
||||||
foreach (var bytes in bytesList)
|
foreach (var bytes in bytesList)
|
||||||
{
|
{
|
||||||
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||||
//{
|
|
||||||
// //ClientId = messageReceived.ClientId,
|
|
||||||
// DeviceNo = address,
|
|
||||||
// Message = bytes,
|
|
||||||
// Type = IssuedEventType.Data,
|
|
||||||
// MessageId = NewId.NextGuid().ToString()
|
|
||||||
//});
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
|
||||||
{
|
{
|
||||||
//ClientId = messageReceived.ClientId,
|
//ClientId = messageReceived.ClientId,
|
||||||
DeviceNo = address,
|
DeviceNo = address,
|
||||||
@ -684,15 +577,7 @@ namespace JiShe.CollectBus.EnergySystem
|
|||||||
var address = $"{code.AreaCode}{code.Address}";
|
var address = $"{code.AreaCode}{code.Address}";
|
||||||
var bytes = Build3761SendData.BuildAmmeterReportCollectionItemsSetSendCmd(address,input.Detail.Pn, input.Detail.Unit,input.Detail.Cycle,input.Detail.BaseTime,
|
var bytes = Build3761SendData.BuildAmmeterReportCollectionItemsSetSendCmd(address,input.Detail.Pn, input.Detail.Unit,input.Detail.Cycle,input.Detail.BaseTime,
|
||||||
input.Detail.CurveRatio,input.Detail.Details.Select(it => new PnFn(it.Pn,it.Fn)).ToList());
|
input.Detail.CurveRatio,input.Detail.Details.Select(it => new PnFn(it.Pn,it.Fn)).ToList());
|
||||||
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||||
//{
|
|
||||||
// //ClientId = messageReceived.ClientId,
|
|
||||||
// DeviceNo = address,
|
|
||||||
// Message = bytes,
|
|
||||||
// Type = IssuedEventType.Data,
|
|
||||||
// MessageId = NewId.NextGuid().ToString()
|
|
||||||
//});
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
|
||||||
{
|
{
|
||||||
//ClientId = messageReceived.ClientId,
|
//ClientId = messageReceived.ClientId,
|
||||||
DeviceNo = address,
|
DeviceNo = address,
|
||||||
@ -720,15 +605,7 @@ namespace JiShe.CollectBus.EnergySystem
|
|||||||
{
|
{
|
||||||
var address = $"{code.AreaCode}{code.Address}";
|
var address = $"{code.AreaCode}{code.Address}";
|
||||||
var bytes = Build3761SendData.BuildAmmeterAutoUpSwitchSetSendCmd(address, input.Detail.Pn,input.Detail.IsOpen);
|
var bytes = Build3761SendData.BuildAmmeterAutoUpSwitchSetSendCmd(address, input.Detail.Pn,input.Detail.IsOpen);
|
||||||
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||||
//{
|
|
||||||
// //ClientId = messageReceived.ClientId,
|
|
||||||
// DeviceNo = address,
|
|
||||||
// Message = bytes,
|
|
||||||
// Type = IssuedEventType.Data,
|
|
||||||
// MessageId = NewId.NextGuid().ToString()
|
|
||||||
//});
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
|
||||||
{
|
{
|
||||||
//ClientId = messageReceived.ClientId,
|
//ClientId = messageReceived.ClientId,
|
||||||
DeviceNo = address,
|
DeviceNo = address,
|
||||||
@ -754,15 +631,7 @@ namespace JiShe.CollectBus.EnergySystem
|
|||||||
var result = new BaseResultDto();
|
var result = new BaseResultDto();
|
||||||
var address = $"{input.AreaCode}{input.Address}";
|
var address = $"{input.AreaCode}{input.Address}";
|
||||||
var bytes = Build3761SendData.BuildAmmeterReadAutoUpSwitchSendCmd(address, input.Detail.Pn);
|
var bytes = Build3761SendData.BuildAmmeterReadAutoUpSwitchSendCmd(address, input.Detail.Pn);
|
||||||
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||||
//{
|
|
||||||
// //ClientId = messageReceived.ClientId,
|
|
||||||
// DeviceNo = address,
|
|
||||||
// Message = bytes,
|
|
||||||
// Type = IssuedEventType.Data,
|
|
||||||
// MessageId = NewId.NextGuid().ToString()
|
|
||||||
//});
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
|
||||||
{
|
{
|
||||||
//ClientId = messageReceived.ClientId,
|
//ClientId = messageReceived.ClientId,
|
||||||
DeviceNo = address,
|
DeviceNo = address,
|
||||||
@ -789,15 +658,7 @@ namespace JiShe.CollectBus.EnergySystem
|
|||||||
{
|
{
|
||||||
var address = $"{data.AreaCode}{data.Address}";
|
var address = $"{data.AreaCode}{data.Address}";
|
||||||
var bytes = Build3761SendData.BuildTerminalVersionInfoReadingSendCmd(address);
|
var bytes = Build3761SendData.BuildTerminalVersionInfoReadingSendCmd(address);
|
||||||
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||||
//{
|
|
||||||
// //ClientId = messageReceived.ClientId,
|
|
||||||
// DeviceNo = address,
|
|
||||||
// Message = bytes,
|
|
||||||
// Type = IssuedEventType.Data,
|
|
||||||
// MessageId = NewId.NextGuid().ToString()
|
|
||||||
//});
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
|
||||||
{
|
{
|
||||||
//ClientId = messageReceived.ClientId,
|
//ClientId = messageReceived.ClientId,
|
||||||
DeviceNo = address,
|
DeviceNo = address,
|
||||||
@ -852,15 +713,7 @@ namespace JiShe.CollectBus.EnergySystem
|
|||||||
|
|
||||||
foreach (var bytes in bytesList)
|
foreach (var bytes in bytesList)
|
||||||
{
|
{
|
||||||
//await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||||
//{
|
|
||||||
// //ClientId = messageReceived.ClientId,
|
|
||||||
// DeviceNo = address,
|
|
||||||
// Message = bytes,
|
|
||||||
// Type = IssuedEventType.Data,
|
|
||||||
// MessageId = NewId.NextGuid().ToString()
|
|
||||||
//});
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
|
||||||
{
|
{
|
||||||
//ClientId = messageReceived.ClientId,
|
//ClientId = messageReceived.ClientId,
|
||||||
DeviceNo = address,
|
DeviceNo = address,
|
||||||
|
|||||||
@ -15,7 +15,6 @@
|
|||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
|
|
||||||
<PackageReference Include="MassTransit.Kafka" Version="8.4.0" />
|
<PackageReference Include="MassTransit.Kafka" Version="8.4.0" />
|
||||||
|
|
||||||
<PackageReference Include="Volo.Abp.AspNetCore.Mvc" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.AspNetCore.Mvc" Version="8.3.3" />
|
||||||
@ -24,6 +23,7 @@
|
|||||||
<PackageReference Include="Volo.Abp.Ddd.Application" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.Ddd.Application" Version="8.3.3" />
|
||||||
<PackageReference Include="TouchSocket" Version="3.0.19" />
|
<PackageReference Include="TouchSocket" Version="3.0.19" />
|
||||||
<PackageReference Include="TouchSocket.Hosting" Version="3.0.19" />
|
<PackageReference Include="TouchSocket.Hosting" Version="3.0.19" />
|
||||||
|
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
|
||||||
<PackageReference Include="Volo.Abp.EventBus.Kafka" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.EventBus.Kafka" Version="8.3.3" />
|
||||||
|
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
||||||
|
|||||||
@ -7,12 +7,10 @@ using DotNetCore.CAP;
|
|||||||
using JiShe.CollectBus.Ammeters;
|
using JiShe.CollectBus.Ammeters;
|
||||||
using JiShe.CollectBus.Common.Enums;
|
using JiShe.CollectBus.Common.Enums;
|
||||||
using JiShe.CollectBus.Common.Extensions;
|
using JiShe.CollectBus.Common.Extensions;
|
||||||
using JiShe.CollectBus.Common.Helpers;
|
|
||||||
using JiShe.CollectBus.Enums;
|
using JiShe.CollectBus.Enums;
|
||||||
using JiShe.CollectBus.Interceptors;
|
using JiShe.CollectBus.Interceptors;
|
||||||
using JiShe.CollectBus.IotSystems.Devices;
|
using JiShe.CollectBus.IotSystems.Devices;
|
||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
using JiShe.CollectBus.Kafka.Producer;
|
|
||||||
using JiShe.CollectBus.Protocol.Contracts;
|
using JiShe.CollectBus.Protocol.Contracts;
|
||||||
using MassTransit;
|
using MassTransit;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
@ -29,7 +27,6 @@ namespace JiShe.CollectBus.Plugins
|
|||||||
public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin
|
public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin
|
||||||
{
|
{
|
||||||
private readonly ICapPublisher _producerBus;
|
private readonly ICapPublisher _producerBus;
|
||||||
private readonly IProducerService _producerService;
|
|
||||||
private readonly ILogger<TcpMonitor> _logger;
|
private readonly ILogger<TcpMonitor> _logger;
|
||||||
private readonly IRepository<Device, Guid> _deviceRepository;
|
private readonly IRepository<Device, Guid> _deviceRepository;
|
||||||
private readonly IDistributedCache<AmmeterInfo> _ammeterInfoCache;
|
private readonly IDistributedCache<AmmeterInfo> _ammeterInfoCache;
|
||||||
@ -37,17 +34,16 @@ namespace JiShe.CollectBus.Plugins
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
///
|
///
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="producerService"></param>
|
/// <param name="producerBus"></param>
|
||||||
/// <param name="logger"></param>
|
/// <param name="logger"></param>
|
||||||
/// <param name="deviceRepository"></param>
|
/// <param name="deviceRepository"></param>
|
||||||
/// <param name="ammeterInfoCache"></param>
|
/// <param name="ammeterInfoCache"></param>
|
||||||
public TcpMonitor(ICapPublisher producerBus, IProducerService producerService,
|
public TcpMonitor(ICapPublisher producerBus,
|
||||||
ILogger<TcpMonitor> logger,
|
ILogger<TcpMonitor> logger,
|
||||||
IRepository<Device, Guid> deviceRepository,
|
IRepository<Device, Guid> deviceRepository,
|
||||||
IDistributedCache<AmmeterInfo> ammeterInfoCache)
|
IDistributedCache<AmmeterInfo> ammeterInfoCache)
|
||||||
{
|
{
|
||||||
_producerBus = producerBus;
|
_producerBus = producerBus;
|
||||||
_producerService = producerService;
|
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_deviceRepository = deviceRepository;
|
_deviceRepository = deviceRepository;
|
||||||
_ammeterInfoCache = ammeterInfoCache;
|
_ammeterInfoCache = ammeterInfoCache;
|
||||||
@ -174,11 +170,7 @@ namespace JiShe.CollectBus.Plugins
|
|||||||
DeviceNo = deviceNo,
|
DeviceNo = deviceNo,
|
||||||
MessageId = NewId.NextGuid().ToString()
|
MessageId = NewId.NextGuid().ToString()
|
||||||
};
|
};
|
||||||
|
await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
|
||||||
//await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
|
|
||||||
|
|
||||||
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
|
|
||||||
|
|
||||||
//await _producerBus.Publish( messageReceivedLoginEvent);
|
//await _producerBus.Publish( messageReceivedLoginEvent);
|
||||||
}
|
}
|
||||||
@ -225,9 +217,7 @@ namespace JiShe.CollectBus.Plugins
|
|||||||
DeviceNo = deviceNo,
|
DeviceNo = deviceNo,
|
||||||
MessageId = NewId.NextGuid().ToString()
|
MessageId = NewId.NextGuid().ToString()
|
||||||
};
|
};
|
||||||
//await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
|
await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
|
||||||
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
|
|
||||||
//await _producerBus.Publish(messageReceivedHeartbeatEvent);
|
//await _producerBus.Publish(messageReceivedHeartbeatEvent);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -251,19 +241,11 @@ namespace JiShe.CollectBus.Plugins
|
|||||||
// MessageId = NewId.NextGuid().ToString()
|
// MessageId = NewId.NextGuid().ToString()
|
||||||
//});
|
//});
|
||||||
|
|
||||||
|
|
||||||
//string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn);
|
//string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn);
|
||||||
//todo 如何确定时标?目前集中器的采集频率,都是固定,数据上报的时候,根据当前时间,往后推测出应当采集的时间点作为时标。但是如果由于网络问题,数据一直没上报的情况改怎么计算?
|
//todo 如何确定时标?目前集中器的采集频率,都是固定,数据上报的时候,根据当前时间,往后推测出应当采集的时间点作为时标。但是如果由于网络问题,数据一直没上报的情况改怎么计算?
|
||||||
//await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
|
|
||||||
//{
|
await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
|
||||||
// ClientId = client.Id,
|
|
||||||
// ClientIp = client.IP,
|
|
||||||
// ClientPort = client.Port,
|
|
||||||
// MessageHexString = messageHexString,
|
|
||||||
// DeviceNo = deviceNo,
|
|
||||||
// MessageId = NewId.NextGuid().ToString()
|
|
||||||
//});
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
|
|
||||||
{
|
{
|
||||||
ClientId = client.Id,
|
ClientId = client.Id,
|
||||||
ClientIp = client.IP,
|
ClientIp = client.IP,
|
||||||
|
|||||||
@ -215,12 +215,11 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
|
|||||||
return aa == null;
|
return aa == null;
|
||||||
}
|
}
|
||||||
|
|
||||||
[KafkaSubscribe("test-topic1")]
|
[KafkaSubscribe(["test-topic"])]
|
||||||
|
|
||||||
public async Task<ISubscribeAck> KafkaSubscribeAsync(object obj)
|
public async Task<bool> KafkaSubscribeAsync(string obj)
|
||||||
{
|
{
|
||||||
_logger.LogWarning($"收到订阅消息: {obj}");
|
_logger.LogWarning($"收到订阅消息: {obj}");
|
||||||
return SubscribeAck.Success();
|
return await Task.FromResult(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -29,10 +29,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
|
public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
|
||||||
{
|
{
|
||||||
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
|
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
|
||||||
|
private readonly ICapPublisher _producerBus;
|
||||||
private readonly IIoTDBProvider _dbProvider;
|
private readonly IIoTDBProvider _dbProvider;
|
||||||
private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
|
private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
|
||||||
private readonly IProducerService _producerService;
|
private readonly IProducerService _producerService;
|
||||||
private readonly ICapPublisher _producerBus;
|
|
||||||
|
|
||||||
public BasicScheduledMeterReadingService(
|
public BasicScheduledMeterReadingService(
|
||||||
ILogger<BasicScheduledMeterReadingService> logger,
|
ILogger<BasicScheduledMeterReadingService> logger,
|
||||||
@ -380,9 +381,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
FocusAddress = ammerterItem.Value.FocusAddress,
|
FocusAddress = ammerterItem.Value.FocusAddress,
|
||||||
TimeDensity = timeDensity.ToString(),
|
TimeDensity = timeDensity.ToString(),
|
||||||
};
|
};
|
||||||
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
|
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
|
||||||
|
|
||||||
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
|
|
||||||
//_= _producerBus.Publish(tempMsg);
|
//_= _producerBus.Publish(tempMsg);
|
||||||
|
|
||||||
|
|
||||||
@ -446,9 +445,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
FocusAddress = ammerterItem.Value.FocusAddress,
|
FocusAddress = ammerterItem.Value.FocusAddress,
|
||||||
TimeDensity = timeDensity.ToString(),
|
TimeDensity = timeDensity.ToString(),
|
||||||
};
|
};
|
||||||
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
|
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
|
||||||
|
|
||||||
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
|
|
||||||
|
|
||||||
//_ = _producerBus.Publish(tempMsg);
|
//_ = _producerBus.Publish(tempMsg);
|
||||||
|
|
||||||
@ -512,9 +509,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
FocusAddress = ammerterItem.Value.FocusAddress,
|
FocusAddress = ammerterItem.Value.FocusAddress,
|
||||||
TimeDensity = timeDensity.ToString(),
|
TimeDensity = timeDensity.ToString(),
|
||||||
};
|
};
|
||||||
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
|
|
||||||
|
|
||||||
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
|
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
|
||||||
|
|
||||||
//_ = _producerBus.Publish(tempMsg);
|
//_ = _producerBus.Publish(tempMsg);
|
||||||
|
|
||||||
@ -849,9 +845,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
FocusAddress = ammerterItem.Value.FocusAddress,
|
FocusAddress = ammerterItem.Value.FocusAddress,
|
||||||
TimeDensity = timeDensity.ToString(),
|
TimeDensity = timeDensity.ToString(),
|
||||||
};
|
};
|
||||||
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
|
|
||||||
|
|
||||||
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
|
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
|
||||||
|
|
||||||
//_ = _producerBus.Publish(tempMsg);
|
//_ = _producerBus.Publish(tempMsg);
|
||||||
|
|
||||||
@ -1158,7 +1153,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
FocusAddress = ammerterItem.Value.FocusAddress,
|
FocusAddress = ammerterItem.Value.FocusAddress,
|
||||||
TimeDensity = timeDensity.ToString(),
|
TimeDensity = timeDensity.ToString(),
|
||||||
};
|
};
|
||||||
|
|
||||||
//await _producerBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempMsg);
|
//await _producerBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempMsg);
|
||||||
|
|
||||||
//_ = _producerBus.Publish(tempMsg);
|
//_ = _producerBus.Publish(tempMsg);
|
||||||
|
|||||||
@ -36,7 +36,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
{
|
{
|
||||||
string serverTagName = string.Empty;
|
string serverTagName = string.Empty;
|
||||||
public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger,
|
public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger,
|
||||||
ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, producerBus, meterReadingRecordRepository, producerService,dbProvider)
|
ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, producerBus, meterReadingRecordRepository, producerService,dbProvider)
|
||||||
{
|
{
|
||||||
serverTagName = configuration.GetValue<string>(CommonConst.ServerTagName)!;
|
serverTagName = configuration.GetValue<string>(CommonConst.ServerTagName)!;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -6,8 +6,6 @@ using JiShe.CollectBus.IoTDBProvider;
|
|||||||
using JiShe.CollectBus.IotSystems.Devices;
|
using JiShe.CollectBus.IotSystems.Devices;
|
||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||||
using JiShe.CollectBus.Kafka;
|
|
||||||
using JiShe.CollectBus.Kafka.Attributes;
|
|
||||||
using JiShe.CollectBus.Protocol.Contracts;
|
using JiShe.CollectBus.Protocol.Contracts;
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Models;
|
using JiShe.CollectBus.Protocol.Contracts.Models;
|
||||||
@ -22,7 +20,7 @@ using Volo.Abp.Domain.Repositories;
|
|||||||
|
|
||||||
namespace JiShe.CollectBus.Subscribers
|
namespace JiShe.CollectBus.Subscribers
|
||||||
{
|
{
|
||||||
public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, ICapSubscribe, IKafkaSubscribe
|
public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, ICapSubscribe
|
||||||
{
|
{
|
||||||
private readonly ILogger<SubscriberAppService> _logger;
|
private readonly ILogger<SubscriberAppService> _logger;
|
||||||
private readonly ITcpService _tcpService;
|
private readonly ITcpService _tcpService;
|
||||||
@ -65,11 +63,9 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
_dbProvider = dbProvider;
|
_dbProvider = dbProvider;
|
||||||
}
|
}
|
||||||
|
|
||||||
[KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
|
[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
|
||||||
//[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
|
public async Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage)
|
||||||
public async Task<ISubscribeAck> LoginIssuedEvent(IssuedEventMessage issuedEventMessage)
|
|
||||||
{
|
{
|
||||||
bool isAck = false;
|
|
||||||
switch (issuedEventMessage.Type)
|
switch (issuedEventMessage.Type)
|
||||||
{
|
{
|
||||||
case IssuedEventType.Heartbeat:
|
case IssuedEventType.Heartbeat:
|
||||||
@ -80,7 +76,6 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
loginEntity.AckTime = Clock.Now;
|
loginEntity.AckTime = Clock.Now;
|
||||||
loginEntity.IsAck = true;
|
loginEntity.IsAck = true;
|
||||||
await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
|
await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
|
||||||
isAck = true;
|
|
||||||
break;
|
break;
|
||||||
case IssuedEventType.Data:
|
case IssuedEventType.Data:
|
||||||
break;
|
break;
|
||||||
@ -95,14 +90,11 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
//}
|
//}
|
||||||
|
|
||||||
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
|
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
|
||||||
return isAck? SubscribeAck.Success(): SubscribeAck.Fail();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
|
[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
|
||||||
//[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
|
public async Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage)
|
||||||
public async Task<ISubscribeAck> HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage)
|
|
||||||
{
|
{
|
||||||
bool isAck = false;
|
|
||||||
switch (issuedEventMessage.Type)
|
switch (issuedEventMessage.Type)
|
||||||
{
|
{
|
||||||
case IssuedEventType.Heartbeat:
|
case IssuedEventType.Heartbeat:
|
||||||
@ -111,7 +103,6 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
heartbeatEntity.AckTime = Clock.Now;
|
heartbeatEntity.AckTime = Clock.Now;
|
||||||
heartbeatEntity.IsAck = true;
|
heartbeatEntity.IsAck = true;
|
||||||
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
|
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
|
||||||
isAck = true;
|
|
||||||
break;
|
break;
|
||||||
case IssuedEventType.Data:
|
case IssuedEventType.Data:
|
||||||
break;
|
break;
|
||||||
@ -126,12 +117,10 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
//}
|
//}
|
||||||
|
|
||||||
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
|
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
|
||||||
return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
[KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)]
|
[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)]
|
||||||
//[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)]
|
public async Task ReceivedEvent(MessageReceived receivedMessage)
|
||||||
public async Task<ISubscribeAck> ReceivedEvent(MessageReceived receivedMessage)
|
|
||||||
{
|
{
|
||||||
var currentTime = Clock.Now;
|
var currentTime = Clock.Now;
|
||||||
|
|
||||||
@ -148,13 +137,13 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
if(fN == null)
|
if(fN == null)
|
||||||
{
|
{
|
||||||
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
|
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
|
||||||
return SubscribeAck.Success();
|
return;
|
||||||
}
|
}
|
||||||
var tb3761FN = fN.FnList.FirstOrDefault();
|
var tb3761FN = fN.FnList.FirstOrDefault();
|
||||||
if (tb3761FN == null)
|
if (tb3761FN == null)
|
||||||
{
|
{
|
||||||
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
|
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
|
||||||
return SubscribeAck.Success();
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
//报文入库
|
//报文入库
|
||||||
@ -180,15 +169,11 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
//todo 查找是否有下发任务
|
//todo 查找是否有下发任务
|
||||||
|
|
||||||
//await _messageReceivedEventRepository.InsertAsync(receivedMessage);
|
//await _messageReceivedEventRepository.InsertAsync(receivedMessage);
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
|
[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
|
||||||
//[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
|
public async Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage)
|
||||||
public async Task<ISubscribeAck> ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage)
|
|
||||||
{
|
{
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
if (protocolPlugin == null)
|
if (protocolPlugin == null)
|
||||||
@ -200,12 +185,10 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage);
|
await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage);
|
||||||
await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage);
|
await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage);
|
||||||
}
|
}
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
[KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
|
[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
|
||||||
//[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
|
public async Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage)
|
||||||
public async Task<ISubscribeAck> ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage)
|
|
||||||
{
|
{
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
if (protocolPlugin == null)
|
if (protocolPlugin == null)
|
||||||
@ -217,7 +200,6 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
await protocolPlugin.LoginAsync(receivedLoginMessage);
|
await protocolPlugin.LoginAsync(receivedLoginMessage);
|
||||||
await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage);
|
await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage);
|
||||||
}
|
}
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -8,8 +8,6 @@ using JiShe.CollectBus.IotSystems.Devices;
|
|||||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||||
using JiShe.CollectBus.Kafka;
|
|
||||||
using JiShe.CollectBus.Kafka.Attributes;
|
|
||||||
using JiShe.CollectBus.Protocol.Contracts;
|
using JiShe.CollectBus.Protocol.Contracts;
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
||||||
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
||||||
@ -26,7 +24,7 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// 定时抄读任务消息消费订阅
|
/// 定时抄读任务消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[Route($"/worker/app/subscriber")]
|
[Route($"/worker/app/subscriber")]
|
||||||
public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService, ICapSubscribe, IKafkaSubscribe
|
public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService,ICapSubscribe
|
||||||
{
|
{
|
||||||
private readonly ILogger<WorkerSubscriberAppService> _logger;
|
private readonly ILogger<WorkerSubscriberAppService> _logger;
|
||||||
private readonly ITcpService _tcpService;
|
private readonly ITcpService _tcpService;
|
||||||
@ -65,9 +63,8 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
[HttpPost]
|
[HttpPost]
|
||||||
[Route("ammeter/oneminute/issued-event")]
|
[Route("ammeter/oneminute/issued-event")]
|
||||||
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
|
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
|
||||||
//[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
|
public async Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
||||||
public async Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
|
||||||
{
|
{
|
||||||
_logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理");
|
_logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理");
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
@ -84,7 +81,6 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -94,9 +90,8 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
[HttpPost]
|
[HttpPost]
|
||||||
[Route("ammeter/fiveminute/issued-event")]
|
[Route("ammeter/fiveminute/issued-event")]
|
||||||
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
|
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
|
||||||
//[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
|
public async Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
||||||
public async Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
|
||||||
{
|
{
|
||||||
_logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理");
|
_logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理");
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
@ -113,7 +108,6 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -123,9 +117,8 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
[HttpPost]
|
[HttpPost]
|
||||||
[Route("ammeter/fifteenminute/issued-event")]
|
[Route("ammeter/fifteenminute/issued-event")]
|
||||||
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
|
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
|
||||||
//[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
|
public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
||||||
public async Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
|
||||||
{
|
{
|
||||||
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
|
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
|
||||||
try
|
try
|
||||||
@ -144,7 +137,6 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
@ -163,9 +155,8 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
[HttpPost]
|
[HttpPost]
|
||||||
[Route("watermeter/fifteenminute/issued-event")]
|
[Route("watermeter/fifteenminute/issued-event")]
|
||||||
[KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
|
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
|
||||||
//[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
|
public async Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
||||||
public async Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
|
||||||
{
|
{
|
||||||
_logger.LogInformation("15分钟采集水表数据下行消息消费队列开始处理");
|
_logger.LogInformation("15分钟采集水表数据下行消息消费队列开始处理");
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
@ -181,7 +172,6 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
}
|
||||||
#endregion
|
#endregion
|
||||||
}
|
}
|
||||||
|
|||||||
@ -83,7 +83,6 @@
|
|||||||
},
|
},
|
||||||
"Kafka": {
|
"Kafka": {
|
||||||
"BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092",
|
"BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092",
|
||||||
"EnableFilter": true,
|
|
||||||
"EnableAuthorization": false,
|
"EnableAuthorization": false,
|
||||||
"SecurityProtocol": "SASL_PLAINTEXT",
|
"SecurityProtocol": "SASL_PLAINTEXT",
|
||||||
"SaslMechanism": "PLAIN",
|
"SaslMechanism": "PLAIN",
|
||||||
|
|||||||
@ -183,19 +183,6 @@ namespace JiShe.CollectBus.Kafka.AdminClient
|
|||||||
return partitions.Any(p => p.PartitionId == targetPartition);
|
return partitions.Any(p => p.PartitionId == targetPartition);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 获取主题的分区数量
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="topic"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public int GetTopicPartitionsNum(string topic)
|
|
||||||
{
|
|
||||||
var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10));
|
|
||||||
if (metadata.Topics.Count == 0)
|
|
||||||
return 0;
|
|
||||||
return metadata.Topics[0].Partitions.Count;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void Dispose()
|
public void Dispose()
|
||||||
{
|
{
|
||||||
Instance?.Dispose();
|
Instance?.Dispose();
|
||||||
|
|||||||
@ -52,12 +52,5 @@ namespace JiShe.CollectBus.Kafka.AdminClient
|
|||||||
/// <param name="targetPartition"></param>
|
/// <param name="targetPartition"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
bool CheckPartitionsExist(string topic, int targetPartition);
|
bool CheckPartitionsExist(string topic, int targetPartition);
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 获取主题的分区数量
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="topic"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
int GetTopicPartitionsNum(string topic);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,7 +12,7 @@ namespace JiShe.CollectBus.Kafka.Attributes
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// 订阅的主题
|
/// 订阅的主题
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public string Topic { get; set; }
|
public string[] Topics { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 分区
|
/// 分区
|
||||||
@ -24,53 +24,30 @@ namespace JiShe.CollectBus.Kafka.Attributes
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public string GroupId { get; set; }
|
public string GroupId { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
public KafkaSubscribeAttribute(string[] topics, string groupId = "default")
|
||||||
/// 任务数(默认是多少个分区多少个任务)
|
|
||||||
/// 如设置订阅指定Partition则任务数始终为1
|
|
||||||
/// </summary>
|
|
||||||
public int TaskCount { get; set; } = -1;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 批量处理数量
|
|
||||||
/// </summary>
|
|
||||||
public int BatchSize { get; set; } = 100;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 是否启用批量处理
|
|
||||||
/// </summary>
|
|
||||||
public bool EnableBatch { get; set; } = false;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 批次超时时间
|
|
||||||
/// </summary>
|
|
||||||
public TimeSpan? BatchTimeout { get; set; }=null;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 订阅主题
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="batchTimeout">batchTimeout格式:("00:05:00")</param>
|
|
||||||
public KafkaSubscribeAttribute(string topic, string groupId = "default", bool enableBatch = false, int batchSize = 100, string? batchTimeout = null)
|
|
||||||
{
|
{
|
||||||
this.Topic = topic;
|
this.Topics = topics;
|
||||||
this.GroupId = groupId;
|
this.GroupId = groupId;
|
||||||
this.EnableBatch = enableBatch;
|
|
||||||
this.BatchSize = batchSize;
|
|
||||||
this.BatchTimeout = batchTimeout != null? TimeSpan.Parse(batchTimeout): null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
public KafkaSubscribeAttribute(string topic, string groupId = "default")
|
||||||
/// 订阅主题
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="batchTimeout">batchTimeout格式:("00:05:00")</param>
|
|
||||||
public KafkaSubscribeAttribute(string topic, int partition, string groupId = "default", bool enableBatch = false, int batchSize = 100, string? batchTimeout = null)
|
|
||||||
{
|
{
|
||||||
this.Topic = topic;
|
this.Topics = new string[] { topic };
|
||||||
|
this.GroupId = groupId;
|
||||||
|
}
|
||||||
|
public KafkaSubscribeAttribute(string[] topics, int partition, string groupId = "default")
|
||||||
|
{
|
||||||
|
this.Topics = topics;
|
||||||
|
this.Partition = partition;
|
||||||
|
this.GroupId = groupId;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public KafkaSubscribeAttribute(string topic, int partition, string groupId = "default")
|
||||||
|
{
|
||||||
|
this.Topics = new string[] { topic };
|
||||||
this.Partition = partition;
|
this.Partition = partition;
|
||||||
this.GroupId = groupId;
|
this.GroupId = groupId;
|
||||||
this.TaskCount = 1;
|
|
||||||
this.EnableBatch = enableBatch;
|
|
||||||
this.BatchSize = batchSize;
|
|
||||||
this.BatchTimeout = batchTimeout != null ? TimeSpan.Parse(batchTimeout) : null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,9 +17,9 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
public override void ConfigureServices(ServiceConfigurationContext context)
|
public override void ConfigureServices(ServiceConfigurationContext context)
|
||||||
{
|
{
|
||||||
// 注册Producer
|
// 注册Producer
|
||||||
context.Services.AddSingleton<IProducerService, ProducerService>();
|
context.Services.AddTransient<IProducerService, ProducerService>();
|
||||||
// 注册Consumer
|
// 注册Consumer
|
||||||
context.Services.AddSingleton<IConsumerService, ConsumerService>();
|
context.Services.AddTransient<IConsumerService, ConsumerService>();
|
||||||
}
|
}
|
||||||
|
|
||||||
public override void OnApplicationInitialization(ApplicationInitializationContext context)
|
public override void OnApplicationInitialization(ApplicationInitializationContext context)
|
||||||
|
|||||||
@ -9,9 +9,6 @@ using static Confluent.Kafka.ConfigPropertyNames;
|
|||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
using System.Text.RegularExpressions;
|
using System.Text.RegularExpressions;
|
||||||
using NUglify.Html;
|
using NUglify.Html;
|
||||||
using Serilog;
|
|
||||||
using System;
|
|
||||||
using System.Text;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.Consumer
|
namespace JiShe.CollectBus.Kafka.Consumer
|
||||||
{
|
{
|
||||||
@ -21,7 +18,6 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
private readonly IConfiguration _configuration;
|
private readonly IConfiguration _configuration;
|
||||||
private readonly ConcurrentDictionary<Type, (object Consumer, CancellationTokenSource CTS)>
|
private readonly ConcurrentDictionary<Type, (object Consumer, CancellationTokenSource CTS)>
|
||||||
_consumerStore = new();
|
_consumerStore = new();
|
||||||
private class KafkaConsumer<TKey, TValue> where TKey : notnull where TValue : class { }
|
|
||||||
|
|
||||||
public ConsumerService(IConfiguration configuration, ILogger<ConsumerService> logger)
|
public ConsumerService(IConfiguration configuration, ILogger<ConsumerService> logger)
|
||||||
{
|
{
|
||||||
@ -41,8 +37,6 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
{
|
{
|
||||||
var config = BuildConsumerConfig(groupId);
|
var config = BuildConsumerConfig(groupId);
|
||||||
return new ConsumerBuilder<TKey, TValue>(config)
|
return new ConsumerBuilder<TKey, TValue>(config)
|
||||||
.SetValueDeserializer(new JsonSerializer<TValue>())
|
|
||||||
.SetLogHandler((_, log) => _logger.LogInformation($"消费者Log: {log.Message}"))
|
|
||||||
.SetErrorHandler((_, e) => _logger.LogError($"消费者错误: {e.Reason}"))
|
.SetErrorHandler((_, e) => _logger.LogError($"消费者错误: {e.Reason}"))
|
||||||
.Build();
|
.Build();
|
||||||
}
|
}
|
||||||
@ -56,10 +50,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
BootstrapServers = _configuration["Kafka:BootstrapServers"],
|
BootstrapServers = _configuration["Kafka:BootstrapServers"],
|
||||||
GroupId = groupId ?? "default",
|
GroupId = groupId ?? "default",
|
||||||
AutoOffsetReset = AutoOffsetReset.Earliest,
|
AutoOffsetReset = AutoOffsetReset.Earliest,
|
||||||
EnableAutoCommit = false, // 禁止AutoCommit
|
EnableAutoCommit = false // 禁止AutoCommit
|
||||||
EnablePartitionEof = true, // 启用分区末尾标记
|
|
||||||
AllowAutoCreateTopics= true, // 启用自动创建
|
|
||||||
FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小(50MB)
|
|
||||||
};
|
};
|
||||||
|
|
||||||
if (enableAuth)
|
if (enableAuth)
|
||||||
@ -110,7 +101,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
|
public async Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
|
||||||
{
|
{
|
||||||
var consumerKey = typeof(KafkaConsumer<TKey, TValue>);
|
var consumerKey = typeof((TKey, TValue));
|
||||||
var cts = new CancellationTokenSource();
|
var cts = new CancellationTokenSource();
|
||||||
|
|
||||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
||||||
@ -128,29 +119,6 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
var result = consumer.Consume(cts.Token);
|
var result = consumer.Consume(cts.Token);
|
||||||
if (result == null || result.Message==null || result.Message.Value == null)
|
|
||||||
{
|
|
||||||
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
|
|
||||||
//consumer.Commit(result); // 手动提交
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (result.IsPartitionEOF)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
|
|
||||||
await Task.Delay(TimeSpan.FromSeconds(1),cts.Token);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!))
|
|
||||||
{
|
|
||||||
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } };
|
|
||||||
// 检查 Header 是否符合条件
|
|
||||||
if (!headersFilter.Match(result.Message.Headers))
|
|
||||||
{
|
|
||||||
//consumer.Commit(result); // 提交偏移量
|
|
||||||
// 跳过消息
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
bool sucess= await messageHandler(result.Message.Key, result.Message.Value);
|
bool sucess= await messageHandler(result.Message.Key, result.Message.Value);
|
||||||
if (sucess)
|
if (sucess)
|
||||||
{
|
{
|
||||||
@ -178,14 +146,14 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
|
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
|
||||||
{
|
{
|
||||||
var consumerKey = typeof(KafkaConsumer<Ignore, TValue>);
|
var consumerKey = typeof((Null, TValue));
|
||||||
var cts = new CancellationTokenSource();
|
var cts = new CancellationTokenSource();
|
||||||
|
|
||||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _=>
|
var consumer = _consumerStore.GetOrAdd(consumerKey, _=>
|
||||||
(
|
(
|
||||||
CreateConsumer<Ignore, TValue>(groupId),
|
CreateConsumer<Null, TValue>(groupId),
|
||||||
cts
|
cts
|
||||||
)).Consumer as IConsumer<Ignore, TValue>;
|
)).Consumer as IConsumer<Null, TValue>;
|
||||||
|
|
||||||
consumer!.Subscribe(topics);
|
consumer!.Subscribe(topics);
|
||||||
|
|
||||||
@ -196,29 +164,6 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
var result = consumer.Consume(cts.Token);
|
var result = consumer.Consume(cts.Token);
|
||||||
if (result == null || result.Message==null || result.Message.Value == null)
|
|
||||||
{
|
|
||||||
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
|
|
||||||
//consumer.Commit(result); // 手动提交
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (result.IsPartitionEOF)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
|
|
||||||
await Task.Delay(100, cts.Token);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!))
|
|
||||||
{
|
|
||||||
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } };
|
|
||||||
// 检查 Header 是否符合条件
|
|
||||||
if (!headersFilter.Match(result.Message.Headers))
|
|
||||||
{
|
|
||||||
//consumer.Commit(result); // 提交偏移量
|
|
||||||
// 跳过消息
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
bool sucess = await messageHandler(result.Message.Value);
|
bool sucess = await messageHandler(result.Message.Value);
|
||||||
if (sucess)
|
if (sucess)
|
||||||
consumer.Commit(result); // 手动提交
|
consumer.Commit(result); // 手动提交
|
||||||
@ -232,272 +177,6 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
await Task.CompletedTask;
|
await Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 批量订阅消息
|
|
||||||
/// </summary>
|
|
||||||
/// <typeparam name="TKey">消息Key类型</typeparam>
|
|
||||||
/// <typeparam name="TValue">消息Value类型</typeparam>
|
|
||||||
/// <param name="topic">主题</param>
|
|
||||||
/// <param name="messageBatchHandler">批量消息处理函数</param>
|
|
||||||
/// <param name="groupId">消费组ID</param>
|
|
||||||
/// <param name="batchSize">批次大小</param>
|
|
||||||
/// <param name="batchTimeout">批次超时时间</param>
|
|
||||||
public async Task SubscribeBatchAsync<TKey, TValue>(string topic, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
|
|
||||||
{
|
|
||||||
await SubscribeBatchAsync<TKey, TValue>(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 批量订阅消息
|
|
||||||
/// </summary>
|
|
||||||
/// <typeparam name="TKey">消息Key类型</typeparam>
|
|
||||||
/// <typeparam name="TValue">消息Value类型</typeparam>
|
|
||||||
/// <param name="topics">主题列表</param>
|
|
||||||
/// <param name="messageBatchHandler">批量消息处理函数</param>
|
|
||||||
/// <param name="groupId">消费组ID</param>
|
|
||||||
/// <param name="batchSize">批次大小</param>
|
|
||||||
/// <param name="batchTimeout">批次超时时间</param>
|
|
||||||
public async Task SubscribeBatchAsync<TKey, TValue>(string[] topics,Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null,int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
|
|
||||||
{
|
|
||||||
var consumerKey = typeof(KafkaConsumer<TKey, TValue>);
|
|
||||||
var cts = new CancellationTokenSource();
|
|
||||||
|
|
||||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
|
||||||
(
|
|
||||||
CreateConsumer<TKey, TValue>(groupId),
|
|
||||||
cts
|
|
||||||
)).Consumer as IConsumer<TKey, TValue>;
|
|
||||||
|
|
||||||
consumer!.Subscribe(topics);
|
|
||||||
|
|
||||||
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
|
|
||||||
|
|
||||||
_ = Task.Run(async () =>
|
|
||||||
{
|
|
||||||
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
|
|
||||||
var startTime = DateTime.UtcNow;
|
|
||||||
|
|
||||||
while (!cts.IsCancellationRequested)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
// 非阻塞快速累积消息
|
|
||||||
while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
|
|
||||||
{
|
|
||||||
var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
|
|
||||||
|
|
||||||
if (result != null)
|
|
||||||
{
|
|
||||||
if (result.IsPartitionEOF)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
|
|
||||||
await Task.Delay(TimeSpan.FromSeconds(1), cts.Token);
|
|
||||||
}
|
|
||||||
else if (result.Message.Value != null)
|
|
||||||
{
|
|
||||||
if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!))
|
|
||||||
{
|
|
||||||
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } };
|
|
||||||
// 检查 Header 是否符合条件
|
|
||||||
if (!headersFilter.Match(result.Message.Headers))
|
|
||||||
{
|
|
||||||
//consumer.Commit(result); // 提交偏移量
|
|
||||||
// 跳过消息
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
messages.Add((result.Message.Value, result.TopicPartitionOffset));
|
|
||||||
//messages.Add(result.Message.Value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// 无消息时短暂等待
|
|
||||||
await Task.Delay(10, cts.Token);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 处理批次
|
|
||||||
if (messages.Count > 0)
|
|
||||||
{
|
|
||||||
bool success = await messageBatchHandler(messages.Select(m => m.Value));
|
|
||||||
if (success)
|
|
||||||
{
|
|
||||||
var offsetsByPartition = new Dictionary<TopicPartition, long>();
|
|
||||||
foreach (var msg in messages)
|
|
||||||
{
|
|
||||||
var tp = msg.Offset.TopicPartition;
|
|
||||||
var offset = msg.Offset.Offset;
|
|
||||||
if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
|
|
||||||
{
|
|
||||||
offsetsByPartition[tp] = offset;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var offsetsToCommit = offsetsByPartition
|
|
||||||
.Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
|
|
||||||
.ToList();
|
|
||||||
consumer.Commit(offsetsToCommit);
|
|
||||||
}
|
|
||||||
messages.Clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
startTime = DateTime.UtcNow;
|
|
||||||
}
|
|
||||||
catch (ConsumeException ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
|
|
||||||
}
|
|
||||||
catch (OperationCanceledException)
|
|
||||||
{
|
|
||||||
// 任务取消,正常退出
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, "处理批量消息时发生未知错误");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, cts.Token);
|
|
||||||
|
|
||||||
await Task.CompletedTask;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 批量订阅消息
|
|
||||||
/// </summary>
|
|
||||||
/// <typeparam name="TValue">消息Value类型</typeparam>
|
|
||||||
/// <param name="topic">主题列表</param>
|
|
||||||
/// <param name="messageBatchHandler">批量消息处理函数</param>
|
|
||||||
/// <param name="groupId">消费组ID</param>
|
|
||||||
/// <param name="batchSize">批次大小</param>
|
|
||||||
/// <param name="batchTimeout">批次超时时间</param>
|
|
||||||
/// <param name="consumeTimeout">消费等待时间</param>
|
|
||||||
public async Task SubscribeBatchAsync<TValue>(string topic, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class
|
|
||||||
{
|
|
||||||
await SubscribeBatchAsync<TValue>(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 批量订阅消息
|
|
||||||
/// </summary>
|
|
||||||
/// <typeparam name="TValue">消息Value类型</typeparam>
|
|
||||||
/// <param name="topics">主题列表</param>
|
|
||||||
/// <param name="messageBatchHandler">批量消息处理函数</param>
|
|
||||||
/// <param name="groupId">消费组ID</param>
|
|
||||||
/// <param name="batchSize">批次大小</param>
|
|
||||||
/// <param name="batchTimeout">批次超时时间</param>
|
|
||||||
/// <param name="consumeTimeout">消费等待时间</param>
|
|
||||||
public async Task SubscribeBatchAsync<TValue>(string[] topics,Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class
|
|
||||||
{
|
|
||||||
var consumerKey = typeof(KafkaConsumer<Ignore, TValue>);
|
|
||||||
var cts = new CancellationTokenSource();
|
|
||||||
|
|
||||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
|
||||||
(
|
|
||||||
CreateConsumer<Ignore, TValue>(groupId),
|
|
||||||
cts
|
|
||||||
)).Consumer as IConsumer<Ignore, TValue>;
|
|
||||||
|
|
||||||
consumer!.Subscribe(topics);
|
|
||||||
|
|
||||||
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
|
|
||||||
|
|
||||||
_ = Task.Run(async () =>
|
|
||||||
{
|
|
||||||
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
|
|
||||||
//var messages = new List<ConsumeResult<TKey, TValue>>();
|
|
||||||
var startTime = DateTime.UtcNow;
|
|
||||||
|
|
||||||
while (!cts.IsCancellationRequested)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
// 非阻塞快速累积消息
|
|
||||||
while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
|
|
||||||
{
|
|
||||||
var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
|
|
||||||
|
|
||||||
if (result != null)
|
|
||||||
{
|
|
||||||
if (result.IsPartitionEOF)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
|
|
||||||
await Task.Delay(TimeSpan.FromSeconds(1), cts.Token);
|
|
||||||
}
|
|
||||||
else if (result.Message.Value != null)
|
|
||||||
{
|
|
||||||
if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!))
|
|
||||||
{
|
|
||||||
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } };
|
|
||||||
// 检查 Header 是否符合条件
|
|
||||||
if (!headersFilter.Match(result.Message.Headers))
|
|
||||||
{
|
|
||||||
//consumer.Commit(result); // 提交偏移量
|
|
||||||
// 跳过消息
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
messages.Add((result.Message.Value, result.TopicPartitionOffset));
|
|
||||||
//messages.Add(result.Message.Value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// 无消息时短暂等待
|
|
||||||
await Task.Delay(10, cts.Token);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 处理批次
|
|
||||||
if (messages.Count > 0)
|
|
||||||
{
|
|
||||||
bool success = await messageBatchHandler(messages.Select(m => m.Value));
|
|
||||||
if (success)
|
|
||||||
{
|
|
||||||
var offsetsByPartition = new Dictionary<TopicPartition, long>();
|
|
||||||
foreach (var msg in messages)
|
|
||||||
{
|
|
||||||
var tp = msg.Offset.TopicPartition;
|
|
||||||
var offset = msg.Offset.Offset;
|
|
||||||
if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
|
|
||||||
{
|
|
||||||
offsetsByPartition[tp] = offset;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var offsetsToCommit = offsetsByPartition
|
|
||||||
.Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
|
|
||||||
.ToList();
|
|
||||||
consumer.Commit(offsetsToCommit);
|
|
||||||
}
|
|
||||||
messages.Clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
startTime = DateTime.UtcNow;
|
|
||||||
}
|
|
||||||
catch (ConsumeException ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
|
|
||||||
}
|
|
||||||
catch (OperationCanceledException)
|
|
||||||
{
|
|
||||||
// 任务取消,正常退出
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, "处理批量消息时发生未知错误");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, cts.Token);
|
|
||||||
|
|
||||||
await Task.CompletedTask;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 取消消息订阅
|
/// 取消消息订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
@ -1,5 +1,4 @@
|
|||||||
using Confluent.Kafka;
|
using System;
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
@ -33,14 +32,6 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class;
|
Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class;
|
||||||
|
|
||||||
Task SubscribeBatchAsync<TKey, TValue>(string[] topics, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class;
|
|
||||||
|
|
||||||
Task SubscribeBatchAsync<TKey, TValue>(string topic, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class;
|
|
||||||
|
|
||||||
Task SubscribeBatchAsync<TValue>(string topic, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class;
|
|
||||||
|
|
||||||
Task SubscribeBatchAsync<TValue>(string[] topics, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class;
|
|
||||||
|
|
||||||
void Unsubscribe<TKey, TValue>() where TKey : notnull where TValue : class;
|
void Unsubscribe<TKey, TValue>() where TKey : notnull where TValue : class;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,30 +0,0 @@
|
|||||||
using Confluent.Kafka;
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 消息头过滤器
|
|
||||||
/// </summary>
|
|
||||||
public class HeadersFilter : Dictionary<string, byte[]>
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 判断Headers是否匹配
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="headers"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public bool Match(Headers headers)
|
|
||||||
{
|
|
||||||
foreach (var kvp in this)
|
|
||||||
{
|
|
||||||
if (!headers.TryGetLastBytes(kvp.Key, out var value) || !value.SequenceEqual(kvp.Value))
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,21 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka
|
|
||||||
{
|
|
||||||
public interface ISubscribeAck
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 是否成功标记
|
|
||||||
/// </summary>
|
|
||||||
bool Ack { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 消息
|
|
||||||
/// </summary>
|
|
||||||
string? Msg { get; set; }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,88 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using System.Text.Json;
|
|
||||||
using Confluent.Kafka;
|
|
||||||
using System.Text.Json.Serialization;
|
|
||||||
using System.Text.Encodings.Web;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// JSON 序列化器(支持泛型)
|
|
||||||
/// </summary>
|
|
||||||
public class JsonSerializer<T> : ISerializer<T>, IDeserializer<T>
|
|
||||||
{
|
|
||||||
private static readonly JsonSerializerOptions _options = new JsonSerializerOptions
|
|
||||||
{
|
|
||||||
DefaultIgnoreCondition = JsonIgnoreCondition.Never,
|
|
||||||
WriteIndented = false,// 设置格式化输出
|
|
||||||
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,// 允许特殊字符
|
|
||||||
IgnoreReadOnlyFields = true,
|
|
||||||
IgnoreReadOnlyProperties = true,
|
|
||||||
NumberHandling = JsonNumberHandling.AllowReadingFromString, // 允许数字字符串
|
|
||||||
AllowTrailingCommas = true, // 忽略尾随逗号
|
|
||||||
ReadCommentHandling = JsonCommentHandling.Skip, // 忽略注释
|
|
||||||
PropertyNameCaseInsensitive = true, // 属性名称大小写不敏感
|
|
||||||
PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // 属性名称使用驼峰命名规则
|
|
||||||
Converters = { new DateTimeJsonConverter() } // 注册你的自定义转换器,
|
|
||||||
};
|
|
||||||
|
|
||||||
public byte[] Serialize(T data, SerializationContext context)
|
|
||||||
{
|
|
||||||
if (data == null)
|
|
||||||
return null;
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
return JsonSerializer.SerializeToUtf8Bytes(data, _options);
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
throw new InvalidOperationException("Kafka序列化失败", ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
|
|
||||||
{
|
|
||||||
if (isNull)
|
|
||||||
return default;
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
return JsonSerializer.Deserialize<T>(data, _options);
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
throw new InvalidOperationException("Kafka反序列化失败", ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public class DateTimeJsonConverter : JsonConverter<DateTime>
|
|
||||||
{
|
|
||||||
private readonly string _dateFormatString;
|
|
||||||
public DateTimeJsonConverter()
|
|
||||||
{
|
|
||||||
_dateFormatString = "yyyy-MM-dd HH:mm:ss";
|
|
||||||
}
|
|
||||||
|
|
||||||
public DateTimeJsonConverter(string dateFormatString)
|
|
||||||
{
|
|
||||||
_dateFormatString = dateFormatString;
|
|
||||||
}
|
|
||||||
|
|
||||||
public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
|
|
||||||
{
|
|
||||||
return DateTime.Parse(reader.GetString());
|
|
||||||
}
|
|
||||||
|
|
||||||
public override void Write(Utf8JsonWriter writer, DateTime value, JsonSerializerOptions options)
|
|
||||||
{
|
|
||||||
writer.WriteStringValue(value.ToString(_dateFormatString));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,8 +1,4 @@
|
|||||||
using Confluent.Kafka;
|
using Confluent.Kafka;
|
||||||
using DeviceDetectorNET;
|
|
||||||
using JiShe.CollectBus.Common.Enums;
|
|
||||||
using JiShe.CollectBus.Common.Helpers;
|
|
||||||
using JiShe.CollectBus.Kafka.AdminClient;
|
|
||||||
using JiShe.CollectBus.Kafka.Attributes;
|
using JiShe.CollectBus.Kafka.Attributes;
|
||||||
using JiShe.CollectBus.Kafka.Consumer;
|
using JiShe.CollectBus.Kafka.Consumer;
|
||||||
using Microsoft.AspNetCore.Builder;
|
using Microsoft.AspNetCore.Builder;
|
||||||
@ -12,7 +8,6 @@ using Microsoft.Extensions.Logging;
|
|||||||
using Microsoft.Extensions.Primitives;
|
using Microsoft.Extensions.Primitives;
|
||||||
using Newtonsoft.Json;
|
using Newtonsoft.Json;
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Concurrent;
|
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
@ -42,9 +37,6 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
|
|
||||||
lifetime.ApplicationStarted.Register(() =>
|
lifetime.ApplicationStarted.Register(() =>
|
||||||
{
|
{
|
||||||
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
|
|
||||||
int threadCount = 0;
|
|
||||||
int topicCount = 0;
|
|
||||||
foreach (var subscribeType in subscribeTypes)
|
foreach (var subscribeType in subscribeTypes)
|
||||||
{
|
{
|
||||||
var subscribes = provider.GetServices(subscribeType).ToList();
|
var subscribes = provider.GetServices(subscribeType).ToList();
|
||||||
@ -52,13 +44,10 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
|
|
||||||
if(subscribe is IKafkaSubscribe)
|
if(subscribe is IKafkaSubscribe)
|
||||||
{
|
{
|
||||||
Tuple<int, int> tuple= BuildKafkaSubscriber(subscribe, provider, logger);
|
BuildKafkaSubscriber(subscribe, provider);
|
||||||
threadCount+= tuple.Item1;
|
|
||||||
topicCount+= tuple.Item2;
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -67,27 +56,16 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="subscribe"></param>
|
/// <param name="subscribe"></param>
|
||||||
/// <param name="provider"></param>
|
/// <param name="provider"></param>
|
||||||
private static Tuple<int,int> BuildKafkaSubscriber(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger)
|
private static void BuildKafkaSubscriber(object subscribe, IServiceProvider provider)
|
||||||
{
|
{
|
||||||
var subscribedMethods = subscribe.GetType().GetMethods()
|
var subscribedMethods = subscribe.GetType().GetMethods()
|
||||||
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
|
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
|
||||||
.Where(x => x.Attribute != null)
|
.Where(x => x.Attribute != null)
|
||||||
.ToArray();
|
.ToArray();
|
||||||
|
|
||||||
int threadCount = 0;
|
|
||||||
foreach (var sub in subscribedMethods)
|
foreach (var sub in subscribedMethods)
|
||||||
{
|
{
|
||||||
var adminClientService = provider.GetRequiredService<IAdminClientService>();
|
Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe));
|
||||||
int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
|
|
||||||
if (partitionCount <= 0)
|
|
||||||
partitionCount = 1;
|
|
||||||
for (int i = 0; i < partitionCount; i++)
|
|
||||||
{
|
|
||||||
Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger));
|
|
||||||
threadCount++;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return Tuple.Create(threadCount, subscribedMethods.Length);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -98,11 +76,11 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
/// <param name="method"></param>
|
/// <param name="method"></param>
|
||||||
/// <param name="consumerInstance"></param>
|
/// <param name="consumerInstance"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe, ILogger<CollectBusKafkaModule> logger)
|
private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe)
|
||||||
{
|
{
|
||||||
var consumerService = provider.GetRequiredService<IConsumerService>();
|
var consumerService = provider.GetRequiredService<IConsumerService>();
|
||||||
|
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
|
||||||
await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) =>
|
await consumerService.SubscribeAsync<string>(attr.Topics, async (message) =>
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@ -118,7 +96,6 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 处理消息
|
/// 处理消息
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@ -126,34 +103,28 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
/// <param name="method"></param>
|
/// <param name="method"></param>
|
||||||
/// <param name="subscribe"></param>
|
/// <param name="subscribe"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private static async Task<bool> ProcessMessageAsync(dynamic message, MethodInfo method, object subscribe)
|
private static async Task<bool> ProcessMessageAsync(string message, MethodInfo method, object subscribe)
|
||||||
{
|
{
|
||||||
var parameters = method.GetParameters();
|
var parameters = method.GetParameters();
|
||||||
bool isGenericTask = method.ReturnType.IsGenericType
|
if (parameters.Length != 1)
|
||||||
&& method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>);
|
return true;
|
||||||
bool existParameters = parameters.Length > 0;
|
|
||||||
//dynamic? messageObj= null;
|
var paramType = parameters[0].ParameterType;
|
||||||
//if (existParameters)
|
var messageObj = paramType == typeof(string)? message: JsonConvert.DeserializeObject(message, paramType);
|
||||||
//{
|
|
||||||
// var paramType = parameters[0].ParameterType;
|
if (method.ReturnType == typeof(Task))
|
||||||
// messageObj = paramType == typeof(string) ? message : message.Deserialize(paramType);
|
|
||||||
//}
|
|
||||||
if (isGenericTask)
|
|
||||||
{
|
{
|
||||||
object? result = await (Task<ISubscribeAck>)method.Invoke(subscribe, existParameters? new[] { message } :null)!;
|
object? result = await (Task<bool>)method.Invoke(subscribe, new[] { messageObj })!;
|
||||||
if (result is ISubscribeAck ackResult)
|
if (result is bool success)
|
||||||
{
|
return success;
|
||||||
return ackResult.Ack;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
object? result = method.Invoke(subscribe, existParameters ? new[] { message } : null);
|
object? result = method.Invoke(subscribe, new[] { messageObj });
|
||||||
if (result is ISubscribeAck ackResult)
|
if (result is bool success)
|
||||||
{
|
return success;
|
||||||
return ackResult.Ack;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -15,6 +15,6 @@ namespace JiShe.CollectBus.Kafka.Producer
|
|||||||
|
|
||||||
Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value, int? partition, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) where TKey : notnull where TValue : class;
|
Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value, int? partition, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) where TKey : notnull where TValue : class;
|
||||||
|
|
||||||
Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<string, TValue>>? deliveryHandler = null) where TValue : class;
|
Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -9,7 +9,6 @@ using JiShe.CollectBus.Kafka.Consumer;
|
|||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Volo.Abp.DependencyInjection;
|
using Volo.Abp.DependencyInjection;
|
||||||
using YamlDotNet.Serialization;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.Producer
|
namespace JiShe.CollectBus.Kafka.Producer
|
||||||
{
|
{
|
||||||
@ -17,8 +16,7 @@ namespace JiShe.CollectBus.Kafka.Producer
|
|||||||
{
|
{
|
||||||
private readonly ILogger<ProducerService> _logger;
|
private readonly ILogger<ProducerService> _logger;
|
||||||
private readonly IConfiguration _configuration;
|
private readonly IConfiguration _configuration;
|
||||||
private readonly ConcurrentDictionary<Type, object> _producerCache = new();
|
private readonly ConcurrentDictionary<Tuple<Type, Type>, object> _producerCache = new();
|
||||||
private class KafkaProducer<TKey, TValue> where TKey : notnull where TValue : class { }
|
|
||||||
|
|
||||||
public ProducerService(IConfiguration configuration,ILogger<ProducerService> logger)
|
public ProducerService(IConfiguration configuration,ILogger<ProducerService> logger)
|
||||||
{
|
{
|
||||||
@ -33,13 +31,14 @@ namespace JiShe.CollectBus.Kafka.Producer
|
|||||||
/// <typeparam name="TKey"></typeparam>
|
/// <typeparam name="TKey"></typeparam>
|
||||||
/// <typeparam name="TValue"></typeparam>
|
/// <typeparam name="TValue"></typeparam>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private IProducer<TKey, TValue> GetProducer<TKey, TValue>(Type typeKey)
|
private IProducer<TKey, TValue> GetProducer<TKey, TValue>()
|
||||||
{
|
{
|
||||||
|
var typeKey = Tuple.Create(typeof(TKey), typeof(TValue))!;
|
||||||
|
|
||||||
return (IProducer<TKey, TValue>)_producerCache.GetOrAdd(typeKey, _ =>
|
return (IProducer<TKey, TValue>)_producerCache.GetOrAdd(typeKey, _ =>
|
||||||
{
|
{
|
||||||
var config = BuildProducerConfig();
|
var config = BuildProducerConfig();
|
||||||
return new ProducerBuilder<TKey, TValue>(config)
|
return new ProducerBuilder<TKey, TValue>(config)
|
||||||
.SetValueSerializer(new JsonSerializer<TValue>()) // Value 使用自定义 JSON 序列化
|
|
||||||
.SetLogHandler((_, msg) => _logger.Log(ConvertLogLevel(msg.Level), msg.Message))
|
.SetLogHandler((_, msg) => _logger.Log(ConvertLogLevel(msg.Level), msg.Message))
|
||||||
.Build();
|
.Build();
|
||||||
});
|
});
|
||||||
@ -63,7 +62,6 @@ namespace JiShe.CollectBus.Kafka.Producer
|
|||||||
LingerMs = 20, // 修改等待时间为20ms
|
LingerMs = 20, // 修改等待时间为20ms
|
||||||
Acks = Acks.All, // 表明只有所有副本Broker都收到消息才算提交成功, 可以 Acks.Leader
|
Acks = Acks.All, // 表明只有所有副本Broker都收到消息才算提交成功, 可以 Acks.Leader
|
||||||
MessageSendMaxRetries = 50, // 消息发送失败最大重试50次
|
MessageSendMaxRetries = 50, // 消息发送失败最大重试50次
|
||||||
MessageTimeoutMs = 120000, // 消息发送超时时间为2分钟,设置值MessageTimeoutMs > LingerMs
|
|
||||||
};
|
};
|
||||||
|
|
||||||
if (enableAuth)
|
if (enableAuth)
|
||||||
@ -103,17 +101,8 @@ namespace JiShe.CollectBus.Kafka.Producer
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value)where TKey : notnull where TValue : class
|
public async Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value)where TKey : notnull where TValue : class
|
||||||
{
|
{
|
||||||
var typeKey = typeof(KafkaProducer<TKey, TValue>);
|
var producer = GetProducer<TKey, TValue>();
|
||||||
var producer = GetProducer<TKey, TValue>(typeKey);
|
await producer.ProduceAsync(topic, new Message<TKey, TValue> { Key = key, Value = value });
|
||||||
var message = new Message<TKey, TValue>
|
|
||||||
{
|
|
||||||
Key = key,
|
|
||||||
Value = value,
|
|
||||||
Headers = new Headers{
|
|
||||||
{ "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) }
|
|
||||||
}
|
|
||||||
};
|
|
||||||
await producer.ProduceAsync(topic, message);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -125,16 +114,8 @@ namespace JiShe.CollectBus.Kafka.Producer
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class
|
public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class
|
||||||
{
|
{
|
||||||
var typeKey = typeof(KafkaProducer<string, TValue>);
|
var producer = GetProducer<Null, TValue>();
|
||||||
var producer = GetProducer<string, TValue>(typeKey);
|
await producer.ProduceAsync(topic, new Message<Null, TValue> { Value = value });
|
||||||
var message = new Message<string, TValue>
|
|
||||||
{
|
|
||||||
Value = value,
|
|
||||||
Headers = new Headers{
|
|
||||||
{ "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) }
|
|
||||||
}
|
|
||||||
};
|
|
||||||
await producer.ProduceAsync(topic, message);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -153,13 +134,9 @@ namespace JiShe.CollectBus.Kafka.Producer
|
|||||||
var message = new Message<TKey, TValue>
|
var message = new Message<TKey, TValue>
|
||||||
{
|
{
|
||||||
Key = key,
|
Key = key,
|
||||||
Value = value,
|
Value = value
|
||||||
Headers = new Headers{
|
|
||||||
{ "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) }
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
var typeKey = typeof(KafkaProducer<TKey, TValue>);
|
var producer = GetProducer<TKey, TValue>();
|
||||||
var producer = GetProducer<TKey, TValue>(typeKey);
|
|
||||||
if (partition.HasValue)
|
if (partition.HasValue)
|
||||||
{
|
{
|
||||||
var topicPartition = new TopicPartition(topic, partition.Value);
|
var topicPartition = new TopicPartition(topic, partition.Value);
|
||||||
@ -183,17 +160,13 @@ namespace JiShe.CollectBus.Kafka.Producer
|
|||||||
/// <param name="partition"></param>
|
/// <param name="partition"></param>
|
||||||
/// <param name="deliveryHandler"></param>
|
/// <param name="deliveryHandler"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<string, TValue>>? deliveryHandler = null) where TValue : class
|
public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class
|
||||||
{
|
{
|
||||||
var message = new Message<string, TValue>
|
var message = new Message<Null, TValue>
|
||||||
{
|
{
|
||||||
Value = value,
|
Value = value
|
||||||
Headers = new Headers{
|
|
||||||
{ "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) }
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
var typeKey = typeof(KafkaProducer<string, TValue>);
|
var producer = GetProducer<Null, TValue>();
|
||||||
var producer = GetProducer<string, TValue>(typeKey);
|
|
||||||
if (partition.HasValue)
|
if (partition.HasValue)
|
||||||
{
|
{
|
||||||
var topicPartition = new TopicPartition(topic, partition.Value);
|
var topicPartition = new TopicPartition(topic, partition.Value);
|
||||||
|
|||||||
@ -1,75 +0,0 @@
|
|||||||
using Confluent.Kafka;
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using static System.Runtime.InteropServices.JavaScript.JSType;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka
|
|
||||||
{
|
|
||||||
public class SubscribeResult: ISubscribeAck
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 是否成功
|
|
||||||
/// </summary>
|
|
||||||
public bool Ack { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 消息
|
|
||||||
/// </summary>
|
|
||||||
public string? Msg { get; set; }
|
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 成功
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="msg">消息</param>
|
|
||||||
public SubscribeResult Success(string? msg = null)
|
|
||||||
{
|
|
||||||
Ack = true;
|
|
||||||
Msg = msg;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 失败
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="code"></param>
|
|
||||||
/// <param name="msg"></param>
|
|
||||||
/// <param name="data"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public SubscribeResult Fail(string? msg = null)
|
|
||||||
{
|
|
||||||
Msg = msg;
|
|
||||||
Ack = false;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static partial class SubscribeAck
|
|
||||||
{
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 成功
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="msg">消息</param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public static ISubscribeAck Success(string? msg = null)
|
|
||||||
{
|
|
||||||
return new SubscribeResult().Success(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 失败
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="msg">消息</param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public static ISubscribeAck Fail(string? msg = null)
|
|
||||||
{
|
|
||||||
return new SubscribeResult().Fail(msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@ -1,4 +1,5 @@
|
|||||||
using JiShe.CollectBus.Common.Enums;
|
using DotNetCore.CAP;
|
||||||
|
using JiShe.CollectBus.Common.Enums;
|
||||||
using JiShe.CollectBus.Common.Extensions;
|
using JiShe.CollectBus.Common.Extensions;
|
||||||
using JiShe.CollectBus.Common.Models;
|
using JiShe.CollectBus.Common.Models;
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
||||||
@ -11,16 +12,12 @@ using Microsoft.Extensions.DependencyInjection;
|
|||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
using JiShe.CollectBus.IotSystems.Protocols;
|
using JiShe.CollectBus.IotSystems.Protocols;
|
||||||
using MassTransit;
|
using MassTransit;
|
||||||
using JiShe.CollectBus.Kafka.Producer;
|
|
||||||
using JiShe.CollectBus.Common.Helpers;
|
|
||||||
using DotNetCore.CAP;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
|
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
|
||||||
{
|
{
|
||||||
public abstract class BaseProtocolPlugin : IProtocolPlugin
|
public abstract class BaseProtocolPlugin : IProtocolPlugin
|
||||||
{
|
{
|
||||||
private readonly ICapPublisher _producerBus;
|
private readonly ICapPublisher _producerBus;
|
||||||
private readonly IProducerService _producerService;
|
|
||||||
private readonly ILogger<BaseProtocolPlugin> _logger;
|
private readonly ILogger<BaseProtocolPlugin> _logger;
|
||||||
private readonly IRepository<ProtocolInfo, Guid> _protocolInfoRepository;
|
private readonly IRepository<ProtocolInfo, Guid> _protocolInfoRepository;
|
||||||
|
|
||||||
@ -40,8 +37,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
|
|||||||
|
|
||||||
_logger = serviceProvider.GetRequiredService<ILogger<BaseProtocolPlugin>>();
|
_logger = serviceProvider.GetRequiredService<ILogger<BaseProtocolPlugin>>();
|
||||||
_protocolInfoRepository = serviceProvider.GetRequiredService<IRepository<ProtocolInfo, Guid>>();
|
_protocolInfoRepository = serviceProvider.GetRequiredService<IRepository<ProtocolInfo, Guid>>();
|
||||||
_producerService = serviceProvider.GetRequiredService<IProducerService>();
|
_producerBus = serviceProvider.GetRequiredService<ICapPublisher>();
|
||||||
_producerBus = serviceProvider.GetRequiredService<ICapPublisher>();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract ProtocolInfo Info { get; }
|
public abstract ProtocolInfo Info { get; }
|
||||||
@ -90,9 +86,8 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
|
|||||||
Fn = 1
|
Fn = 1
|
||||||
};
|
};
|
||||||
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
|
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
|
||||||
//await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
|
|
||||||
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
|
await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
|
||||||
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
|
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -131,9 +126,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
|
|||||||
Fn = 1
|
Fn = 1
|
||||||
};
|
};
|
||||||
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
|
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
|
||||||
//await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
|
await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
|
||||||
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
|
|
||||||
|
|
||||||
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
|
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
|
||||||
}
|
}
|
||||||
|
|||||||
@ -7,7 +7,6 @@
|
|||||||
</PropertyGroup>
|
</PropertyGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="Confluent.Kafka" Version="2.9.0" />
|
|
||||||
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
|
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
|
||||||
<PackageReference Include="MassTransit.Abstractions" Version="8.3.0" />
|
<PackageReference Include="MassTransit.Abstractions" Version="8.3.0" />
|
||||||
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="8.0.0" />
|
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="8.0.0" />
|
||||||
@ -18,7 +17,6 @@
|
|||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
|
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user