Compare commits
No commits in common. "5ca4cbad13f5c03884b2b1bb3b9d37ac1de0a1bf" and "ea6dc9f39f805638885f57ddb114dbd321887fe5" have entirely different histories.
5ca4cbad13
...
ea6dc9f39f
@ -65,7 +65,7 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
// 实现IKafkaSubscribe接口
|
// 实现IKafkaSubscribe接口
|
||||||
var subscribeTypes = assembly.GetTypes().Where(type =>
|
var subscribeTypes = assembly.GetTypes().Where(type =>
|
||||||
typeof(IKafkaSubscribe).IsAssignableFrom(type) &&
|
typeof(IKafkaSubscribe).IsAssignableFrom(type) &&
|
||||||
!type.IsAbstract && !type.IsInterface).ToList();
|
!type.IsAbstract && !type.IsInterface).ToList(); ;
|
||||||
if (subscribeTypes.Count == 0)
|
if (subscribeTypes.Count == 0)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
|
|||||||
@ -66,11 +66,6 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
|
|||||||
{
|
{
|
||||||
TB3761 tB3761 = new TB3761
|
TB3761 tB3761 = new TB3761
|
||||||
{
|
{
|
||||||
BaseHexMessage = new BaseHexMessage
|
|
||||||
{
|
|
||||||
HexMessageString = messageReceived,
|
|
||||||
HexMessageList = hexStringList
|
|
||||||
},
|
|
||||||
C = Analysis_C(hexStringList),
|
C = Analysis_C(hexStringList),
|
||||||
A = Analysis_A(hexStringList),
|
A = Analysis_A(hexStringList),
|
||||||
AFN_FC = Analysis_AFN_FC(hexStringList),
|
AFN_FC = Analysis_AFN_FC(hexStringList),
|
||||||
|
|||||||
@ -1,25 +0,0 @@
|
|||||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Protocol.Contracts
|
|
||||||
{
|
|
||||||
|
|
||||||
public class AnalysisStrategyContext<TInput, TResult>
|
|
||||||
{
|
|
||||||
private readonly IAnalysisStrategy<TInput, TResult> _analysisStrategy;
|
|
||||||
|
|
||||||
public AnalysisStrategyContext(IAnalysisStrategy<TInput, TResult> analysisStrategy)
|
|
||||||
{
|
|
||||||
_analysisStrategy = analysisStrategy;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Task<TResult> ExecuteAnalysisStrategy(TInput input)
|
|
||||||
{
|
|
||||||
return _analysisStrategy.ExecuteAsync(input);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,15 +0,0 @@
|
|||||||
using JiShe.CollectBus.Protocol.Contracts.Models;
|
|
||||||
using JiShe.CollectBus.Protocol.Dto;
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Protocol.Contracts.Interfaces
|
|
||||||
{
|
|
||||||
public interface IAnalysisStrategy<TInput, TResult>
|
|
||||||
{
|
|
||||||
Task<TResult> ExecuteAsync(TInput input);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -12,11 +12,6 @@ namespace JiShe.CollectBus.Protocol.Contracts.Models
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public class TB3761
|
public class TB3761
|
||||||
{
|
{
|
||||||
/// <summary>
|
|
||||||
/// 报文
|
|
||||||
/// </summary>
|
|
||||||
public BaseHexMessage? BaseHexMessage { get; set;}
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 控制域C
|
/// 控制域C
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
@ -1,13 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Protocol.Dto
|
|
||||||
{
|
|
||||||
public class AFN0_F1_AnalysisDto: UnitDataDto
|
|
||||||
{
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,42 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Protocol.Dto
|
|
||||||
{
|
|
||||||
public class UnitDataDto
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 集中器地址
|
|
||||||
/// </summary>
|
|
||||||
public string? Code { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// AFN功能码
|
|
||||||
/// </summary>
|
|
||||||
public int AFN { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 信息点
|
|
||||||
/// </summary>
|
|
||||||
public int Pn { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 信息类
|
|
||||||
/// </summary>
|
|
||||||
public int Fn { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 数据时标(最近数据时间点的时间),如:8:00 08:15 记录08:15
|
|
||||||
/// </summary>
|
|
||||||
public string? DataTime { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 密度(分)
|
|
||||||
/// </summary>
|
|
||||||
public int TimeDensity { get; set; }
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@ -1,45 +0,0 @@
|
|||||||
using JiShe.CollectBus.Common.Enums;
|
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Models;
|
|
||||||
using JiShe.CollectBus.Protocol.Dto;
|
|
||||||
using Microsoft.Extensions.Logging;
|
|
||||||
using Microsoft.IdentityModel.Logging;
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Protocol.AnalysisData.AFN_00H
|
|
||||||
{
|
|
||||||
public class AFN0_F1_Analysis: IAnalysisStrategy<TB3761, AFN0_F1_AnalysisDto>
|
|
||||||
{
|
|
||||||
private readonly ILogger<AFN0_F1_Analysis> _logger;
|
|
||||||
|
|
||||||
public AFN0_F1_Analysis(ILogger<AFN0_F1_Analysis> logger)
|
|
||||||
{
|
|
||||||
_logger = logger;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Task<AFN0_F1_AnalysisDto> ExecuteAsync(TB3761 tB3761)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
ArgumentNullException.ThrowIfNull(nameof(tB3761));
|
|
||||||
AFN0_F1_AnalysisDto dto = new AFN0_F1_AnalysisDto
|
|
||||||
{
|
|
||||||
Code = tB3761.A?.Code,
|
|
||||||
AFN = tB3761.AFN_FC?.AFN ?? 0,
|
|
||||||
Fn = tB3761.DT?.Fn ?? 0,
|
|
||||||
Pn = tB3761.DA?.Pn ?? 0
|
|
||||||
};
|
|
||||||
return Task.FromResult(dto);
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, $"00_1解析失败:{tB3761.A?.Code}-{tB3761.DT?.Fn ?? 0}-{tB3761?.BaseHexMessage?.HexMessageString},{ex.Message}");
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,12 +1,7 @@
|
|||||||
using JiShe.CollectBus.Kafka.Internal;
|
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
|
||||||
using JiShe.CollectBus.Protocol.AnalysisData;
|
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
|
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.DependencyInjection.Extensions;
|
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Serilog.Core;
|
|
||||||
using System.Reflection;
|
|
||||||
using TouchSocket.Core;
|
using TouchSocket.Core;
|
||||||
using Volo.Abp;
|
using Volo.Abp;
|
||||||
using Volo.Abp.Modularity;
|
using Volo.Abp.Modularity;
|
||||||
@ -18,7 +13,6 @@ namespace JiShe.CollectBus.Protocol
|
|||||||
public override void ConfigureServices(ServiceConfigurationContext context)
|
public override void ConfigureServices(ServiceConfigurationContext context)
|
||||||
{
|
{
|
||||||
context.Services.AddKeyedSingleton<IProtocolPlugin, StandardProtocolPlugin>(nameof(StandardProtocolPlugin));
|
context.Services.AddKeyedSingleton<IProtocolPlugin, StandardProtocolPlugin>(nameof(StandardProtocolPlugin));
|
||||||
RegisterProtocolAnalysis(context.Services);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
|
public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
|
||||||
@ -26,45 +20,5 @@ namespace JiShe.CollectBus.Protocol
|
|||||||
var standardProtocol = context.ServiceProvider.GetRequiredKeyedService<IProtocolPlugin>(nameof(StandardProtocolPlugin));
|
var standardProtocol = context.ServiceProvider.GetRequiredKeyedService<IProtocolPlugin>(nameof(StandardProtocolPlugin));
|
||||||
await standardProtocol.AddAsync();
|
await standardProtocol.AddAsync();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void RegisterProtocolAnalysis(IServiceCollection services)
|
|
||||||
{
|
|
||||||
// 批量注册
|
|
||||||
var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location);
|
|
||||||
if (string.IsNullOrWhiteSpace(assemblyPath))
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
var dllFiles = Directory.GetFiles(Path.Combine(assemblyPath, "Plugins") , "*.dll");
|
|
||||||
foreach (var file in dllFiles)
|
|
||||||
{
|
|
||||||
// 跳过已加载的程序集
|
|
||||||
var assemblyName = AssemblyName.GetAssemblyName(file);
|
|
||||||
var existingAssembly = AppDomain.CurrentDomain.GetAssemblies()
|
|
||||||
.FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName);
|
|
||||||
var assembly = existingAssembly ?? Assembly.LoadFrom(file);
|
|
||||||
// 实现IAnalysisStrategy接口
|
|
||||||
//var analysisStrategyTypes = assembly.GetTypes().Where(t =>!t.IsAbstract && !t.IsInterface && t.GetInterfaces().Any(i =>
|
|
||||||
//i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IAnalysisStrategy<,>))).ToList();
|
|
||||||
var analysisStrategyTypes = assembly.GetTypes().Where(type =>
|
|
||||||
typeof(IAnalysisStrategy<,>).IsAssignableFrom(type) &&
|
|
||||||
!type.IsAbstract && !type.IsInterface).ToList();
|
|
||||||
if (analysisStrategyTypes.Count == 0)
|
|
||||||
continue;
|
|
||||||
foreach (var analysisStrategyType in analysisStrategyTypes)
|
|
||||||
{
|
|
||||||
// 取所有接口
|
|
||||||
//var interfaceTypes = analysisStrategyType.GetInterfaces()
|
|
||||||
// .Where(i => i.IsGenericType &&
|
|
||||||
// i.GetGenericTypeDefinition() == typeof(IAnalysisStrategy<,>));
|
|
||||||
//foreach (var interfaceType in interfaceTypes)
|
|
||||||
//{
|
|
||||||
// services.AddKeyedTransient(analysisStrategyType, nameof(analysisStrategyType));
|
|
||||||
//}
|
|
||||||
services.AddKeyedTransient(analysisStrategyType, nameof(analysisStrategyType));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -74,6 +74,7 @@ namespace JiShe.CollectBus.Protocol
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return (tB3761 as T)!;
|
return (tB3761 as T)!;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,5 +1,4 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Runtime.CompilerServices;
|
using System.Runtime.CompilerServices;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
@ -131,11 +130,11 @@ namespace JiShe.CollectBus.Plugins
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private async Task OnTcpNormalReceived(ITcpSessionClient tcpSessionClient,string messageHexString, TB3761? tB3761)
|
private async Task OnTcpNormalReceived(ITcpSessionClient tcpSessionClient,string messageHexString, TB3761? tB3761)
|
||||||
{
|
{
|
||||||
//var _protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
var _protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
//if (_protocolPlugin == null)
|
if (_protocolPlugin == null)
|
||||||
//{
|
{
|
||||||
// _logger.LogError("376.1协议插件不存在!");
|
_logger.LogError("376.1协议插件不存在!");
|
||||||
//}
|
}
|
||||||
|
|
||||||
|
|
||||||
//await _producerBus.Publish(new MessageReceived
|
//await _producerBus.Publish(new MessageReceived
|
||||||
@ -161,44 +160,16 @@ namespace JiShe.CollectBus.Plugins
|
|||||||
// MessageId = NewId.NextGuid().ToString()
|
// MessageId = NewId.NextGuid().ToString()
|
||||||
//});
|
//});
|
||||||
|
|
||||||
if(tB3761?.AFN_FC?.AFN==null || tB3761.DT?.Fn==null)
|
|
||||||
{
|
|
||||||
_logger.LogError("376.1协议解析AFN失败");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// 登录心跳已做了处理,故需要忽略登录和心跳帧
|
|
||||||
//if(tB3761.DT?.Fn == (int)FN.登录 || tB3761.DT?.Fn == (int)FN.心跳)
|
|
||||||
// return;
|
|
||||||
|
|
||||||
//TODO:根据AFN进行分流推送到kafka
|
//TODO:根据AFN进行分流推送到kafka
|
||||||
string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, tB3761?.AFN_FC?.AFN.ToString().PadLeft(2,'0'));
|
await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
|
||||||
|
|
||||||
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByReceived();
|
|
||||||
|
|
||||||
if(topics.Contains(topicName))
|
|
||||||
await _producerService.ProduceAsync(topicName, new MessageReceived
|
|
||||||
{
|
|
||||||
ClientId = tcpSessionClient.Id,
|
|
||||||
ClientIp = tcpSessionClient.IP,
|
|
||||||
ClientPort = tcpSessionClient.Port,
|
|
||||||
MessageHexString = messageHexString,
|
|
||||||
DeviceNo = tB3761?.A?.Code!,
|
|
||||||
MessageId = Guid.NewGuid().ToString()
|
|
||||||
});
|
|
||||||
else
|
|
||||||
{
|
{
|
||||||
_logger.LogError($"不支持的上报kafka主题:{topicName}");
|
ClientId = tcpSessionClient.Id,
|
||||||
await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
|
ClientIp = tcpSessionClient.IP,
|
||||||
{
|
ClientPort = tcpSessionClient.Port,
|
||||||
ClientId = tcpSessionClient.Id,
|
MessageHexString = messageHexString,
|
||||||
ClientIp = tcpSessionClient.IP,
|
DeviceNo = tB3761?.A?.Code!,
|
||||||
ClientPort = tcpSessionClient.Port,
|
MessageId = Guid.NewGuid().ToString()
|
||||||
MessageHexString = messageHexString,
|
});
|
||||||
DeviceNo = tB3761?.A?.Code!,
|
|
||||||
MessageId = Guid.NewGuid().ToString()
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -21,8 +21,6 @@ using Volo.Abp.Domain.Repositories;
|
|||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using JiShe.CollectBus.Kafka.Internal;
|
using JiShe.CollectBus.Kafka.Internal;
|
||||||
using JiShe.CollectBus.IoTDB.Provider;
|
using JiShe.CollectBus.IoTDB.Provider;
|
||||||
using JiShe.CollectBus.Protocol.Dto;
|
|
||||||
using System.Collections;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Subscribers
|
namespace JiShe.CollectBus.Subscribers
|
||||||
{
|
{
|
||||||
@ -66,44 +64,26 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
//[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
|
//[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
|
||||||
public async Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
|
public async Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
|
||||||
{
|
{
|
||||||
bool isAck = false;
|
|
||||||
foreach (var issuedEventMessage in issuedEventMessages)
|
foreach (var issuedEventMessage in issuedEventMessages)
|
||||||
{
|
{
|
||||||
var loginEntity = await _messageReceivedLoginEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId);
|
|
||||||
if (loginEntity == null)
|
|
||||||
{
|
|
||||||
isAck=false;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
_logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}");
|
_logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}");
|
||||||
|
var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId);
|
||||||
loginEntity.AckTime = Clock.Now;
|
loginEntity.AckTime = Clock.Now;
|
||||||
loginEntity.IsAck = true;
|
loginEntity.IsAck = true;
|
||||||
await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
|
await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
|
||||||
if (_tcpService.ClientExists(issuedEventMessage.ClientId))
|
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
|
||||||
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
|
|
||||||
isAck = true;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
// TODO:暂时ACK,等后续处理是否放到私信队列中
|
return SubscribeAck.Success();
|
||||||
return isAck? SubscribeAck.Success(): SubscribeAck.Fail();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)]
|
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)]
|
||||||
//[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
|
//[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
|
||||||
public async Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
|
public async Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
|
||||||
{
|
{
|
||||||
bool isAck = false;
|
|
||||||
foreach (var issuedEventMessage in issuedEventMessages)
|
foreach (var issuedEventMessage in issuedEventMessages)
|
||||||
{
|
{
|
||||||
var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId);
|
|
||||||
if (heartbeatEntity == null)
|
|
||||||
{
|
|
||||||
isAck = false;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}");
|
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}");
|
||||||
|
var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId);
|
||||||
heartbeatEntity.AckTime = Clock.Now;
|
heartbeatEntity.AckTime = Clock.Now;
|
||||||
heartbeatEntity.IsAck = true;
|
heartbeatEntity.IsAck = true;
|
||||||
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
|
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
|
||||||
@ -112,11 +92,11 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
//{
|
//{
|
||||||
// await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message);
|
// await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message);
|
||||||
//}
|
//}
|
||||||
if(_tcpService.ClientExists(issuedEventMessage.ClientId))
|
|
||||||
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
|
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
|
||||||
}
|
}
|
||||||
// TODO:暂时ACK,等后续处理是否放到私信队列中
|
|
||||||
return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
|
return SubscribeAck.Success();
|
||||||
}
|
}
|
||||||
|
|
||||||
[KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)]
|
[KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)]
|
||||||
@ -178,21 +158,19 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
//[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
|
//[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
|
||||||
public async Task<ISubscribeAck> ReceivedHeartbeatEvent(List<MessageReceivedHeartbeat> receivedHeartbeatMessages)
|
public async Task<ISubscribeAck> ReceivedHeartbeatEvent(List<MessageReceivedHeartbeat> receivedHeartbeatMessages)
|
||||||
{
|
{
|
||||||
//foreach (var receivedHeartbeatMessage in receivedHeartbeatMessages)
|
foreach (var receivedHeartbeatMessage in receivedHeartbeatMessages)
|
||||||
//{
|
{
|
||||||
// var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
// if (protocolPlugin == null)
|
if (protocolPlugin == null)
|
||||||
// {
|
{
|
||||||
// _logger.LogError("协议不存在!");
|
_logger.LogError("协议不存在!");
|
||||||
// }
|
}
|
||||||
// else
|
else
|
||||||
// {
|
{
|
||||||
// //await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage);
|
//await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage);
|
||||||
// await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage);
|
await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage);
|
||||||
// }
|
}
|
||||||
//}
|
}
|
||||||
await _messageReceivedHeartbeatEventRepository.InsertManyAsync(receivedHeartbeatMessages);
|
|
||||||
|
|
||||||
return SubscribeAck.Success();
|
return SubscribeAck.Success();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -200,57 +178,21 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
//[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
|
//[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
|
||||||
public async Task<ISubscribeAck> ReceivedLoginEvent(List<MessageReceivedLogin> receivedLoginMessages)
|
public async Task<ISubscribeAck> ReceivedLoginEvent(List<MessageReceivedLogin> receivedLoginMessages)
|
||||||
{
|
{
|
||||||
//foreach (var receivedLoginMessage in receivedLoginMessages)
|
foreach (var receivedLoginMessage in receivedLoginMessages)
|
||||||
//{
|
{
|
||||||
//var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
//if (protocolPlugin == null)
|
if (protocolPlugin == null)
|
||||||
//{
|
{
|
||||||
// _logger.LogError("协议不存在!");
|
_logger.LogError("协议不存在!");
|
||||||
//}
|
}
|
||||||
//else
|
else
|
||||||
//{
|
{
|
||||||
// //await protocolPlugin.LoginAsync(receivedLoginMessage);
|
//await protocolPlugin.LoginAsync(receivedLoginMessage);
|
||||||
// await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage);
|
await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage);
|
||||||
//}
|
}
|
||||||
|
}
|
||||||
//}
|
|
||||||
await _messageReceivedLoginEventRepository.InsertManyAsync(receivedLoginMessages);
|
|
||||||
return SubscribeAck.Success();
|
return SubscribeAck.Success();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
[KafkaSubscribe(ProtocolConst.SubscriberAFN01HReceivedEventNameTemp)]
|
|
||||||
public async Task<ISubscribeAck> ReceivedAFN00Event(MessageReceived receivedMessage)
|
|
||||||
{
|
|
||||||
|
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
|
||||||
if (protocolPlugin == null)
|
|
||||||
{
|
|
||||||
_logger.LogError("协议不存在!");
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
TB3761? tB3761 = protocolPlugin.Analysis3761(receivedMessage.MessageHexString);
|
|
||||||
if (tB3761 == null)
|
|
||||||
{
|
|
||||||
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
|
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
|
||||||
if (tB3761.DT == null || tB3761.AFN_FC == null)
|
|
||||||
{
|
|
||||||
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
|
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
|
||||||
//string serverName = $"AFN{tB3761.AFN_FC.AFN}_F{tB3761.DT.Fn}_Analysis";
|
|
||||||
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
|
|
||||||
|
|
||||||
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<AFN0_F1_AnalysisDto>>(tB3761);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,6 +16,7 @@
|
|||||||
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
|
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
|
||||||
<link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet"/>
|
<link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet"/>
|
||||||
<title>后端服务</title>
|
<title>后端服务</title>
|
||||||
|
|
||||||
</head>
|
</head>
|
||||||
|
|
||||||
<body>
|
<body>
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user