移除CAP组件

This commit is contained in:
zenghongyao 2025-04-15 18:58:38 +08:00
parent a8f79c56f4
commit 11d3fcf162
17 changed files with 89 additions and 74 deletions

View File

@ -26,6 +26,7 @@
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -1,16 +1,17 @@
using System.Threading.Tasks; using System.Threading.Tasks;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Kafka;
using Volo.Abp.Application.Services; using Volo.Abp.Application.Services;
namespace JiShe.CollectBus.Subscribers namespace JiShe.CollectBus.Subscribers
{ {
public interface ISubscriberAppService : IApplicationService public interface ISubscriberAppService : IApplicationService
{ {
Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage); Task<ISubscribeAck> LoginIssuedEvent(IssuedEventMessage issuedEventMessage);
Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage); Task<ISubscribeAck> HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage);
Task ReceivedEvent(MessageReceived receivedMessage); Task<ISubscribeAck> ReceivedEvent(MessageReceived receivedMessage);
Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage); Task<ISubscribeAck> ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage);
Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage); Task<ISubscribeAck> ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage);
} }
} }

View File

@ -1,6 +1,7 @@
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp.Application.Services; using Volo.Abp.Application.Services;
@ -19,19 +20,19 @@ namespace JiShe.CollectBus.Subscribers
/// 1分钟采集电表数据下行消息消费订阅 /// 1分钟采集电表数据下行消息消费订阅
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
/// <summary> /// <summary>
/// 5分钟采集电表数据下行消息消费订阅 /// 5分钟采集电表数据下行消息消费订阅
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
/// <summary> /// <summary>
/// 15分钟采集电表数据下行消息消费订阅 /// 15分钟采集电表数据下行消息消费订阅
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
#endregion #endregion
#region #region
@ -39,7 +40,7 @@ namespace JiShe.CollectBus.Subscribers
/// 1分钟采集水表数据下行消息消费订阅 /// 1分钟采集水表数据下行消息消费订阅
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
#endregion #endregion
} }

View File

@ -23,7 +23,6 @@
<PackageReference Include="Volo.Abp.Ddd.Application" Version="8.3.3" /> <PackageReference Include="Volo.Abp.Ddd.Application" Version="8.3.3" />
<PackageReference Include="TouchSocket" Version="3.0.19" /> <PackageReference Include="TouchSocket" Version="3.0.19" />
<PackageReference Include="TouchSocket.Hosting" Version="3.0.19" /> <PackageReference Include="TouchSocket.Hosting" Version="3.0.19" />
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
<PackageReference Include="Volo.Abp.EventBus.Kafka" Version="8.3.3" /> <PackageReference Include="Volo.Abp.EventBus.Kafka" Version="8.3.3" />
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />

View File

@ -7,10 +7,12 @@ using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Enums; using JiShe.CollectBus.Enums;
using JiShe.CollectBus.Interceptors; using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts;
using MassTransit; using MassTransit;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -26,7 +28,7 @@ namespace JiShe.CollectBus.Plugins
{ {
public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin
{ {
private readonly ICapPublisher _producerBus; private readonly IProducerService _producerService;
private readonly ILogger<TcpMonitor> _logger; private readonly ILogger<TcpMonitor> _logger;
private readonly IRepository<Device, Guid> _deviceRepository; private readonly IRepository<Device, Guid> _deviceRepository;
private readonly IDistributedCache<AmmeterInfo> _ammeterInfoCache; private readonly IDistributedCache<AmmeterInfo> _ammeterInfoCache;
@ -34,16 +36,16 @@ namespace JiShe.CollectBus.Plugins
/// <summary> /// <summary>
/// ///
/// </summary> /// </summary>
/// <param name="producerBus"></param> /// <param name="producerService"></param>
/// <param name="logger"></param> /// <param name="logger"></param>
/// <param name="deviceRepository"></param> /// <param name="deviceRepository"></param>
/// <param name="ammeterInfoCache"></param> /// <param name="ammeterInfoCache"></param>
public TcpMonitor(ICapPublisher producerBus, public TcpMonitor(IProducerService producerService,
ILogger<TcpMonitor> logger, ILogger<TcpMonitor> logger,
IRepository<Device, Guid> deviceRepository, IRepository<Device, Guid> deviceRepository,
IDistributedCache<AmmeterInfo> ammeterInfoCache) IDistributedCache<AmmeterInfo> ammeterInfoCache)
{ {
_producerBus = producerBus; _producerService = producerService;
_logger = logger; _logger = logger;
_deviceRepository = deviceRepository; _deviceRepository = deviceRepository;
_ammeterInfoCache = ammeterInfoCache; _ammeterInfoCache = ammeterInfoCache;
@ -170,7 +172,7 @@ namespace JiShe.CollectBus.Plugins
DeviceNo = deviceNo, DeviceNo = deviceNo,
MessageId = NewId.NextGuid().ToString() MessageId = NewId.NextGuid().ToString()
}; };
await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent); await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent.Serialize());
//await _producerBus.Publish( messageReceivedLoginEvent); //await _producerBus.Publish( messageReceivedLoginEvent);
} }
@ -217,7 +219,7 @@ namespace JiShe.CollectBus.Plugins
DeviceNo = deviceNo, DeviceNo = deviceNo,
MessageId = NewId.NextGuid().ToString() MessageId = NewId.NextGuid().ToString()
}; };
await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent); await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent.Serialize());
//await _producerBus.Publish(messageReceivedHeartbeatEvent); //await _producerBus.Publish(messageReceivedHeartbeatEvent);
} }
@ -245,7 +247,7 @@ namespace JiShe.CollectBus.Plugins
//string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn); //string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn);
//todo 如何确定时标?目前集中器的采集频率,都是固定,数据上报的时候,根据当前时间,往后推测出应当采集的时间点作为时标。但是如果由于网络问题,数据一直没上报的情况改怎么计算? //todo 如何确定时标?目前集中器的采集频率,都是固定,数据上报的时候,根据当前时间,往后推测出应当采集的时间点作为时标。但是如果由于网络问题,数据一直没上报的情况改怎么计算?
await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
{ {
ClientId = client.Id, ClientId = client.Id,
ClientIp = client.IP, ClientIp = client.IP,
@ -253,7 +255,7 @@ namespace JiShe.CollectBus.Plugins
MessageHexString = messageHexString, MessageHexString = messageHexString,
DeviceNo = deviceNo, DeviceNo = deviceNo,
MessageId = NewId.NextGuid().ToString() MessageId = NewId.NextGuid().ToString()
}); }.Serialize());
} }
} }
} }

View File

@ -215,19 +215,12 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
return aa == null; return aa == null;
} }
[KafkaSubscribe(["test-topic1"])] [KafkaSubscribe(["test-topic"])]
public async Task<ISubscribeAck> KafkaSubscribeAsync() // TestSubscribe obj public async Task<ISubscribeAck> KafkaSubscribeAsync(object obj)
{ {
var obj=string.Empty;
_logger.LogWarning($"收到订阅消息: {obj}"); _logger.LogWarning($"收到订阅消息: {obj}");
return SubscribeAck.Success(); return SubscribeAck.Success();
} }
} }
public class TestSubscribe
{
public string Topic { get; set; }
public int Val { get; set; }
}

View File

@ -29,7 +29,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
{ {
private readonly ILogger<BasicScheduledMeterReadingService> _logger; private readonly ILogger<BasicScheduledMeterReadingService> _logger;
private readonly ICapPublisher _producerBus;
private readonly IIoTDBProvider _dbProvider; private readonly IIoTDBProvider _dbProvider;
private readonly IMeterReadingRecordRepository _meterReadingRecordRepository; private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
private readonly IProducerService _producerService; private readonly IProducerService _producerService;
@ -37,12 +36,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public BasicScheduledMeterReadingService( public BasicScheduledMeterReadingService(
ILogger<BasicScheduledMeterReadingService> logger, ILogger<BasicScheduledMeterReadingService> logger,
ICapPublisher producerBus,
IMeterReadingRecordRepository meterReadingRecordRepository, IMeterReadingRecordRepository meterReadingRecordRepository,
IProducerService producerService, IProducerService producerService,
IIoTDBProvider dbProvider) IIoTDBProvider dbProvider)
{ {
_producerBus = producerBus;
_logger = logger; _logger = logger;
_dbProvider = dbProvider; _dbProvider = dbProvider;
_meterReadingRecordRepository = meterReadingRecordRepository; _meterReadingRecordRepository = meterReadingRecordRepository;
@ -381,7 +378,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress, FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(), TimeDensity = timeDensity.ToString(),
}; };
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg.Serialize());
//_= _producerBus.Publish(tempMsg); //_= _producerBus.Publish(tempMsg);
@ -445,7 +442,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress, FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(), TimeDensity = timeDensity.ToString(),
}; };
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg); _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg.Serialize());
//_ = _producerBus.Publish(tempMsg); //_ = _producerBus.Publish(tempMsg);
@ -510,7 +507,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
TimeDensity = timeDensity.ToString(), TimeDensity = timeDensity.ToString(),
}; };
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg.Serialize());
//_ = _producerBus.Publish(tempMsg); //_ = _producerBus.Publish(tempMsg);
@ -805,7 +802,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress); int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress);
await _producerService.ProduceAsync(topicName, partition, taskRecord); await _producerService.ProduceAsync(topicName, partition, taskRecord.Serialize());
} }
private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType) private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType)
@ -846,7 +843,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
TimeDensity = timeDensity.ToString(), TimeDensity = timeDensity.ToString(),
}; };
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg.Serialize());
//_ = _producerBus.Publish(tempMsg); //_ = _producerBus.Publish(tempMsg);

View File

@ -36,7 +36,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{ {
string serverTagName = string.Empty; string serverTagName = string.Empty;
public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger, public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger,
ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, producerBus, meterReadingRecordRepository, producerService,dbProvider) IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, meterReadingRecordRepository, producerService,dbProvider)
{ {
serverTagName = configuration.GetValue<string>(CommonConst.ServerTagName)!; serverTagName = configuration.GetValue<string>(CommonConst.ServerTagName)!;
} }

View File

@ -6,6 +6,8 @@ using JiShe.CollectBus.IoTDBProvider;
using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models; using JiShe.CollectBus.Protocol.Contracts.Models;
@ -20,7 +22,7 @@ using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Subscribers namespace JiShe.CollectBus.Subscribers
{ {
public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, ICapSubscribe public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, IKafkaSubscribe
{ {
private readonly ILogger<SubscriberAppService> _logger; private readonly ILogger<SubscriberAppService> _logger;
private readonly ITcpService _tcpService; private readonly ITcpService _tcpService;
@ -63,9 +65,10 @@ namespace JiShe.CollectBus.Subscribers
_dbProvider = dbProvider; _dbProvider = dbProvider;
} }
[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
public async Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage) public async Task<ISubscribeAck> LoginIssuedEvent(IssuedEventMessage issuedEventMessage)
{ {
bool isAck = false;
switch (issuedEventMessage.Type) switch (issuedEventMessage.Type)
{ {
case IssuedEventType.Heartbeat: case IssuedEventType.Heartbeat:
@ -76,6 +79,7 @@ namespace JiShe.CollectBus.Subscribers
loginEntity.AckTime = Clock.Now; loginEntity.AckTime = Clock.Now;
loginEntity.IsAck = true; loginEntity.IsAck = true;
await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity); await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
isAck = true;
break; break;
case IssuedEventType.Data: case IssuedEventType.Data:
break; break;
@ -90,11 +94,13 @@ namespace JiShe.CollectBus.Subscribers
//} //}
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
return isAck? SubscribeAck.Success(): SubscribeAck.Fail();
} }
[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
public async Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage) public async Task<ISubscribeAck> HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage)
{ {
bool isAck = false;
switch (issuedEventMessage.Type) switch (issuedEventMessage.Type)
{ {
case IssuedEventType.Heartbeat: case IssuedEventType.Heartbeat:
@ -103,6 +109,7 @@ namespace JiShe.CollectBus.Subscribers
heartbeatEntity.AckTime = Clock.Now; heartbeatEntity.AckTime = Clock.Now;
heartbeatEntity.IsAck = true; heartbeatEntity.IsAck = true;
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity); await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
isAck = true;
break; break;
case IssuedEventType.Data: case IssuedEventType.Data:
break; break;
@ -117,10 +124,11 @@ namespace JiShe.CollectBus.Subscribers
//} //}
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
} }
[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)] [KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)]
public async Task ReceivedEvent(MessageReceived receivedMessage) public async Task<ISubscribeAck> ReceivedEvent(MessageReceived receivedMessage)
{ {
var currentTime = Clock.Now; var currentTime = Clock.Now;
@ -137,13 +145,13 @@ namespace JiShe.CollectBus.Subscribers
if(fN == null) if(fN == null)
{ {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return; return SubscribeAck.Success();
} }
var tb3761FN = fN.FnList.FirstOrDefault(); var tb3761FN = fN.FnList.FirstOrDefault();
if (tb3761FN == null) if (tb3761FN == null)
{ {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return; return SubscribeAck.Success();
} }
//报文入库 //报文入库
@ -169,11 +177,14 @@ namespace JiShe.CollectBus.Subscribers
//todo 查找是否有下发任务 //todo 查找是否有下发任务
//await _messageReceivedEventRepository.InsertAsync(receivedMessage); //await _messageReceivedEventRepository.InsertAsync(receivedMessage);
} }
return SubscribeAck.Success();
} }
[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
public async Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage) public async Task<ISubscribeAck> ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage)
{ {
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin"); var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null) if (protocolPlugin == null)
@ -185,10 +196,11 @@ namespace JiShe.CollectBus.Subscribers
await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage); await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage);
await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage); await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage);
} }
return SubscribeAck.Success();
} }
[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] [KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
public async Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage) public async Task<ISubscribeAck> ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage)
{ {
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin"); var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null) if (protocolPlugin == null)
@ -200,6 +212,7 @@ namespace JiShe.CollectBus.Subscribers
await protocolPlugin.LoginAsync(receivedLoginMessage); await protocolPlugin.LoginAsync(receivedLoginMessage);
await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage); await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage);
} }
return SubscribeAck.Success();
} }
} }
} }

View File

@ -8,6 +8,8 @@ using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Repository.MeterReadingRecord; using JiShe.CollectBus.Repository.MeterReadingRecord;
@ -24,7 +26,7 @@ namespace JiShe.CollectBus.Subscribers
/// 定时抄读任务消息消费订阅 /// 定时抄读任务消息消费订阅
/// </summary> /// </summary>
[Route($"/worker/app/subscriber")] [Route($"/worker/app/subscriber")]
public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService,ICapSubscribe public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService, IKafkaSubscribe
{ {
private readonly ILogger<WorkerSubscriberAppService> _logger; private readonly ILogger<WorkerSubscriberAppService> _logger;
private readonly ITcpService _tcpService; private readonly ITcpService _tcpService;
@ -63,8 +65,8 @@ namespace JiShe.CollectBus.Subscribers
/// <returns></returns> /// <returns></returns>
[HttpPost] [HttpPost]
[Route("ammeter/oneminute/issued-event")] [Route("ammeter/oneminute/issued-event")]
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)] [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) public async Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{ {
_logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理"); _logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin"); var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
@ -81,6 +83,7 @@ namespace JiShe.CollectBus.Subscribers
} }
} }
return SubscribeAck.Success();
} }
/// <summary> /// <summary>
@ -90,8 +93,8 @@ namespace JiShe.CollectBus.Subscribers
/// <returns></returns> /// <returns></returns>
[HttpPost] [HttpPost]
[Route("ammeter/fiveminute/issued-event")] [Route("ammeter/fiveminute/issued-event")]
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)] [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
public async Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) public async Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{ {
_logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理"); _logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin"); var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
@ -108,6 +111,7 @@ namespace JiShe.CollectBus.Subscribers
} }
} }
return SubscribeAck.Success();
} }
/// <summary> /// <summary>
@ -117,8 +121,8 @@ namespace JiShe.CollectBus.Subscribers
/// <returns></returns> /// <returns></returns>
[HttpPost] [HttpPost]
[Route("ammeter/fifteenminute/issued-event")] [Route("ammeter/fifteenminute/issued-event")]
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)] [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) public async Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{ {
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理"); _logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
try try
@ -137,6 +141,7 @@ namespace JiShe.CollectBus.Subscribers
} }
} }
return SubscribeAck.Success();
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -155,8 +160,8 @@ namespace JiShe.CollectBus.Subscribers
/// <returns></returns> /// <returns></returns>
[HttpPost] [HttpPost]
[Route("watermeter/fifteenminute/issued-event")] [Route("watermeter/fifteenminute/issued-event")]
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)] [KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
public async Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) public async Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{ {
_logger.LogInformation("15分钟采集水表数据下行消息消费队列开始处理"); _logger.LogInformation("15分钟采集水表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin"); var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
@ -172,6 +177,7 @@ namespace JiShe.CollectBus.Subscribers
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
} }
} }
return SubscribeAck.Success();
} }
#endregion #endregion
} }

View File

@ -43,7 +43,7 @@ namespace JiShe.CollectBus.Host
ConfigureNetwork(context, configuration); ConfigureNetwork(context, configuration);
ConfigureJwtAuthentication(context, configuration); ConfigureJwtAuthentication(context, configuration);
ConfigureHangfire(context); ConfigureHangfire(context);
ConfigureCap(context, configuration); //ConfigureCap(context, configuration);
//ConfigureMassTransit(context, configuration); //ConfigureMassTransit(context, configuration);
//ConfigureKafkaTopic(context, configuration); //ConfigureKafkaTopic(context, configuration);
ConfigureAuditLog(context); ConfigureAuditLog(context);

View File

@ -16,7 +16,6 @@
<meta name="viewport" content="width=device-width, initial-scale=1.0"/> <meta name="viewport" content="width=device-width, initial-scale=1.0"/>
<link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet"/> <link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet"/>
<title>后端服务</title> <title>后端服务</title>
</head> </head>
<body> <body>

View File

@ -146,14 +146,14 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <returns></returns> /// <returns></returns>
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
{ {
var consumerKey = typeof((Null, TValue)); var consumerKey = typeof((Ignore, TValue));
var cts = new CancellationTokenSource(); var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _=> var consumer = _consumerStore.GetOrAdd(consumerKey, _=>
( (
CreateConsumer<Null, TValue>(groupId), CreateConsumer<Ignore, TValue>(groupId),
cts cts
)).Consumer as IConsumer<Null, TValue>; )).Consumer as IConsumer<Ignore, TValue>;
consumer!.Subscribe(topics); consumer!.Subscribe(topics);

View File

@ -15,6 +15,6 @@ namespace JiShe.CollectBus.Kafka.Producer
Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value, int? partition, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) where TKey : notnull where TValue : class; Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value, int? partition, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) where TKey : notnull where TValue : class;
Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class; Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<Ignore, TValue>>? deliveryHandler = null) where TValue : class;
} }
} }

View File

@ -114,8 +114,8 @@ namespace JiShe.CollectBus.Kafka.Producer
/// <returns></returns> /// <returns></returns>
public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class
{ {
var producer = GetProducer<Null, TValue>(); var producer = GetProducer<Ignore, TValue>();
await producer.ProduceAsync(topic, new Message<Null, TValue> { Value = value }); await producer.ProduceAsync(topic, new Message<Ignore, TValue> { Value = value });
} }
/// <summary> /// <summary>
@ -160,13 +160,13 @@ namespace JiShe.CollectBus.Kafka.Producer
/// <param name="partition"></param> /// <param name="partition"></param>
/// <param name="deliveryHandler"></param> /// <param name="deliveryHandler"></param>
/// <returns></returns> /// <returns></returns>
public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<Ignore, TValue>>? deliveryHandler = null) where TValue : class
{ {
var message = new Message<Null, TValue> var message = new Message<Ignore, TValue>
{ {
Value = value Value = value
}; };
var producer = GetProducer<Null, TValue>(); var producer = GetProducer<Ignore, TValue>();
if (partition.HasValue) if (partition.HasValue)
{ {
var topicPartition = new TopicPartition(topic, partition.Value); var topicPartition = new TopicPartition(topic, partition.Value);

View File

@ -12,12 +12,14 @@ using Microsoft.Extensions.DependencyInjection;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.Protocols; using JiShe.CollectBus.IotSystems.Protocols;
using MassTransit; using MassTransit;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Common.Helpers;
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
{ {
public abstract class BaseProtocolPlugin : IProtocolPlugin public abstract class BaseProtocolPlugin : IProtocolPlugin
{ {
private readonly ICapPublisher _producerBus; private readonly IProducerService _producerService;
private readonly ILogger<BaseProtocolPlugin> _logger; private readonly ILogger<BaseProtocolPlugin> _logger;
private readonly IRepository<ProtocolInfo, Guid> _protocolInfoRepository; private readonly IRepository<ProtocolInfo, Guid> _protocolInfoRepository;
@ -37,7 +39,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
_logger = serviceProvider.GetRequiredService<ILogger<BaseProtocolPlugin>>(); _logger = serviceProvider.GetRequiredService<ILogger<BaseProtocolPlugin>>();
_protocolInfoRepository = serviceProvider.GetRequiredService<IRepository<ProtocolInfo, Guid>>(); _protocolInfoRepository = serviceProvider.GetRequiredService<IRepository<ProtocolInfo, Guid>>();
_producerBus = serviceProvider.GetRequiredService<ICapPublisher>(); _producerService = serviceProvider.GetRequiredService<IProducerService>();
} }
public abstract ProtocolInfo Info { get; } public abstract ProtocolInfo Info { get; }
@ -87,7 +89,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
}; };
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam); var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }.Serialize());
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); //await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
} }
@ -126,7 +128,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
Fn = 1 Fn = 1
}; };
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam); var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }); await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }.Serialize());
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }); //await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
} }

View File

@ -7,7 +7,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" /> <PackageReference Include="Confluent.Kafka" Version="2.9.0" />
<PackageReference Include="MassTransit.Abstractions" Version="8.3.0" /> <PackageReference Include="MassTransit.Abstractions" Version="8.3.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="8.0.0" /> <PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="8.0.0" />
<PackageReference Include="TouchSocket" Version="2.1.9" /> <PackageReference Include="TouchSocket" Version="2.1.9" />
@ -17,6 +17,7 @@
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>