梳理Kafka主题,以及设备Hash分组方法封装。
This commit is contained in:
parent
39ae8a7f38
commit
b42388e7df
@ -7,7 +7,8 @@ namespace JiShe.CollectBus.Subscribers
|
||||
{
|
||||
public interface ISubscriberAppService : IApplicationService
|
||||
{
|
||||
Task IssuedEvent(IssuedEventMessage issuedEventMessage);
|
||||
Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage);
|
||||
Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage);
|
||||
Task ReceivedEvent(MessageReceived receivedMessage);
|
||||
Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage);
|
||||
Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage);
|
||||
|
||||
@ -71,7 +71,7 @@ namespace JiShe.CollectBus.EnergySystem
|
||||
return result;
|
||||
|
||||
|
||||
await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage
|
||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||
{
|
||||
//ClientId = messageReceived.ClientId,
|
||||
DeviceNo = address,
|
||||
@ -108,7 +108,7 @@ namespace JiShe.CollectBus.EnergySystem
|
||||
|
||||
foreach (var bytes in bytesList)
|
||||
{
|
||||
await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage
|
||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||
{
|
||||
//ClientId = messageReceived.ClientId,
|
||||
DeviceNo = address,
|
||||
@ -149,7 +149,7 @@ namespace JiShe.CollectBus.EnergySystem
|
||||
|
||||
}).ToList();
|
||||
var bytes = Build3761SendData.BuildAmmeterParameterSetSendCmd(address, meterParameters);
|
||||
await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage
|
||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||
{
|
||||
//ClientId = messageReceived.ClientId,
|
||||
DeviceNo = address,
|
||||
@ -178,7 +178,7 @@ namespace JiShe.CollectBus.EnergySystem
|
||||
{
|
||||
var dataUnit = Build645SendData.BuildReadMeterAddressSendDataUnit(detail.MeterAddress);
|
||||
var bytes =Build3761SendData.BuildTransparentForwardingSendCmd(address, detail.Port, detail.BaudRate.ToString(), dataUnit, StopBit.Stop1, Parity.None);
|
||||
await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage
|
||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||
{
|
||||
//ClientId = messageReceived.ClientId,
|
||||
DeviceNo = address,
|
||||
@ -261,7 +261,7 @@ namespace JiShe.CollectBus.EnergySystem
|
||||
|
||||
if (bytes != null)
|
||||
{
|
||||
await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage
|
||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||
{
|
||||
//ClientId = messageReceived.ClientId,
|
||||
DeviceNo = address,
|
||||
@ -320,7 +320,7 @@ namespace JiShe.CollectBus.EnergySystem
|
||||
|
||||
var bytes = Build3761SendData.BuildCommunicationParametersSetSendCmd(address, masterIP, materPort,
|
||||
backupIP, backupPort, input.Data.APN);
|
||||
await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage
|
||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||
{
|
||||
//ClientId = messageReceived.ClientId,
|
||||
DeviceNo = address,
|
||||
@ -347,7 +347,7 @@ namespace JiShe.CollectBus.EnergySystem
|
||||
var address = $"{input.AreaCode}{input.Address}";
|
||||
|
||||
var bytes = Build3761SendData.BuildTerminalCalendarClockSendCmd(address);
|
||||
await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage
|
||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||
{
|
||||
//ClientId = messageReceived.ClientId,
|
||||
DeviceNo = address,
|
||||
@ -375,7 +375,7 @@ namespace JiShe.CollectBus.EnergySystem
|
||||
bool isManual = !input.AreaCode.Equals("5110");//低功耗集中器不是长连接,在连接的那一刻再发送
|
||||
|
||||
var bytes = Build3761SendData.BuildConrCheckTimeSendCmd(address,DateTime.Now, isManual);
|
||||
await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage
|
||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||
{
|
||||
//ClientId = messageReceived.ClientId,
|
||||
DeviceNo = address,
|
||||
@ -402,7 +402,7 @@ namespace JiShe.CollectBus.EnergySystem
|
||||
var address = $"{input.AreaCode}{input.Address}";
|
||||
|
||||
var bytes = Build3761SendData.BuildConrRebootSendCmd(address);
|
||||
await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage
|
||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||
{
|
||||
//ClientId = messageReceived.ClientId,
|
||||
DeviceNo = address,
|
||||
@ -430,7 +430,7 @@ namespace JiShe.CollectBus.EnergySystem
|
||||
var address = $"{input.AreaCode}{input.Address}";
|
||||
var pnList = input.Data.Split(',').Select(it => int.Parse(it)).ToList();
|
||||
var bytes = Build3761SendData.BuildAmmeterParameterReadingSendCmd(address, pnList);
|
||||
await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage
|
||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||
{
|
||||
//ClientId = messageReceived.ClientId,
|
||||
DeviceNo = address,
|
||||
@ -479,7 +479,7 @@ namespace JiShe.CollectBus.EnergySystem
|
||||
|
||||
foreach (var bytes in bytesList)
|
||||
{
|
||||
await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage
|
||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||
{
|
||||
//ClientId = messageReceived.ClientId,
|
||||
DeviceNo = address,
|
||||
@ -548,7 +548,7 @@ namespace JiShe.CollectBus.EnergySystem
|
||||
|
||||
foreach (var bytes in bytesList)
|
||||
{
|
||||
await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage
|
||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||
{
|
||||
//ClientId = messageReceived.ClientId,
|
||||
DeviceNo = address,
|
||||
@ -577,7 +577,7 @@ namespace JiShe.CollectBus.EnergySystem
|
||||
var address = $"{code.AreaCode}{code.Address}";
|
||||
var bytes = Build3761SendData.BuildAmmeterReportCollectionItemsSetSendCmd(address,input.Detail.Pn, input.Detail.Unit,input.Detail.Cycle,input.Detail.BaseTime,
|
||||
input.Detail.CurveRatio,input.Detail.Details.Select(it => new PnFn(it.Pn,it.Fn)).ToList());
|
||||
await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage
|
||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||
{
|
||||
//ClientId = messageReceived.ClientId,
|
||||
DeviceNo = address,
|
||||
@ -605,7 +605,7 @@ namespace JiShe.CollectBus.EnergySystem
|
||||
{
|
||||
var address = $"{code.AreaCode}{code.Address}";
|
||||
var bytes = Build3761SendData.BuildAmmeterAutoUpSwitchSetSendCmd(address, input.Detail.Pn,input.Detail.IsOpen);
|
||||
await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage
|
||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||
{
|
||||
//ClientId = messageReceived.ClientId,
|
||||
DeviceNo = address,
|
||||
@ -631,7 +631,7 @@ namespace JiShe.CollectBus.EnergySystem
|
||||
var result = new BaseResultDto();
|
||||
var address = $"{input.AreaCode}{input.Address}";
|
||||
var bytes = Build3761SendData.BuildAmmeterReadAutoUpSwitchSendCmd(address, input.Detail.Pn);
|
||||
await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage
|
||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||
{
|
||||
//ClientId = messageReceived.ClientId,
|
||||
DeviceNo = address,
|
||||
@ -658,7 +658,7 @@ namespace JiShe.CollectBus.EnergySystem
|
||||
{
|
||||
var address = $"{data.AreaCode}{data.Address}";
|
||||
var bytes = Build3761SendData.BuildTerminalVersionInfoReadingSendCmd(address);
|
||||
await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage
|
||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||
{
|
||||
//ClientId = messageReceived.ClientId,
|
||||
DeviceNo = address,
|
||||
@ -713,7 +713,7 @@ namespace JiShe.CollectBus.EnergySystem
|
||||
|
||||
foreach (var bytes in bytesList)
|
||||
{
|
||||
await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage
|
||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage
|
||||
{
|
||||
//ClientId = messageReceived.ClientId,
|
||||
DeviceNo = address,
|
||||
|
||||
@ -19,6 +19,7 @@ using Volo.Abp.Caching;
|
||||
using Volo.Abp.DependencyInjection;
|
||||
using Volo.Abp.Domain.Entities;
|
||||
using Volo.Abp.Domain.Repositories;
|
||||
using static FreeSql.Internal.GlobalFilter;
|
||||
|
||||
namespace JiShe.CollectBus.Plugins
|
||||
{
|
||||
@ -78,7 +79,7 @@ namespace JiShe.CollectBus.Plugins
|
||||
}
|
||||
else
|
||||
{
|
||||
await OnTcpNormalReceived(tcpSessionClient, messageHexString, aTuple.Item1);
|
||||
await OnTcpNormalReceived(tcpSessionClient, messageHexString, aTuple.Item1,aFn.ToString()!.PadLeft(2,'0'));
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -160,7 +161,7 @@ namespace JiShe.CollectBus.Plugins
|
||||
DeviceNo = deviceNo,
|
||||
MessageId = NewId.NextGuid().ToString()
|
||||
};
|
||||
await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedLoginEventName, messageReceivedLoginEvent);
|
||||
await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
|
||||
|
||||
//await _producerBus.Publish( messageReceivedLoginEvent);
|
||||
}
|
||||
@ -199,11 +200,19 @@ namespace JiShe.CollectBus.Plugins
|
||||
DeviceNo = deviceNo,
|
||||
MessageId = NewId.NextGuid().ToString()
|
||||
};
|
||||
await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedHeartbeatEventName, messageReceivedHeartbeatEvent);
|
||||
await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
|
||||
//await _producerBus.Publish(messageReceivedHeartbeatEvent);
|
||||
}
|
||||
|
||||
private async Task OnTcpNormalReceived(ITcpSessionClient client, string messageHexString, string deviceNo)
|
||||
/// <summary>
|
||||
/// 正常帧处理,将不同的AFN进行分发
|
||||
/// </summary>
|
||||
/// <param name="client"></param>
|
||||
/// <param name="messageHexString"></param>
|
||||
/// <param name="deviceNo"></param>
|
||||
/// <param name="aFn"></param>
|
||||
/// <returns></returns>
|
||||
private async Task OnTcpNormalReceived(ITcpSessionClient client, string messageHexString, string deviceNo,string aFn)
|
||||
{
|
||||
//await _producerBus.Publish(new MessageReceived
|
||||
//{
|
||||
@ -215,7 +224,10 @@ namespace JiShe.CollectBus.Plugins
|
||||
// MessageId = NewId.NextGuid().ToString()
|
||||
//});
|
||||
|
||||
await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
|
||||
|
||||
string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn);
|
||||
|
||||
await _producerBus.PublishAsync(topicName, new MessageReceived
|
||||
{
|
||||
ClientId = client.Id,
|
||||
ClientIp = client.IP,
|
||||
|
||||
@ -60,8 +60,37 @@ namespace JiShe.CollectBus.Subscribers
|
||||
_meterReadingRecordsRepository = meterReadingRecordsRepository;
|
||||
}
|
||||
|
||||
[CapSubscribe(ProtocolConst.SubscriberIssuedEventName)]
|
||||
public async Task IssuedEvent(IssuedEventMessage issuedEventMessage)
|
||||
[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
|
||||
public async Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage)
|
||||
{
|
||||
switch (issuedEventMessage.Type)
|
||||
{
|
||||
case IssuedEventType.Heartbeat:
|
||||
break;
|
||||
case IssuedEventType.Login:
|
||||
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}");
|
||||
var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId);
|
||||
loginEntity.AckTime = Clock.Now;
|
||||
loginEntity.IsAck = true;
|
||||
await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
|
||||
break;
|
||||
case IssuedEventType.Data:
|
||||
break;
|
||||
default:
|
||||
throw new ArgumentOutOfRangeException();
|
||||
}
|
||||
|
||||
//var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo);
|
||||
//if (device != null)
|
||||
//{
|
||||
// await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message);
|
||||
//}
|
||||
|
||||
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
|
||||
}
|
||||
|
||||
[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
|
||||
public async Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage)
|
||||
{
|
||||
switch (issuedEventMessage.Type)
|
||||
{
|
||||
@ -72,13 +101,6 @@ namespace JiShe.CollectBus.Subscribers
|
||||
heartbeatEntity.IsAck = true;
|
||||
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
|
||||
break;
|
||||
case IssuedEventType.Login:
|
||||
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}");
|
||||
var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId);
|
||||
loginEntity.AckTime = Clock.Now;
|
||||
loginEntity.IsAck = true;
|
||||
await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
|
||||
break;
|
||||
case IssuedEventType.Data:
|
||||
break;
|
||||
default:
|
||||
@ -130,15 +152,14 @@ namespace JiShe.CollectBus.Subscribers
|
||||
Pn = 0,
|
||||
FocusAddress = "",
|
||||
MeterAddress = "",
|
||||
//DataResult = tb3761FN.Text,
|
||||
});
|
||||
|
||||
//todo 将解析结果插入IoTDB,时标从
|
||||
|
||||
//await _messageReceivedEventRepository.InsertAsync(receivedMessage);
|
||||
}
|
||||
}
|
||||
|
||||
[CapSubscribe(ProtocolConst.SubscriberReceivedHeartbeatEventName)]
|
||||
[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
|
||||
public async Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage)
|
||||
{
|
||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||
@ -153,7 +174,7 @@ namespace JiShe.CollectBus.Subscribers
|
||||
}
|
||||
}
|
||||
|
||||
[CapSubscribe(ProtocolConst.SubscriberReceivedLoginEventName)]
|
||||
[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
|
||||
public async Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage)
|
||||
{
|
||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||
|
||||
@ -49,6 +49,21 @@ namespace JiShe.CollectBus.Common.Extensions
|
||||
);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 将枚举转换为<int, string>字典
|
||||
/// </summary>
|
||||
/// <typeparam name="TEnum"></typeparam>
|
||||
/// <returns></returns>
|
||||
public static Dictionary<string, int> ToNameValueDictionary<TEnum>() where TEnum : Enum
|
||||
{
|
||||
return Enum.GetValues(typeof(TEnum))
|
||||
.Cast<TEnum>()
|
||||
.ToDictionary(
|
||||
e => e.ToString(),
|
||||
e => Convert.ToInt32(e)
|
||||
);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 将枚举转换为<TEnum, string>字典
|
||||
/// </summary>
|
||||
|
||||
@ -760,5 +760,39 @@ namespace JiShe.CollectBus.Common.Helpers
|
||||
|
||||
return fontValue;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 获取设备Id哈希值
|
||||
/// </summary>
|
||||
/// <param name="deviceId"></param>
|
||||
/// <param name="TotalShards"></param>
|
||||
/// <returns></returns>
|
||||
public static int GetDeviceHashCode(string deviceId, int TotalShards = 100)
|
||||
{
|
||||
// 计算哈希分组ID
|
||||
return Math.Abs(deviceId.GetHashCode() % TotalShards);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 获取设备Id哈希分组
|
||||
/// </summary>
|
||||
/// <param name="deviceList"></param>
|
||||
/// <returns></returns>
|
||||
public static Dictionary<string, List<string>> GetDeviceHashGroup(List<string> deviceList)
|
||||
{
|
||||
Dictionary<string, List<string>> keyValuePairs = new Dictionary<string, List<string>>();
|
||||
foreach (var deviceId in deviceList)
|
||||
{
|
||||
var hashCode = GetDeviceHashCode(deviceId);
|
||||
|
||||
if (!keyValuePairs.ContainsKey(hashCode.ToString()))
|
||||
{
|
||||
keyValuePairs.Add(hashCode.ToString(), new List<string>());
|
||||
}
|
||||
|
||||
keyValuePairs[hashCode.ToString()].Add(deviceId);
|
||||
}
|
||||
return keyValuePairs;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -257,6 +257,9 @@ namespace JiShe.CollectBus.Host
|
||||
/// <param name="configuration">The configuration.</param>
|
||||
public void ConfigureCap(ServiceConfigurationContext context, IConfiguration configuration)
|
||||
{
|
||||
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
|
||||
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
|
||||
|
||||
context.Services.AddCap(x =>
|
||||
{
|
||||
x.DefaultGroupName = ProtocolConst.SubscriberGroup;
|
||||
@ -283,8 +286,6 @@ namespace JiShe.CollectBus.Host
|
||||
/// <summary>
|
||||
/// Configures the mass transit.
|
||||
/// </summary>
|
||||
/// <param name="context">The context.</param>
|
||||
/// <param name="configuration">The configuration.</param>
|
||||
public void ConfigureMassTransit(ServiceConfigurationContext context, IConfiguration configuration)
|
||||
{
|
||||
|
||||
@ -296,7 +297,9 @@ namespace JiShe.CollectBus.Host
|
||||
|
||||
try
|
||||
{
|
||||
var topics = ProtocolConstExtensions.GetAllTopicNames();
|
||||
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
|
||||
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
|
||||
|
||||
List<TopicSpecification> topicSpecifications = new List<TopicSpecification>();
|
||||
foreach (var item in topics)
|
||||
{
|
||||
@ -348,20 +351,20 @@ namespace JiShe.CollectBus.Host
|
||||
});
|
||||
rider.AddConsumer<ScheduledMeterReadingConsumer>();
|
||||
|
||||
rider.AddProducer<string, MessageReceivedLogin>(ProtocolConst.SubscriberReceivedLoginEventName);
|
||||
rider.AddProducer<string, ReceivedHeartbeatConsumer>(ProtocolConst.SubscriberReceivedHeartbeatEventName);
|
||||
rider.AddProducer<string, MessageReceivedLogin>(ProtocolConst.SubscriberLoginReceivedEventName);
|
||||
rider.AddProducer<string, ReceivedHeartbeatConsumer>(ProtocolConst.SubscriberHeartbeatReceivedEventName);
|
||||
|
||||
rider.UsingKafka((c, cfg) =>
|
||||
{
|
||||
cfg.Host(configuration.GetConnectionString("Kafka"));
|
||||
|
||||
cfg.TopicEndpoint<MessageReceivedHeartbeat>(ProtocolConst.SubscriberReceivedHeartbeatEventName, consumerConfig, configurator =>
|
||||
cfg.TopicEndpoint<MessageReceivedHeartbeat>(ProtocolConst.SubscriberHeartbeatReceivedEventName, consumerConfig, configurator =>
|
||||
{
|
||||
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
|
||||
configurator.ConfigureConsumer<ReceivedHeartbeatConsumer>(c);
|
||||
});
|
||||
|
||||
cfg.TopicEndpoint<MessageReceivedLogin>(ProtocolConst.SubscriberReceivedLoginEventName, consumerConfig, configurator =>
|
||||
cfg.TopicEndpoint<MessageReceivedLogin>(ProtocolConst.SubscriberLoginReceivedEventName, consumerConfig, configurator =>
|
||||
{
|
||||
configurator.ConfigureConsumer<ReceivedLoginConsumer>(c);
|
||||
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
|
||||
@ -373,7 +376,7 @@ namespace JiShe.CollectBus.Host
|
||||
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
|
||||
});
|
||||
|
||||
cfg.TopicEndpoint<MessageReceived>(ProtocolConst.SubscriberIssuedEventName, consumerConfig, configurator =>
|
||||
cfg.TopicEndpoint<MessageReceived>(ProtocolConst.SubscriberReceivedEventName, consumerConfig, configurator =>
|
||||
{
|
||||
configurator.ConfigureConsumer<IssuedConsumer>(c);
|
||||
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
|
||||
|
||||
@ -120,7 +120,7 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord
|
||||
public async Task<MeterReadingRecords> FirOrDefaultAsync(MeterReadingRecords entity, DateTime? dateTime)
|
||||
{
|
||||
var collection = await GetShardedCollection(dateTime);
|
||||
await collection.findon
|
||||
//await collection.findon
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
|
||||
@ -87,7 +87,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
|
||||
};
|
||||
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
|
||||
|
||||
await _producerBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
|
||||
await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
|
||||
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
|
||||
}
|
||||
|
||||
@ -126,7 +126,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
|
||||
Fn = 1
|
||||
};
|
||||
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
|
||||
await _producerBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
|
||||
await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
|
||||
|
||||
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
|
||||
}
|
||||
|
||||
@ -1,4 +1,6 @@
|
||||
using System;
|
||||
using JiShe.CollectBus.Common.Enums;
|
||||
using JiShe.CollectBus.Common.Extensions;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Reflection;
|
||||
@ -10,10 +12,10 @@ namespace JiShe.CollectBus.Protocol.Contracts
|
||||
public class ProtocolConstExtensions
|
||||
{
|
||||
/// <summary>
|
||||
/// 自动获取 ProtocolConst 类中所有 Kafka 主题名称
|
||||
/// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称
|
||||
/// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量)
|
||||
/// </summary>
|
||||
public static List<string> GetAllTopicNames()
|
||||
public static List<string> GetAllTopicNamesByIssued()
|
||||
{
|
||||
return typeof(ProtocolConst)
|
||||
.GetFields(BindingFlags.Public | BindingFlags.Static)
|
||||
@ -21,9 +23,36 @@ namespace JiShe.CollectBus.Protocol.Contracts
|
||||
f.IsLiteral &&
|
||||
!f.IsInitOnly &&
|
||||
f.FieldType == typeof(string) &&
|
||||
f.Name.EndsWith("EventName")) // 通过命名规则过滤主题字段
|
||||
f.Name.EndsWith("IssuedEventName")) // 通过命名规则过滤主题字段
|
||||
.Select(f => (string)f.GetRawConstantValue()!)
|
||||
.ToList();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称
|
||||
/// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量)
|
||||
/// </summary>
|
||||
public static List<string> GetAllTopicNamesByReceived()
|
||||
{
|
||||
//固定的上报主题
|
||||
var topicList = typeof(ProtocolConst)
|
||||
.GetFields(BindingFlags.Public | BindingFlags.Static)
|
||||
.Where(f =>
|
||||
f.IsLiteral &&
|
||||
!f.IsInitOnly &&
|
||||
f.FieldType == typeof(string) &&
|
||||
f.Name.EndsWith("ReceivedEventName")) // 通过命名规则过滤主题字段
|
||||
.Select(f => (string)f.GetRawConstantValue()!)
|
||||
.ToList();
|
||||
|
||||
//动态上报主题,需根据协议的AFN功能码动态获取
|
||||
var afnList = EnumExtensions.ToNameValueDictionary<AFN>();
|
||||
foreach (var item in afnList)
|
||||
{
|
||||
topicList.Add(string.Format(ProtocolConst.AFNTopicNameFormat, item.Value.ToString().PadLeft(2, '0')));
|
||||
}
|
||||
|
||||
return topicList;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -9,10 +9,28 @@ namespace JiShe.CollectBus.Protocol.Contracts
|
||||
public class ProtocolConst
|
||||
{
|
||||
public const string SubscriberGroup = "jishe.collectbus";
|
||||
public const string SubscriberIssuedEventName = "issued.event";
|
||||
/// <summary>
|
||||
/// 心跳下行消息主题
|
||||
/// </summary>
|
||||
public const string SubscriberHeartbeatIssuedEventName = "issued.heartbeat.event";
|
||||
/// <summary>
|
||||
/// 登录下行消息主题
|
||||
/// </summary>
|
||||
public const string SubscriberLoginIssuedEventName = "issued.login.event";
|
||||
|
||||
/// <summary>
|
||||
/// 上行消息主题
|
||||
/// </summary>
|
||||
public const string SubscriberReceivedEventName = "received.event";
|
||||
public const string SubscriberReceivedHeartbeatEventName = "received.heartbeat.event";
|
||||
public const string SubscriberReceivedLoginEventName = "received.login.event";
|
||||
|
||||
/// <summary>
|
||||
/// 心跳上行消息主题
|
||||
/// </summary>
|
||||
public const string SubscriberHeartbeatReceivedEventName = "received.heartbeat.event";
|
||||
/// <summary>
|
||||
/// 登录上行消息主题
|
||||
/// </summary>
|
||||
public const string SubscriberLoginReceivedEventName = "received.login.event";
|
||||
|
||||
#region 电表消息主题
|
||||
/// <summary>
|
||||
@ -85,7 +103,10 @@ namespace JiShe.CollectBus.Protocol.Contracts
|
||||
public const string WatermeterSubscriberWorkerManualValveReadingIssuedEventName = "issued.manual.reading.watermeter.event";
|
||||
#endregion
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// AFN上行主题格式
|
||||
/// </summary>
|
||||
public const string AFNTopicNameFormat = "received.afn{0}.event";
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user