优化kafka消费者实例复用
优化登录心跳逻辑
This commit is contained in:
parent
4543299dde
commit
c5364f4a95
@ -7,16 +7,20 @@ using Microsoft.Extensions.Logging;
|
|||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
|
using System.Text.RegularExpressions;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.Consumer
|
namespace JiShe.CollectBus.Kafka.Consumer
|
||||||
{
|
{
|
||||||
public class ConsumerService : IConsumerService, IDisposable
|
public class ConsumerService : IConsumerService, IDisposable
|
||||||
{
|
{
|
||||||
private readonly ILogger<ConsumerService> _logger;
|
private readonly ILogger<ConsumerService> _logger;
|
||||||
private readonly ConcurrentDictionary<Type, (object Consumer, CancellationTokenSource CTS)>
|
/// <summary>
|
||||||
|
/// 消费者存储
|
||||||
|
/// Key 格式:{groupId}_{topic}_{TKey}_{TValue}
|
||||||
|
/// </summary>
|
||||||
|
private readonly ConcurrentDictionary<string, (object Consumer, CancellationTokenSource CTS)>
|
||||||
_consumerStore = new();
|
_consumerStore = new();
|
||||||
private readonly KafkaOptionConfig _kafkaOptionConfig;
|
private readonly KafkaOptionConfig _kafkaOptionConfig;
|
||||||
private class KafkaConsumer<TKey, TValue> where TKey : notnull where TValue : class { }
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// ConsumerService
|
/// ConsumerService
|
||||||
@ -108,15 +112,14 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
|
public async Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
|
||||||
{
|
{
|
||||||
var consumerKey = typeof(KafkaConsumer<TKey, TValue>);
|
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
|
||||||
var cts = new CancellationTokenSource();
|
var cts = new CancellationTokenSource();
|
||||||
|
|
||||||
//var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
||||||
//(
|
(
|
||||||
// CreateConsumer<TKey, TValue>(groupId),
|
CreateConsumer<TKey, TValue>(groupId),
|
||||||
// cts
|
cts
|
||||||
//)).Consumer as IConsumer<TKey, TValue>;
|
)).Consumer as IConsumer<TKey, TValue>;
|
||||||
var consumer = CreateConsumer<TKey, TValue>(groupId);
|
|
||||||
consumer!.Subscribe(topics);
|
consumer!.Subscribe(topics);
|
||||||
|
|
||||||
await Task.Run(async () =>
|
await Task.Run(async () =>
|
||||||
@ -176,19 +179,14 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
|
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
var consumerKey = typeof(KafkaConsumer<Ignore, TValue>);
|
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
|
||||||
var cts = new CancellationTokenSource();
|
var cts = new CancellationTokenSource();
|
||||||
//if (topics.Contains(ProtocolConst.SubscriberLoginReceivedEventName))
|
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
||||||
//{
|
(
|
||||||
// string ssss = "";
|
CreateConsumer<Ignore, TValue>(groupId),
|
||||||
//}
|
cts
|
||||||
//var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
)).Consumer as IConsumer<Ignore, TValue>;
|
||||||
//(
|
|
||||||
// CreateConsumer<string, TValue>(groupId),
|
|
||||||
// cts
|
|
||||||
//)).Consumer as IConsumer<string, TValue>;
|
|
||||||
|
|
||||||
var consumer = CreateConsumer<Ignore, TValue>(groupId);
|
|
||||||
consumer!.Subscribe(topics);
|
consumer!.Subscribe(topics);
|
||||||
|
|
||||||
_ = Task.Run(async () =>
|
_ = Task.Run(async () =>
|
||||||
@ -274,15 +272,14 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
/// <param name="batchTimeout">批次超时时间</param>
|
/// <param name="batchTimeout">批次超时时间</param>
|
||||||
public async Task SubscribeBatchAsync<TKey, TValue>(string[] topics,Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null,int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
|
public async Task SubscribeBatchAsync<TKey, TValue>(string[] topics,Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null,int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
|
||||||
{
|
{
|
||||||
var consumerKey = typeof(KafkaConsumer<TKey, TValue>);
|
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
|
||||||
var cts = new CancellationTokenSource();
|
var cts = new CancellationTokenSource();
|
||||||
|
|
||||||
//var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
||||||
// (
|
(
|
||||||
// CreateConsumer<TKey, TValue>(groupId),
|
CreateConsumer<TKey, TValue>(groupId),
|
||||||
// cts
|
cts
|
||||||
// )).Consumer as IConsumer<TKey, TValue>;
|
)).Consumer as IConsumer<TKey, TValue>;
|
||||||
var consumer = CreateConsumer<string, TValue>(groupId);
|
|
||||||
consumer!.Subscribe(topics);
|
consumer!.Subscribe(topics);
|
||||||
|
|
||||||
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
|
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
|
||||||
@ -366,6 +363,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
catch (OperationCanceledException)
|
catch (OperationCanceledException)
|
||||||
{
|
{
|
||||||
// 任务取消,正常退出
|
// 任务取消,正常退出
|
||||||
|
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
@ -407,16 +405,15 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
/// <param name="consumeTimeout">消费等待时间</param>
|
/// <param name="consumeTimeout">消费等待时间</param>
|
||||||
public async Task SubscribeBatchAsync<TValue>(string[] topics,Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class
|
public async Task SubscribeBatchAsync<TValue>(string[] topics,Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class
|
||||||
{
|
{
|
||||||
var consumerKey = typeof(KafkaConsumer<string, TValue>);
|
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
|
||||||
var cts = new CancellationTokenSource();
|
var cts = new CancellationTokenSource();
|
||||||
|
|
||||||
//var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
||||||
// (
|
(
|
||||||
// CreateConsumer<string, TValue>(groupId),
|
CreateConsumer<Ignore, TValue>(groupId),
|
||||||
// cts
|
cts
|
||||||
// )).Consumer as IConsumer<string, TValue>;
|
)).Consumer as IConsumer<Ignore, TValue>;
|
||||||
|
|
||||||
var consumer= CreateConsumer<string, TValue> (groupId);
|
|
||||||
consumer!.Subscribe(topics);
|
consumer!.Subscribe(topics);
|
||||||
|
|
||||||
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
|
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
|
||||||
@ -518,9 +515,9 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TKey"></typeparam>
|
/// <typeparam name="TKey"></typeparam>
|
||||||
/// <typeparam name="TValue"></typeparam>
|
/// <typeparam name="TValue"></typeparam>
|
||||||
public void Unsubscribe<TKey, TValue>() where TKey : notnull where TValue : class
|
public void Unsubscribe<TKey, TValue>(string[] topics, string groupId) where TKey : notnull where TValue : class
|
||||||
{
|
{
|
||||||
var consumerKey = typeof((TKey, TValue));
|
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
|
||||||
if (_consumerStore.TryRemove(consumerKey, out var entry))
|
if (_consumerStore.TryRemove(consumerKey, out var entry))
|
||||||
{
|
{
|
||||||
entry.CTS.Cancel();
|
entry.CTS.Cancel();
|
||||||
|
|||||||
@ -46,5 +46,5 @@ public interface IConsumerService
|
|||||||
string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null)
|
string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null)
|
||||||
where TValue : class;
|
where TValue : class;
|
||||||
|
|
||||||
void Unsubscribe<TKey, TValue>() where TKey : notnull where TValue : class;
|
void Unsubscribe<TKey, TValue>(string[] topics, string groupId) where TKey : notnull where TValue : class;
|
||||||
}
|
}
|
||||||
@ -9,11 +9,9 @@ using System.Threading.Tasks;
|
|||||||
namespace JiShe.CollectBus.Protocol.Contracts
|
namespace JiShe.CollectBus.Protocol.Contracts
|
||||||
{
|
{
|
||||||
|
|
||||||
public class AnalysisStrategyContext
|
public class AnalysisStrategyContext(IServiceProvider provider)
|
||||||
{
|
{
|
||||||
private readonly IServiceProvider _provider;
|
private readonly IServiceProvider _provider = provider;
|
||||||
|
|
||||||
public AnalysisStrategyContext(IServiceProvider provider) => _provider = provider;
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 执行策略
|
/// 执行策略
|
||||||
|
|||||||
@ -8,6 +8,6 @@ namespace JiShe.CollectBus.Protocol.Dto
|
|||||||
{
|
{
|
||||||
public class AFN0_F1_AnalysisDto: UnitDataDto
|
public class AFN0_F1_AnalysisDto: UnitDataDto
|
||||||
{
|
{
|
||||||
|
public bool Verify { get; set; } = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,6 +12,9 @@ using System.Threading.Tasks;
|
|||||||
|
|
||||||
namespace JiShe.CollectBus.Protocol.AnalysisData.AFN_00H
|
namespace JiShe.CollectBus.Protocol.AnalysisData.AFN_00H
|
||||||
{
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 全部确认:对收到报文中的全部数据单元标识进行确认
|
||||||
|
/// </summary>
|
||||||
public class AFN0_F1_Analysis: IAnalysisStrategy<TB3761, AFN0_F1_AnalysisDto>
|
public class AFN0_F1_Analysis: IAnalysisStrategy<TB3761, AFN0_F1_AnalysisDto>
|
||||||
{
|
{
|
||||||
private readonly ILogger<AFN0_F1_Analysis> _logger;
|
private readonly ILogger<AFN0_F1_Analysis> _logger;
|
||||||
|
|||||||
@ -28,16 +28,18 @@ namespace JiShe.CollectBus.Protocol
|
|||||||
private readonly IProducerService _producerService;
|
private readonly IProducerService _producerService;
|
||||||
|
|
||||||
private readonly IRepository<Device, Guid> _deviceRepository;
|
private readonly IRepository<Device, Guid> _deviceRepository;
|
||||||
|
private readonly ITcpService _tcpService;
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Initializes a new instance of the <see cref="StandardProtocolPlugin"/> class.
|
/// Initializes a new instance of the <see cref="StandardProtocolPlugin"/> class.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="serviceProvider">The service provider.</param>
|
/// <param name="serviceProvider">The service provider.</param>
|
||||||
public StandardProtocolPlugin(IServiceProvider serviceProvider,ILogger<StandardProtocolPlugin> logger) : base(serviceProvider, logger)
|
public StandardProtocolPlugin(IServiceProvider serviceProvider,ILogger<StandardProtocolPlugin> logger, ITcpService tcpService) : base(serviceProvider, logger)
|
||||||
{
|
{
|
||||||
_logger= logger;
|
_logger = logger;
|
||||||
//_logger = serviceProvider.GetRequiredService<ILogger<StandardProtocolPlugin>>();
|
//_logger = serviceProvider.GetRequiredService<ILogger<StandardProtocolPlugin>>();
|
||||||
_producerService = serviceProvider.GetRequiredService<IProducerService>();
|
_producerService = serviceProvider.GetRequiredService<IProducerService>();
|
||||||
_deviceRepository = serviceProvider.GetRequiredService<IRepository<Device, Guid>>();
|
_deviceRepository = serviceProvider.GetRequiredService<IRepository<Device, Guid>>();
|
||||||
|
_tcpService = tcpService;
|
||||||
}
|
}
|
||||||
|
|
||||||
public sealed override ProtocolInfo Info => new(nameof(StandardProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980");
|
public sealed override ProtocolInfo Info => new(nameof(StandardProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980");
|
||||||
@ -146,9 +148,21 @@ namespace JiShe.CollectBus.Protocol
|
|||||||
Fn = 1
|
Fn = 1
|
||||||
};
|
};
|
||||||
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
|
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
|
||||||
|
var issuedEventMessage = new IssuedEventMessage
|
||||||
|
{
|
||||||
|
ClientId = messageReceivedLoginEvent.ClientId,
|
||||||
|
DeviceNo = messageReceivedLoginEvent.DeviceNo,
|
||||||
|
Message = bytes, Type = IssuedEventType.Login,
|
||||||
|
MessageId = messageReceivedLoginEvent.MessageId
|
||||||
|
};
|
||||||
//await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
|
//await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
|
||||||
|
if (_tcpService.ClientExists(issuedEventMessage.ClientId))
|
||||||
|
{
|
||||||
|
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
|
||||||
|
_logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{Convert.ToHexString(bytes)}");
|
||||||
|
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, issuedEventMessage);
|
||||||
|
}
|
||||||
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceivedLoginEvent.ClientId, DeviceNo = messageReceivedLoginEvent.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceivedLoginEvent.MessageId });
|
|
||||||
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
|
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -227,7 +241,20 @@ namespace JiShe.CollectBus.Protocol
|
|||||||
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
|
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
|
||||||
//await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
|
//await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
|
||||||
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceivedHeartbeatEvent.ClientId, DeviceNo = messageReceivedHeartbeatEvent.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceivedHeartbeatEvent.MessageId });
|
IssuedEventMessage issuedEventMessage = new IssuedEventMessage
|
||||||
|
{
|
||||||
|
ClientId = messageReceivedHeartbeatEvent.ClientId,
|
||||||
|
DeviceNo = messageReceivedHeartbeatEvent.DeviceNo,
|
||||||
|
Message = bytes,
|
||||||
|
Type = IssuedEventType.Heartbeat,
|
||||||
|
MessageId = messageReceivedHeartbeatEvent.MessageId
|
||||||
|
};
|
||||||
|
if (_tcpService.ClientExists(issuedEventMessage.ClientId))
|
||||||
|
{
|
||||||
|
await _tcpService.SendAsync(issuedEventMessage.ClientId, bytes);
|
||||||
|
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{Convert.ToHexString(bytes)}");
|
||||||
|
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, issuedEventMessage);
|
||||||
|
}
|
||||||
|
|
||||||
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
|
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
|
||||||
|
|
||||||
|
|||||||
@ -168,7 +168,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
|
|||||||
{
|
{
|
||||||
SystemName = "energy",
|
SystemName = "energy",
|
||||||
DeviceId = "402440506",
|
DeviceId = "402440506",
|
||||||
DeviceType = "Ammeter",
|
DeviceType = "1",
|
||||||
ProjectId = "10059",
|
ProjectId = "10059",
|
||||||
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
|
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
|
||||||
SingleMeasuring = new Tuple<string, string>(measuring, value)
|
SingleMeasuring = new Tuple<string, string>(measuring, value)
|
||||||
|
|||||||
@ -75,14 +75,11 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
isAck=false;
|
isAck=false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
_logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}");
|
|
||||||
|
|
||||||
loginEntity.AckTime = Clock.Now;
|
loginEntity.AckTime = Clock.Now;
|
||||||
loginEntity.IsAck = true;
|
loginEntity.IsAck = true;
|
||||||
await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
|
await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
|
||||||
if (_tcpService.ClientExists(issuedEventMessage.ClientId))
|
isAck = true;
|
||||||
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
|
|
||||||
isAck = true;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
// TODO:暂时ACK,等后续处理是否放到私信队列中
|
// TODO:暂时ACK,等后续处理是否放到私信队列中
|
||||||
@ -102,19 +99,10 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
isAck = false;
|
isAck = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}");
|
|
||||||
|
|
||||||
heartbeatEntity.AckTime = Clock.Now;
|
heartbeatEntity.AckTime = Clock.Now;
|
||||||
heartbeatEntity.IsAck = true;
|
heartbeatEntity.IsAck = true;
|
||||||
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
|
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
|
||||||
//var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo);
|
}
|
||||||
//if (device != null)
|
|
||||||
//{
|
|
||||||
// await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message);
|
|
||||||
//}
|
|
||||||
if(_tcpService.ClientExists(issuedEventMessage.ClientId))
|
|
||||||
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
|
|
||||||
}
|
|
||||||
// TODO:暂时ACK,等后续处理是否放到私信队列中
|
// TODO:暂时ACK,等后续处理是否放到私信队列中
|
||||||
return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
|
return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
|
||||||
}
|
}
|
||||||
@ -178,19 +166,6 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
//[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
|
//[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
|
||||||
public async Task<ISubscribeAck> ReceivedHeartbeatEvent(List<MessageReceivedHeartbeat> receivedHeartbeatMessages)
|
public async Task<ISubscribeAck> ReceivedHeartbeatEvent(List<MessageReceivedHeartbeat> receivedHeartbeatMessages)
|
||||||
{
|
{
|
||||||
//foreach (var receivedHeartbeatMessage in receivedHeartbeatMessages)
|
|
||||||
//{
|
|
||||||
// var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
|
||||||
// if (protocolPlugin == null)
|
|
||||||
// {
|
|
||||||
// _logger.LogError("协议不存在!");
|
|
||||||
// }
|
|
||||||
// else
|
|
||||||
// {
|
|
||||||
// //await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage);
|
|
||||||
// await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage);
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
await _messageReceivedHeartbeatEventRepository.InsertManyAsync(receivedHeartbeatMessages);
|
await _messageReceivedHeartbeatEventRepository.InsertManyAsync(receivedHeartbeatMessages);
|
||||||
|
|
||||||
return SubscribeAck.Success();
|
return SubscribeAck.Success();
|
||||||
@ -200,20 +175,6 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
//[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
|
//[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
|
||||||
public async Task<ISubscribeAck> ReceivedLoginEvent(List<MessageReceivedLogin> receivedLoginMessages)
|
public async Task<ISubscribeAck> ReceivedLoginEvent(List<MessageReceivedLogin> receivedLoginMessages)
|
||||||
{
|
{
|
||||||
//foreach (var receivedLoginMessage in receivedLoginMessages)
|
|
||||||
//{
|
|
||||||
//var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
|
||||||
//if (protocolPlugin == null)
|
|
||||||
//{
|
|
||||||
// _logger.LogError("协议不存在!");
|
|
||||||
//}
|
|
||||||
//else
|
|
||||||
//{
|
|
||||||
// //await protocolPlugin.LoginAsync(receivedLoginMessage);
|
|
||||||
// await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage);
|
|
||||||
//}
|
|
||||||
|
|
||||||
//}
|
|
||||||
await _messageReceivedLoginEventRepository.InsertManyAsync(receivedLoginMessages);
|
await _messageReceivedLoginEventRepository.InsertManyAsync(receivedLoginMessages);
|
||||||
return SubscribeAck.Success();
|
return SubscribeAck.Success();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,7 +16,6 @@
|
|||||||
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
|
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
|
||||||
<link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet"/>
|
<link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet"/>
|
||||||
<title>后端服务</title>
|
<title>后端服务</title>
|
||||||
|
|
||||||
</head>
|
</head>
|
||||||
|
|
||||||
<body>
|
<body>
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user