2025-03-17 14:23:48 +08:00

216 lines
9.1 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System;
using System.Threading.Tasks;
using DeviceDetectorNET.Parser.Device;
using DotNetCore.CAP;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using TouchSocket.Sockets;
using Volo.Abp.Caching;
using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Subscribers
{
/// <summary>
/// Consumes scheduled meter-reading "issued" (downlink) messages and forwards each
/// payload to the target device through the TCP service. Every handler is both a
/// CAP subscriber and an HTTP POST endpoint.
/// </summary>
[Route("/worker/app/subscriber")]
public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService, ICapSubscribe
{
    /// <summary>
    /// Service key used to resolve the protocol plugin from the service provider.
    /// </summary>
    private const string StandardProtocolPluginKey = "StandardProtocolPlugin";

    private readonly ILogger<WorkerSubscriberAppService> _logger;
    private readonly ITcpService _tcpService;
    private readonly IServiceProvider _serviceProvider;
    private readonly IRepository<Device, Guid> _deviceRepository;

    /// <summary>
    /// Initializes a new instance of the <see cref="WorkerSubscriberAppService"/> class.
    /// </summary>
    /// <param name="logger">The logger.</param>
    /// <param name="tcpService">The TCP service used to send payloads to devices.</param>
    /// <param name="deviceRepository">The repository used to look up devices by number.</param>
    /// <param name="serviceProvider">The service provider used to resolve the protocol plugin.</param>
    public WorkerSubscriberAppService(ILogger<WorkerSubscriberAppService> logger,
        ITcpService tcpService,
        IRepository<Device, Guid> deviceRepository,
        IServiceProvider serviceProvider)
    {
        _logger = logger;
        _tcpService = tcpService;
        _serviceProvider = serviceProvider;
        _deviceRepository = deviceRepository;
    }

    #region Ammeter scheduled reading subscribers

    /// <summary>
    /// Handles the one-minute ammeter scheduled-reading issued message.
    /// </summary>
    /// <param name="receivedMessage">The issued message carrying the device number and payload.</param>
    /// <returns>A task that completes when the message has been processed.</returns>
    [HttpPost]
    [Route("ammeter/oneminute/issued-event")]
    [CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
    public async Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
    {
        await ForwardIssuedMessageAsync("1分钟采集电表数据", receivedMessage);
    }

    /// <summary>
    /// Handles the five-minute ammeter scheduled-reading issued message.
    /// </summary>
    /// <param name="receivedMessage">The issued message carrying the device number and payload.</param>
    /// <returns>A task that completes when the message has been processed.</returns>
    [HttpPost]
    [Route("ammeter/fiveminute/issued-event")]
    [CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
    public async Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
    {
        await ForwardIssuedMessageAsync("5分钟采集电表数据", receivedMessage);
    }

    /// <summary>
    /// Handles the fifteen-minute ammeter scheduled-reading issued message.
    /// </summary>
    /// <param name="receivedMessage">The issued message carrying the device number and payload.</param>
    /// <returns>A task that completes when the message has been processed.</returns>
    [HttpPost]
    [Route("ammeter/fifteenminute/issued-event")]
    [CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
    public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
    {
        await ForwardIssuedMessageAsync("15分钟采集电表数据", receivedMessage);
    }

    #endregion

    #region Watermeter scheduled reading subscribers

    /// <summary>
    /// Handles the one-minute watermeter scheduled-reading issued message.
    /// </summary>
    /// <param name="receivedMessage">The issued message carrying the device number and payload.</param>
    /// <returns>A task that completes when the message has been processed.</returns>
    [HttpPost]
    [Route("watermeter/oneminute/issued-event")]
    [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName)]
    public async Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
    {
        await ForwardIssuedMessageAsync("1分钟采集水表数据", receivedMessage);
    }

    /// <summary>
    /// Handles the five-minute watermeter scheduled-reading issued message.
    /// </summary>
    /// <param name="receivedMessage">The issued message carrying the device number and payload.</param>
    /// <returns>A task that completes when the message has been processed.</returns>
    // NOTE(review): this handler previously subscribed to the ammeter five-minute topic,
    // duplicating the ammeter subscription. The watermeter constant name is assumed to
    // follow the pattern of the one-minute constant — confirm it exists in ProtocolConst.
    [HttpPost]
    [Route("watermeter/fiveminute/issued-event")]
    [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerFiveMinuteIssuedEventName)]
    public async Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
    {
        await ForwardIssuedMessageAsync("5分钟采集水表数据", receivedMessage);
    }

    /// <summary>
    /// Handles the fifteen-minute watermeter scheduled-reading issued message.
    /// </summary>
    /// <param name="receivedMessage">The issued message carrying the device number and payload.</param>
    /// <returns>A task that completes when the message has been processed.</returns>
    // NOTE(review): this handler previously subscribed to the ammeter fifteen-minute topic,
    // duplicating the ammeter subscription. The watermeter constant name is assumed to
    // follow the pattern of the one-minute constant — confirm it exists in ProtocolConst.
    [HttpPost]
    [Route("watermeter/fifteenminute/issued-event")]
    [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerFifteenMinuteIssuedEventName)]
    public async Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
    {
        await ForwardIssuedMessageAsync("15分钟采集水表数据", receivedMessage);
    }

    #endregion

    /// <summary>
    /// Shared handler body: checks that the protocol plugin is registered, looks up the
    /// device by the message's device number, and sends the message payload to that
    /// device's client id over TCP.
    /// </summary>
    /// <param name="taskName">Label for the task (interval and meter kind) used as the log message prefix.</param>
    /// <param name="receivedMessage">The issued message carrying the device number and payload.</param>
    /// <returns>A task that completes when the message has been sent or skipped.</returns>
    /// <remarks>
    /// Exceptions thrown by the repository lookup or the TCP send are not caught here
    /// and propagate to the caller.
    /// </remarks>
    private async Task ForwardIssuedMessageAsync(string taskName, ScheduledMeterReadingIssuedEventMessage receivedMessage)
    {
        _logger.LogInformation("{TaskName}下行消息消费队列开始处理", taskName);

        // The plugin instance is only probed for presence; nothing is sent when it is not registered.
        var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>(StandardProtocolPluginKey);
        if (protocolPlugin is null)
        {
            _logger.LogError("【{TaskName}下行消息消费队列开始处理】协议不存在", taskName);
            return;
        }

        var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
        if (device is null)
        {
            // Previously the message was dropped without a trace; record which device number could not be resolved.
            _logger.LogWarning("【{TaskName}下行消息消费队列开始处理】设备不存在，设备编号：{DeviceNo}", taskName, receivedMessage.DeviceNo);
            return;
        }

        await _tcpService.SendAsync(device.ClientId, receivedMessage.Message);
    }
}
}