diff --git a/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj b/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj
index de972d1..6e4fed5 100644
--- a/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj
+++ b/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj
@@ -26,6 +26,7 @@
+
diff --git a/src/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs b/src/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs
index f9bc706..658ff29 100644
--- a/src/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs
+++ b/src/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs
@@ -1,16 +1,17 @@
using System.Threading.Tasks;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
+using JiShe.CollectBus.Kafka;
using Volo.Abp.Application.Services;
namespace JiShe.CollectBus.Subscribers
{
public interface ISubscriberAppService : IApplicationService
{
- Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage);
- Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage);
- Task ReceivedEvent(MessageReceived receivedMessage);
- Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage);
- Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage);
+ Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage);
+ Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage);
+ Task ReceivedEvent(MessageReceived receivedMessage);
+ Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage);
+ Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage);
}
}
diff --git a/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs
index abba774..9a37167 100644
--- a/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs
+++ b/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs
@@ -1,6 +1,7 @@
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
+using JiShe.CollectBus.Kafka;
using System.Collections.Generic;
using System.Threading.Tasks;
using Volo.Abp.Application.Services;
@@ -19,19 +20,19 @@ namespace JiShe.CollectBus.Subscribers
/// 1分钟采集电表数据下行消息消费订阅
///
///
- Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
+ Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
///
/// 5分钟采集电表数据下行消息消费订阅
///
///
- Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
+ Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
///
/// 15分钟采集电表数据下行消息消费订阅
///
///
- Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
+ Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
#endregion
#region 水表消息采集
@@ -39,7 +40,7 @@ namespace JiShe.CollectBus.Subscribers
/// 1分钟采集水表数据下行消息消费订阅
///
///
- Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
+ Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
#endregion
}
diff --git a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj
index a2cc613..24db5a5 100644
--- a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj
+++ b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj
@@ -23,7 +23,6 @@
-
diff --git a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs
index ff4de0c..29427fc 100644
--- a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs
+++ b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs
@@ -7,10 +7,12 @@ using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
+using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Enums;
using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
+using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts;
using MassTransit;
using Microsoft.Extensions.Logging;
@@ -26,7 +28,7 @@ namespace JiShe.CollectBus.Plugins
{
public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin
{
- private readonly ICapPublisher _producerBus;
+ private readonly IProducerService _producerService;
private readonly ILogger _logger;
private readonly IRepository _deviceRepository;
private readonly IDistributedCache _ammeterInfoCache;
@@ -34,16 +36,16 @@ namespace JiShe.CollectBus.Plugins
///
///
///
- ///
+ ///
///
///
///
- public TcpMonitor(ICapPublisher producerBus,
+ public TcpMonitor(IProducerService producerService,
ILogger logger,
IRepository deviceRepository,
IDistributedCache ammeterInfoCache)
{
- _producerBus = producerBus;
+ _producerService = producerService;
_logger = logger;
_deviceRepository = deviceRepository;
_ammeterInfoCache = ammeterInfoCache;
@@ -170,7 +172,7 @@ namespace JiShe.CollectBus.Plugins
DeviceNo = deviceNo,
MessageId = NewId.NextGuid().ToString()
};
- await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
+ await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent.Serialize());
//await _producerBus.Publish( messageReceivedLoginEvent);
}
@@ -217,7 +219,7 @@ namespace JiShe.CollectBus.Plugins
DeviceNo = deviceNo,
MessageId = NewId.NextGuid().ToString()
};
- await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
+ await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent.Serialize());
//await _producerBus.Publish(messageReceivedHeartbeatEvent);
}
@@ -245,7 +247,7 @@ namespace JiShe.CollectBus.Plugins
//string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn);
//todo 如何确定时标?目前集中器的采集频率,都是固定,数据上报的时候,根据当前时间,往后推测出应当采集的时间点作为时标。但是如果由于网络问题,数据一直没上报的情况改怎么计算?
- await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
+ await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
{
ClientId = client.Id,
ClientIp = client.IP,
@@ -253,7 +255,7 @@ namespace JiShe.CollectBus.Plugins
MessageHexString = messageHexString,
DeviceNo = deviceNo,
MessageId = NewId.NextGuid().ToString()
- });
+ }.Serialize());
}
}
}
diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs
index 8f5449a..78b472d 100644
--- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs
+++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs
@@ -215,19 +215,12 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
return aa == null;
}
- [KafkaSubscribe(["test-topic1"])]
+ [KafkaSubscribe(["test-topic"])]
- public async Task KafkaSubscribeAsync() // TestSubscribe obj
+ public async Task KafkaSubscribeAsync(object obj)
{
- var obj=string.Empty;
_logger.LogWarning($"收到订阅消息: {obj}");
return SubscribeAck.Success();
}
}
-public class TestSubscribe
-{
- public string Topic { get; set; }
- public int Val { get; set; }
-}
-
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index 287158f..f2a08b1 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -29,7 +29,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
{
private readonly ILogger _logger;
- private readonly ICapPublisher _producerBus;
private readonly IIoTDBProvider _dbProvider;
private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
private readonly IProducerService _producerService;
@@ -37,12 +36,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public BasicScheduledMeterReadingService(
ILogger logger,
- ICapPublisher producerBus,
IMeterReadingRecordRepository meterReadingRecordRepository,
IProducerService producerService,
IIoTDBProvider dbProvider)
{
- _producerBus = producerBus;
_logger = logger;
_dbProvider = dbProvider;
_meterReadingRecordRepository = meterReadingRecordRepository;
@@ -381,7 +378,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
- _ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
+ _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg.Serialize());
//_= _producerBus.Publish(tempMsg);
@@ -445,7 +442,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
- _ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
+ _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg.Serialize());
//_ = _producerBus.Publish(tempMsg);
@@ -510,7 +507,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
TimeDensity = timeDensity.ToString(),
};
- _ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
+ _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg.Serialize());
//_ = _producerBus.Publish(tempMsg);
@@ -805,7 +802,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress);
- await _producerService.ProduceAsync(topicName, partition, taskRecord);
+ await _producerService.ProduceAsync(topicName, partition, taskRecord.Serialize());
}
private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType)
@@ -846,7 +843,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
TimeDensity = timeDensity.ToString(),
};
- _ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
+ _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg.Serialize());
//_ = _producerBus.Publish(tempMsg);
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
index 91f7d4a..8593dfd 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
@@ -36,7 +36,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
string serverTagName = string.Empty;
public EnergySystemScheduledMeterReadingService(ILogger logger,
- ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, producerBus, meterReadingRecordRepository, producerService,dbProvider)
+ IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, meterReadingRecordRepository, producerService,dbProvider)
{
serverTagName = configuration.GetValue(CommonConst.ServerTagName)!;
}
diff --git a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
index d5e9caf..9bf297f 100644
--- a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
+++ b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
@@ -6,6 +6,8 @@ using JiShe.CollectBus.IoTDBProvider;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
+using JiShe.CollectBus.Kafka;
+using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
@@ -20,7 +22,7 @@ using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Subscribers
{
- public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, ICapSubscribe
+ public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, IKafkaSubscribe
{
private readonly ILogger _logger;
private readonly ITcpService _tcpService;
@@ -63,9 +65,10 @@ namespace JiShe.CollectBus.Subscribers
_dbProvider = dbProvider;
}
- [CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
- public async Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage)
+ [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
+ public async Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage)
{
+ bool isAck = false;
switch (issuedEventMessage.Type)
{
case IssuedEventType.Heartbeat:
@@ -76,6 +79,7 @@ namespace JiShe.CollectBus.Subscribers
loginEntity.AckTime = Clock.Now;
loginEntity.IsAck = true;
await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
+ isAck = true;
break;
case IssuedEventType.Data:
break;
@@ -90,11 +94,13 @@ namespace JiShe.CollectBus.Subscribers
//}
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
+ return isAck? SubscribeAck.Success(): SubscribeAck.Fail();
}
- [CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
- public async Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage)
+ [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
+ public async Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage)
{
+ bool isAck = false;
switch (issuedEventMessage.Type)
{
case IssuedEventType.Heartbeat:
@@ -103,6 +109,7 @@ namespace JiShe.CollectBus.Subscribers
heartbeatEntity.AckTime = Clock.Now;
heartbeatEntity.IsAck = true;
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
+ isAck = true;
break;
case IssuedEventType.Data:
break;
@@ -117,10 +124,11 @@ namespace JiShe.CollectBus.Subscribers
//}
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
+ return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
}
- [CapSubscribe(ProtocolConst.SubscriberReceivedEventName)]
- public async Task ReceivedEvent(MessageReceived receivedMessage)
+ [KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)]
+ public async Task ReceivedEvent(MessageReceived receivedMessage)
{
var currentTime = Clock.Now;
@@ -137,13 +145,13 @@ namespace JiShe.CollectBus.Subscribers
if(fN == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
- return;
+ return SubscribeAck.Success();
}
var tb3761FN = fN.FnList.FirstOrDefault();
if (tb3761FN == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
- return;
+ return SubscribeAck.Success();
}
//报文入库
@@ -169,11 +177,14 @@ namespace JiShe.CollectBus.Subscribers
//todo 查找是否有下发任务
//await _messageReceivedEventRepository.InsertAsync(receivedMessage);
+
+
}
+ return SubscribeAck.Success();
}
- [CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
- public async Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage)
+ [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
+ public async Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage)
{
var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin");
if (protocolPlugin == null)
@@ -185,10 +196,11 @@ namespace JiShe.CollectBus.Subscribers
await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage);
await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage);
}
+ return SubscribeAck.Success();
}
- [CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
- public async Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage)
+ [KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
+ public async Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage)
{
var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin");
if (protocolPlugin == null)
@@ -200,6 +212,7 @@ namespace JiShe.CollectBus.Subscribers
await protocolPlugin.LoginAsync(receivedLoginMessage);
await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage);
}
+ return SubscribeAck.Success();
}
}
}
diff --git a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs
index 3caba20..f3c485c 100644
--- a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs
+++ b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs
@@ -8,6 +8,8 @@ using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
+using JiShe.CollectBus.Kafka;
+using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Repository.MeterReadingRecord;
@@ -24,7 +26,7 @@ namespace JiShe.CollectBus.Subscribers
/// 定时抄读任务消息消费订阅
///
[Route($"/worker/app/subscriber")]
- public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService,ICapSubscribe
+ public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService, IKafkaSubscribe
{
private readonly ILogger _logger;
private readonly ITcpService _tcpService;
@@ -63,8 +65,8 @@ namespace JiShe.CollectBus.Subscribers
///
[HttpPost]
[Route("ammeter/oneminute/issued-event")]
- [CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
- public async Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
+ [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
+ public async Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
_logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin");
@@ -81,6 +83,7 @@ namespace JiShe.CollectBus.Subscribers
}
}
+ return SubscribeAck.Success();
}
///
@@ -90,8 +93,8 @@ namespace JiShe.CollectBus.Subscribers
///
[HttpPost]
[Route("ammeter/fiveminute/issued-event")]
- [CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
- public async Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
+ [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
+ public async Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
_logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin");
@@ -108,6 +111,7 @@ namespace JiShe.CollectBus.Subscribers
}
}
+ return SubscribeAck.Success();
}
///
@@ -117,8 +121,8 @@ namespace JiShe.CollectBus.Subscribers
///
[HttpPost]
[Route("ammeter/fifteenminute/issued-event")]
- [CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
- public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
+ [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
+ public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
try
@@ -137,6 +141,7 @@ namespace JiShe.CollectBus.Subscribers
}
}
+ return SubscribeAck.Success();
}
catch (Exception ex)
{
@@ -155,8 +160,8 @@ namespace JiShe.CollectBus.Subscribers
///
[HttpPost]
[Route("watermeter/fifteenminute/issued-event")]
- [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
- public async Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
+ [KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
+ public async Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
_logger.LogInformation("15分钟采集水表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin");
@@ -172,6 +177,7 @@ namespace JiShe.CollectBus.Subscribers
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
}
}
+ return SubscribeAck.Success();
}
#endregion
}
diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs
index aabc2ba..377453e 100644
--- a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs
+++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs
@@ -43,7 +43,7 @@ namespace JiShe.CollectBus.Host
ConfigureNetwork(context, configuration);
ConfigureJwtAuthentication(context, configuration);
ConfigureHangfire(context);
- ConfigureCap(context, configuration);
+ //ConfigureCap(context, configuration);
//ConfigureMassTransit(context, configuration);
//ConfigureKafkaTopic(context, configuration);
ConfigureAuditLog(context);
diff --git a/src/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/src/JiShe.CollectBus.Host/Pages/Monitor.cshtml
index b438e18..aaadf3f 100644
--- a/src/JiShe.CollectBus.Host/Pages/Monitor.cshtml
+++ b/src/JiShe.CollectBus.Host/Pages/Monitor.cshtml
@@ -16,7 +16,6 @@
后端服务
-
diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs
index 08b8cb1..f59385f 100644
--- a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs
+++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs
@@ -146,14 +146,14 @@ namespace JiShe.CollectBus.Kafka.Consumer
///
public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) where TValue : class
{
- var consumerKey = typeof((Null, TValue));
+ var consumerKey = typeof((Ignore, TValue));
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _=>
(
- CreateConsumer(groupId),
+ CreateConsumer(groupId),
cts
- )).Consumer as IConsumer;
+ )).Consumer as IConsumer;
consumer!.Subscribe(topics);
diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs
index becea90..b00f5cf 100644
--- a/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs
+++ b/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs
@@ -15,6 +15,6 @@ namespace JiShe.CollectBus.Kafka.Producer
Task ProduceAsync(string topic, TKey key, TValue value, int? partition, Action>? deliveryHandler = null) where TKey : notnull where TValue : class;
- Task ProduceAsync(string topic, TValue value, int? partition = null, Action>? deliveryHandler = null) where TValue : class;
+ Task ProduceAsync(string topic, TValue value, int? partition = null, Action>? deliveryHandler = null) where TValue : class;
}
}
diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs
index ca45f8c..c322294 100644
--- a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs
+++ b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs
@@ -114,8 +114,8 @@ namespace JiShe.CollectBus.Kafka.Producer
///
public async Task ProduceAsync(string topic, TValue value) where TValue : class
{
- var producer = GetProducer();
- await producer.ProduceAsync(topic, new Message { Value = value });
+ var producer = GetProducer();
+ await producer.ProduceAsync(topic, new Message { Value = value });
}
///
@@ -160,13 +160,13 @@ namespace JiShe.CollectBus.Kafka.Producer
///
///
///
- public async Task ProduceAsync(string topic, TValue value, int? partition=null, Action>? deliveryHandler = null) where TValue : class
+ public async Task ProduceAsync(string topic, TValue value, int? partition=null, Action>? deliveryHandler = null) where TValue : class
{
- var message = new Message
+ var message = new Message
{
Value = value
};
- var producer = GetProducer();
+ var producer = GetProducer();
if (partition.HasValue)
{
var topicPartition = new TopicPartition(topic, partition.Value);
diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
index 762a2ca..23d0917 100644
--- a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
+++ b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
@@ -12,12 +12,14 @@ using Microsoft.Extensions.DependencyInjection;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.Protocols;
using MassTransit;
+using JiShe.CollectBus.Kafka.Producer;
+using JiShe.CollectBus.Common.Helpers;
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
{
public abstract class BaseProtocolPlugin : IProtocolPlugin
{
- private readonly ICapPublisher _producerBus;
+ private readonly IProducerService _producerService;
private readonly ILogger _logger;
private readonly IRepository _protocolInfoRepository;
@@ -37,7 +39,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
_logger = serviceProvider.GetRequiredService>();
_protocolInfoRepository = serviceProvider.GetRequiredService>();
- _producerBus = serviceProvider.GetRequiredService();
+ _producerService = serviceProvider.GetRequiredService();
}
public abstract ProtocolInfo Info { get; }
@@ -87,7 +89,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
- await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
+ await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }.Serialize());
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
}
@@ -126,7 +128,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
Fn = 1
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
- await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
+ await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }.Serialize());
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
}
diff --git a/src/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj b/src/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj
index aa5bd73..248ee30 100644
--- a/src/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj
+++ b/src/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj
@@ -7,7 +7,7 @@
-
+
@@ -17,6 +17,7 @@
+