2025-05-07 15:17:54 +08:00

173 lines
7.9 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol3761;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using TouchSocket.Sockets;
using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Subscribers
{
/// <summary>
/// Kafka subscriber application service. It acknowledges the login/heartbeat "issued" topics,
/// stores received protocol frames as meter reading records, and persists received login and
/// heartbeat messages.
/// </summary>
public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, IKafkaSubscribe
{
    // Used to log error conditions in ReceivedEvent.
    private readonly ILogger<SubscriberAppService> _logger;

    // Injected, but not referenced by any method of this class at present.
    private readonly ITcpService _tcpService;

    // Used to resolve the keyed IProtocolPlugin in ReceivedEvent.
    private readonly IServiceProvider _serviceProvider;

    // Target of the bulk insert in ReceivedLoginEvent.
    private readonly IRepository<MessageReceivedLogin, Guid> _messageReceivedLoginEventRepository;

    // Target of the bulk insert in ReceivedHeartbeatEvent.
    private readonly IRepository<MessageReceivedHeartbeat, Guid> _messageReceivedHeartbeatEventRepository;

    // Used by ReceivedEvent to look up and insert meter reading records.
    private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;

    // Injected, but not referenced by any method of this class at present.
    private readonly IIoTDbProvider _dbProvider;

    // Injected, but not referenced by any method of this class at present.
    private readonly IProtocolService _protocolService;

    /// <summary>
    /// Initializes a new instance of the <see cref="SubscriberAppService"/> class.
    /// </summary>
    /// <param name="logger">The logger.</param>
    /// <param name="tcpService">The TCP service.</param>
    /// <param name="serviceProvider">The service provider used to resolve the protocol plugin.</param>
    /// <param name="messageReceivedLoginEventRepository">The repository for received login messages.</param>
    /// <param name="messageReceivedHeartbeatEventRepository">The repository for received heartbeat messages.</param>
    /// <param name="dbProvider">The IoTDB provider.</param>
    /// <param name="meterReadingRecordsRepository">The meter reading record repository.</param>
    /// <param name="protocolService">The protocol service.</param>
    public SubscriberAppService(ILogger<SubscriberAppService> logger,
        ITcpService tcpService,
        IServiceProvider serviceProvider,
        IRepository<MessageReceivedLogin, Guid> messageReceivedLoginEventRepository,
        IRepository<MessageReceivedHeartbeat, Guid> messageReceivedHeartbeatEventRepository,
        IIoTDbProvider dbProvider,
        IMeterReadingRecordRepository meterReadingRecordsRepository, IProtocolService protocolService)
    {
        _logger = logger;
        _tcpService = tcpService;
        _serviceProvider = serviceProvider;
        _messageReceivedLoginEventRepository = messageReceivedLoginEventRepository;
        _messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository;
        _meterReadingRecordsRepository = meterReadingRecordsRepository;
        _dbProvider = dbProvider;
        _protocolService = protocolService;
    }

    /// <summary>
    /// Handles a batch from the login-issued topic.
    /// </summary>
    /// <remarks>
    /// The per-message bookkeeping (find the stored login record by <c>MessageId</c>, stamp
    /// <c>AckTime</c>/<c>IsAck</c>, update it, and fail the batch when a record is missing) is
    /// currently disabled, so every batch is acknowledged unconditionally. Restore the failure
    /// path when that logic is re-enabled.
    /// </remarks>
    /// <param name="issuedEventMessages">The batch of issued-event messages; not inspected at present.</param>
    /// <returns>A completed task carrying a success acknowledgement.</returns>
    [LogIntercept]
    [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName, EnableBatch = true)]
    public Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
    {
        // TODO: acknowledge for now; decide later whether unmatched messages should be routed
        // to a separate queue (presumably a dead-letter queue).
        return Task.FromResult<ISubscribeAck>(SubscribeAck.Success());
    }

    /// <summary>
    /// Handles a batch from the heartbeat-issued topic.
    /// </summary>
    /// <remarks>
    /// The per-message bookkeeping (find the stored heartbeat record by <c>MessageId</c>, stamp
    /// <c>AckTime</c>/<c>IsAck</c>, update it, and fail the batch when a record is missing) is
    /// currently disabled, so every batch is acknowledged unconditionally. Restore the failure
    /// path when that logic is re-enabled.
    /// </remarks>
    /// <param name="issuedEventMessages">The batch of issued-event messages; not inspected at present.</param>
    /// <returns>A completed task carrying a success acknowledgement.</returns>
    [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)]
    public Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
    {
        // TODO: acknowledge for now; decide later whether unmatched messages should be routed
        // to a separate queue (presumably a dead-letter queue).
        return Task.FromResult<ISubscribeAck>(SubscribeAck.Success());
    }

    /// <summary>
    /// Handles one received protocol frame and stores it as a meter reading record.
    /// </summary>
    /// <remarks>
    /// A success acknowledgement is returned on every path. When the protocol plugin cannot be
    /// resolved, or the message carries no parsed data, the condition is logged and nothing is stored.
    /// </remarks>
    /// <param name="receivedMessage">The received message with its raw hex string and parsed data.</param>
    /// <returns>A success acknowledgement.</returns>
    [KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)]
    public async Task<ISubscribeAck> ReceivedEvent(MessageProtocolAnalysis<TB3761> receivedMessage)
    {
        var currentTime = Clock.Now;

        // The plugin is only checked for presence here.
        // TODO: parse according to the specific protocol, then run the business handling.
        var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
        if (protocolPlugin == null)
        {
            _logger.LogError("协议不存在!");
            return SubscribeAck.Success();
        }

        // Without parsed data there is nothing to build the record from; log and acknowledge
        // instead of dereferencing null.
        var data = receivedMessage?.Data;
        if (data == null || data.AFN_FC == null || data.DT == null)
        {
            _logger.LogError("数据解析失败:{MessageHexString}", receivedMessage?.MessageHexString);
            return SubscribeAck.Success();
        }

        // Store the frame.
        var entity = new MeterReadingRecords()
        {
            ReceivedMessageHexString = receivedMessage.MessageHexString,
            AFN = (AFN)data.AFN_FC.AFN,
            Fn = data.DT.Fn,
            Pn = 0,
            FocusAddress = "",
            MeterAddress = "",
        };

        // Insert only when the repository lookup finds nothing.
        // NOTE(review): the original comment says an existing record should be updated, but no
        // update is performed here - confirm whether that part is still pending.
        var existingRecord = await _meterReadingRecordsRepository.FirOrDefaultAsync(entity, currentTime);
        if (existingRecord == null)
        {
            await _meterReadingRecordsRepository.InsertAsync(entity, currentTime);
        }

        // TODO: check whether there is a matching issued task.
        return SubscribeAck.Success();
    }

    /// <summary>
    /// Stores a batch of received heartbeat messages.
    /// </summary>
    /// <param name="receivedHeartbeatMessages">The heartbeat messages to insert.</param>
    /// <returns>A success acknowledgement once the insert has completed.</returns>
    [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName, EnableBatch = true)]
    public async Task<ISubscribeAck> ReceivedHeartbeatEvent(List<MessageReceivedHeartbeat> receivedHeartbeatMessages)
    {
        await _messageReceivedHeartbeatEventRepository.InsertManyAsync(receivedHeartbeatMessages);
        return SubscribeAck.Success();
    }

    /// <summary>
    /// Stores a batch of received login messages.
    /// </summary>
    /// <param name="receivedLoginMessages">The login messages to insert.</param>
    /// <returns>A success acknowledgement once the insert has completed.</returns>
    [KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName, EnableBatch = true)]
    public async Task<ISubscribeAck> ReceivedLoginEvent(List<MessageReceivedLogin> receivedLoginMessages)
    {
        await _messageReceivedLoginEventRepository.InsertManyAsync(receivedLoginMessages);
        return SubscribeAck.Success();
    }
}
}