优化下发指令构建机制,将构建与下发拆分,走单独流程处理。
This commit is contained in:
parent
0b2b3bc3d2
commit
0efb87482d
@ -19,6 +19,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
/// <returns></returns>
|
||||
Task<List<GatherItemInfo>> GetGatherItemByDataTypes();
|
||||
|
||||
/// <summary>
|
||||
/// 构建待处理的下发指令任务处理
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
Task CreateToBeIssueTasks();
|
||||
|
||||
#region 电表采集处理
|
||||
/// <summary>
|
||||
/// 获取电表信息
|
||||
@ -35,19 +41,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
Task InitAmmeterCacheData(string gatherCode = "");
|
||||
|
||||
/// <summary>
|
||||
/// 1分钟采集电表数据
|
||||
/// 1分钟采集电表数据,只获取任务数据下发,不构建任务
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
Task AmmeterScheduledMeterOneMinuteReading();
|
||||
|
||||
/// <summary>
|
||||
/// 5分钟采集电表数据
|
||||
/// 5分钟采集电表数据,只获取任务数据下发,不构建任务
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
Task AmmeterScheduledMeterFiveMinuteReading();
|
||||
|
||||
/// <summary>
|
||||
/// 15分钟采集电表数据
|
||||
/// 15分钟采集电表数据,只获取任务数据下发,不构建任务
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
Task AmmeterScheduledMeterFifteenMinuteReading();
|
||||
@ -64,7 +70,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
Task<List<WatermeterInfo>> GetWatermeterInfoList(string gatherCode = "");
|
||||
|
||||
/// <summary>
|
||||
/// 初始化水表缓存数据
|
||||
/// 初始化水表缓存数据,只获取任务数据下发,不构建任务
|
||||
/// </summary>
|
||||
/// <param name="gatherCode">采集端Code</param>
|
||||
/// <returns></returns>
|
||||
@ -77,13 +83,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
Task WatermeterScheduledMeterOneMinuteReading();
|
||||
|
||||
/// <summary>
|
||||
/// 5分钟采集水表数据
|
||||
/// 5分钟采集水表数据,只获取任务数据下发,不构建任务
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
Task WatermeterScheduledMeterFiveMinuteReading();
|
||||
|
||||
/// <summary>
|
||||
/// 15分钟采集水表数据
|
||||
/// 15分钟采集水表数据,只获取任务数据下发,不构建任务
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
Task WatermeterScheduledMeterFifteenMinuteReading();
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@ -34,6 +34,7 @@ namespace JiShe.CollectBus.Subscribers
|
||||
/// </summary>
|
||||
/// <param name="logger">The logger.</param>
|
||||
/// <param name="tcpService">The TCP service.</param>
|
||||
/// <param name="deviceRepository">The Device pepository.</param>
|
||||
/// <param name="serviceProvider">The service provider.</param>
|
||||
public WorkerSubscriberAppService(ILogger<WorkerSubscriberAppService> logger,
|
||||
ITcpService tcpService,
|
||||
@ -69,7 +70,7 @@ namespace JiShe.CollectBus.Subscribers
|
||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
|
||||
if (device != null)
|
||||
{
|
||||
await _tcpService.SendAsync(device.ClientId, receivedMessage.Message);
|
||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
||||
|
||||
}
|
||||
}
|
||||
@ -96,7 +97,7 @@ namespace JiShe.CollectBus.Subscribers
|
||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
|
||||
if (device != null)
|
||||
{
|
||||
await _tcpService.SendAsync(device.ClientId, receivedMessage.Message);
|
||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
||||
|
||||
}
|
||||
}
|
||||
@ -123,7 +124,7 @@ namespace JiShe.CollectBus.Subscribers
|
||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
|
||||
if (device != null)
|
||||
{
|
||||
await _tcpService.SendAsync(device.ClientId, receivedMessage.Message);
|
||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
||||
|
||||
}
|
||||
}
|
||||
@ -152,7 +153,7 @@ namespace JiShe.CollectBus.Subscribers
|
||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
|
||||
if (device != null)
|
||||
{
|
||||
await _tcpService.SendAsync(device.ClientId, receivedMessage.Message);
|
||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
||||
|
||||
}
|
||||
}
|
||||
@ -179,7 +180,7 @@ namespace JiShe.CollectBus.Subscribers
|
||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
|
||||
if (device != null)
|
||||
{
|
||||
await _tcpService.SendAsync(device.ClientId, receivedMessage.Message);
|
||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
||||
|
||||
}
|
||||
}
|
||||
@ -206,7 +207,7 @@ namespace JiShe.CollectBus.Subscribers
|
||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
|
||||
if (device != null)
|
||||
{
|
||||
await _tcpService.SendAsync(device.ClientId, receivedMessage.Message);
|
||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,42 @@
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Hangfire;
|
||||
using JiShe.CollectBus.Common.Consts;
|
||||
using JiShe.CollectBus.ScheduledMeterReading;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Volo.Abp.BackgroundWorkers.Hangfire;
|
||||
using Volo.Abp.DependencyInjection;
|
||||
using Volo.Abp.Uow;
|
||||
|
||||
namespace JiShe.CollectBus.Workers
|
||||
{
|
||||
/// <summary>
|
||||
/// 构建待处理的下发指令任务处理
|
||||
/// </summary>
|
||||
public class CreateToBeIssueTaskWorker : HangfireBackgroundWorkerBase, ITransientDependency, ICollectWorker
|
||||
{
|
||||
private readonly ILogger<CreateToBeIssueTaskWorker> _logger;
|
||||
private readonly IScheduledMeterReadingService _scheduledMeterReadingService;
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="CreateToBeIssueTaskWorker"/> class.
|
||||
/// </summary>
|
||||
/// <param name="logger">The logger.</param>
|
||||
/// <param name="scheduledMeterReadingService">定时任务</param>
|
||||
public CreateToBeIssueTaskWorker(ILogger<CreateToBeIssueTaskWorker> logger, IScheduledMeterReadingService scheduledMeterReadingService)
|
||||
{
|
||||
_logger = logger;
|
||||
RecurringJobId = nameof(CreateToBeIssueTaskWorker);
|
||||
CronExpression = $"*/{1} * * * *"; ;
|
||||
this._scheduledMeterReadingService = scheduledMeterReadingService;
|
||||
}
|
||||
|
||||
|
||||
public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
_logger.LogWarning($"构建待处理的下发指令任务处理开始");
|
||||
//await _scheduledMeterReadingService.CreateToBeIssueTasks();
|
||||
_logger.LogWarning($"构建待处理的下发指令任务处理结束");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -26,14 +26,18 @@ namespace JiShe.CollectBus.Workers
|
||||
{
|
||||
_logger = logger;
|
||||
RecurringJobId = nameof(SubscriberFifteenMinuteWorker);
|
||||
CronExpression = Cron.Hourly(15);
|
||||
CronExpression = $"*/{15} * * * *";
|
||||
this._scheduledMeterReadingService = scheduledMeterReadingService;
|
||||
}
|
||||
|
||||
|
||||
public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
await _scheduledMeterReadingService.AmmeterScheduledMeterFifteenMinuteReading();
|
||||
_logger.LogWarning($"15分钟采集数据开始");
|
||||
//await _scheduledMeterReadingService.AmmeterScheduledMeterFifteenMinuteReading();
|
||||
//await _scheduledMeterReadingService.WatermeterScheduledMeterFifteenMinuteReading();
|
||||
|
||||
_logger.LogWarning($"15分钟采集数据结束");
|
||||
//using (var uow = LazyServiceProvider.LazyGetRequiredService<IUnitOfWorkManager>().Begin())
|
||||
//{
|
||||
// Logger.LogInformation("Executed MyLogWorker..!");
|
||||
|
||||
@ -18,7 +18,7 @@ namespace JiShe.CollectBus.Workers
|
||||
private readonly IScheduledMeterReadingService _scheduledMeterReadingService;
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="SubscriberFifteenMinuteWorker"/> class.
|
||||
/// Initializes a new instance of the <see cref="SubscriberFiveMinuteWorker"/> class.
|
||||
/// </summary>
|
||||
/// <param name="logger">The logger.</param>
|
||||
/// <param name="scheduledMeterReadingService">定时任务</param>
|
||||
@ -26,14 +26,18 @@ namespace JiShe.CollectBus.Workers
|
||||
{
|
||||
_logger = logger;
|
||||
RecurringJobId = nameof(SubscriberFiveMinuteWorker);
|
||||
CronExpression = Cron.Hourly(15);
|
||||
CronExpression = $"*/{5} * * * *";
|
||||
this._scheduledMeterReadingService = scheduledMeterReadingService;
|
||||
}
|
||||
|
||||
|
||||
public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
await _scheduledMeterReadingService.AmmeterScheduledMeterFifteenMinuteReading();
|
||||
_logger.LogWarning($"5分钟采集数据开始");
|
||||
//await _scheduledMeterReadingService.AmmeterScheduledMeterFiveMinuteReading();
|
||||
//await _scheduledMeterReadingService.WatermeterScheduledMeterFiveMinuteReading();
|
||||
|
||||
_logger.LogWarning($"5分钟采集数据结束");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,7 +18,7 @@ namespace JiShe.CollectBus.Workers
|
||||
private readonly IScheduledMeterReadingService _scheduledMeterReadingService;
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="SubscriberFifteenMinuteWorker"/> class.
|
||||
/// Initializes a new instance of the <see cref="SubscriberOneMinuteWorker"/> class.
|
||||
/// </summary>
|
||||
/// <param name="logger">The logger.</param>
|
||||
/// <param name="scheduledMeterReadingService">定时任务</param>
|
||||
@ -26,14 +26,19 @@ namespace JiShe.CollectBus.Workers
|
||||
{
|
||||
_logger = logger;
|
||||
RecurringJobId = nameof(SubscriberOneMinuteWorker);
|
||||
CronExpression = Cron.Hourly(15);
|
||||
CronExpression = $"*/{1} * * * *";
|
||||
this._scheduledMeterReadingService = scheduledMeterReadingService;
|
||||
}
|
||||
|
||||
|
||||
public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
await _scheduledMeterReadingService.AmmeterScheduledMeterFifteenMinuteReading();
|
||||
_logger.LogWarning($"1分钟采集数据开始");
|
||||
//await _scheduledMeterReadingService.AmmeterScheduledMeterOneMinuteReading();
|
||||
|
||||
//await _scheduledMeterReadingService.WatermeterScheduledMeterOneMinuteReading();
|
||||
|
||||
_logger.LogWarning($"1分钟采集数据结束");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,24 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace JiShe.CollectBus.Common.BuildSendDatas
|
||||
{
|
||||
/// <summary>
|
||||
/// 待下发的指令生产任务数据
|
||||
/// </summary>
|
||||
public class TasksToBeIssueModel
|
||||
{
|
||||
/// <summary>
|
||||
/// 下个任务时间
|
||||
/// </summary>
|
||||
public DateTime NextTask { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 采集时间间隔,1分钟,5分钟,15分钟
|
||||
/// </summary>
|
||||
public int TimeDensity { get; set; }
|
||||
}
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,34 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace JiShe.CollectBus.Common.BuildSendDatas
|
||||
{
|
||||
/// <summary>
|
||||
/// 报文构建参数
|
||||
/// </summary>
|
||||
public class TelemetryPacketRequest
|
||||
{
|
||||
/// <summary>
|
||||
/// 集中器地址
|
||||
/// </summary>
|
||||
public string FocusAddress { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 抄读功能码
|
||||
/// </summary>
|
||||
public int Fn { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 抄读计量点,也就是终端电表对应端口
|
||||
/// </summary>
|
||||
public int Pn { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 透明转发单元
|
||||
/// </summary>
|
||||
public List<string> DataUnit { get; set; }
|
||||
}
|
||||
}
|
||||
@ -29,13 +29,18 @@ namespace JiShe.CollectBus.Common.Consts
|
||||
public const string FifteenMinuteAcquisitionTimeInterval = $"Fifteen";
|
||||
|
||||
/// <summary>
|
||||
/// 缓存电表信息
|
||||
/// 缓存表计信息,{0}=>系统类型,{1}=>表计类别
|
||||
/// </summary>
|
||||
public const string CacheAmmeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}"}:{"{1}"}:AmmeterInfo:";
|
||||
public const string CacheMeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}"}:MeterInfo:{"{1}"}:{"{2}"}:";
|
||||
|
||||
/// <summary>
|
||||
/// 缓存水表信息
|
||||
/// 缓存待下发的指令生产任务数据,{0}=>系统类型,{1}=>表计类别,{2}=>采集频率
|
||||
/// </summary>
|
||||
public const string CacheWatermeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}"}:{"{1}"}:WatermeterInfo:";
|
||||
public const string CacheTasksToBeIssuedKey = $"{CacheBasicDirectoryKey}{"{0}"}:TaskInfo:{"{1}"}:{"{2}"}";
|
||||
|
||||
/// <summary>
|
||||
/// 缓存表计下发指令数据集,{0}=>系统类型,{1}=>表计类别,{2}=>采集频率
|
||||
/// </summary>
|
||||
public const string CacheTelemetryPacketInfoKey = $"{CacheBasicDirectoryKey}{"{0}"}:TelemetryPacket:{"{1}"}:{"{2}"}:";
|
||||
}
|
||||
}
|
||||
|
||||
@ -12,7 +12,7 @@ namespace JiShe.CollectBus.IotSystems.MessageIssueds
|
||||
/// <summary>
|
||||
/// 消息内容
|
||||
/// </summary>
|
||||
public byte[] Message { get; set; }
|
||||
public string MessageHexString { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 集中器编号
|
||||
@ -33,5 +33,10 @@ namespace JiShe.CollectBus.IotSystems.MessageIssueds
|
||||
/// 是否下发成功
|
||||
/// </summary>
|
||||
public bool WasSuccessful { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 创建时间
|
||||
/// </summary>
|
||||
public DateTime CreationTime { get; set; }
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user