Merge branch 'feature_定时抄读_CY' into dev

This commit is contained in:
ChenYi 2025-03-13 10:51:59 +08:00
commit 0924dcf9a5
7 changed files with 354 additions and 65 deletions

View File

@ -15,6 +15,8 @@ namespace JiShe.CollectBus.Workers
/// </summary> /// </summary>
public interface IScheduledMeterReadingService : IApplicationService public interface IScheduledMeterReadingService : IApplicationService
{ {
#region
/// <summary> /// <summary>
/// 获取电表信息 /// 获取电表信息
/// </summary> /// </summary>
@ -29,6 +31,28 @@ namespace JiShe.CollectBus.Workers
/// <returns></returns> /// <returns></returns>
Task InitAmmeterCacheData(string gatherCode = ""); Task InitAmmeterCacheData(string gatherCode = "");
/// <summary>
/// 1分钟采集电表数据
/// </summary>
/// <returns></returns>
Task AmmeterScheduledMeterOneMinuteReading();
/// <summary>
/// 5分钟采集电表数据
/// </summary>
/// <returns></returns>
Task AmmeterScheduledMeterFiveMinuteReading();
/// <summary>
/// 15分钟采集电表数据
/// </summary>
/// <returns></returns>
Task AmmeterScheduledMeterFifteenMinuteReading();
#endregion
#region
/// <summary> /// <summary>
/// 获取水表信息 /// 获取水表信息
/// </summary> /// </summary>
@ -44,21 +68,24 @@ namespace JiShe.CollectBus.Workers
Task InitWatermeterCacheData(string gatherCode = ""); Task InitWatermeterCacheData(string gatherCode = "");
/// <summary> /// <summary>
/// 1分钟采集表数据 /// 1分钟采集表数据
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
Task ScheduledMeterOneMinuteReading(); Task WatermeterScheduledMeterOneMinuteReading();
/// <summary> /// <summary>
/// 5分钟采集表数据 /// 5分钟采集表数据
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
Task ScheduledMeterFiveMinuteReading(); Task WatermeterScheduledMeterFiveMinuteReading();
/// <summary> /// <summary>
/// 15分钟采集表数据 /// 15分钟采集表数据
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
Task ScheduledMeterFifteenMinuteReading(); Task WatermeterScheduledMeterFifteenMinuteReading();
#endregion
} }
} }

View File

@ -10,22 +10,45 @@ namespace JiShe.CollectBus.Subscribers
/// </summary> /// </summary>
public interface IWorkerSubscriberAppService : IApplicationService public interface IWorkerSubscriberAppService : IApplicationService
{ {
#region
/// <summary> /// <summary>
/// 1分钟采集电表数据下行消息消费订阅 /// 1分钟采集电表数据下行消息消费订阅
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
Task ScheduledMeterOneMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage); Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
/// <summary> /// <summary>
/// 5分钟采集电表数据下行消息消费订阅 /// 5分钟采集电表数据下行消息消费订阅
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
Task ScheduledMeterFiveMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage); Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
/// <summary> /// <summary>
/// 15分钟采集电表数据下行消息消费订阅 /// 15分钟采集电表数据下行消息消费订阅
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
Task ScheduledMeterFifteenMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage); Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
#endregion
#region
/// <summary>
/// 1分钟采集水表数据下行消息消费订阅
/// </summary>
/// <returns></returns>
Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
/// <summary>
/// 5分钟采集水表数据下行消息消费订阅
/// </summary>
/// <returns></returns>
Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
/// <summary>
/// 15分钟采集水表数据下行消息消费订阅
/// </summary>
/// <returns></returns>
Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
#endregion
} }
} }

View File

@ -2,6 +2,7 @@
using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Watermeter; using JiShe.CollectBus.Watermeter;
using Microsoft.Extensions.Logging;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
@ -17,11 +18,19 @@ namespace JiShe.CollectBus.Workers
/// </summary> /// </summary>
public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
{ {
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
public BasicScheduledMeterReadingService(ILogger<BasicScheduledMeterReadingService> logger)
{
_logger = logger;
}
/// <summary> /// <summary>
/// 系统类型 /// 系统类型
/// </summary> /// </summary>
public abstract string SystemType { get; } public abstract string SystemType { get; }
#region
/// <summary> /// <summary>
/// 获取电表信息 /// 获取电表信息
/// </summary> /// </summary>
@ -45,32 +54,115 @@ namespace JiShe.CollectBus.Workers
throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化电表缓存数据时,电表数据为空"); throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化电表缓存数据时,电表数据为空");
} }
//将表计信息根据集中器分组 //根据采集频率分组
var meterInfoGroup = meterInfos.GroupBy(x => x.FocusAddress).ToList(); var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
foreach (var item in meterInfoGroup) foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
{ {
if (string.IsNullOrWhiteSpace(item.Key)) //将表计信息根据集中器分组
var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
foreach (var item in meterInfoGroup)
{ {
continue; if (string.IsNullOrWhiteSpace(item.Key))
} {
continue;
}
var redisCacheKey = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy)}{item.Key}"; var redisCacheKey = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, itemTimeDensity.Key)}{item.Key}";
Dictionary<string, AmmeterInfo> keyValuePairs = new Dictionary<string, AmmeterInfo>(); Dictionary<string, AmmeterInfo> keyValuePairs = new Dictionary<string, AmmeterInfo>();
foreach (var subItem in item) foreach (var subItem in item)
{ {
keyValuePairs.TryAdd($"{subItem.ID}", subItem); keyValuePairs.TryAdd($"{subItem.ID}", subItem);
}
await FreeRedisProvider.FreeRedis.HSetAsync(redisCacheKey, keyValuePairs);
} }
await FreeRedisProvider.FreeRedis.HSetAsync(redisCacheKey, keyValuePairs);
} }
_logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成");
} }
/// <summary>
/// 1分钟采集电表数据
/// </summary>
/// <returns></returns>
public virtual async Task AmmeterScheduledMeterOneMinuteReading()
{
//获取缓存中的电表信息
var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 1)}*";
var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <=0)
{
_logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理时没有获取到缓存信息,-102");
return;
}
//通过lua脚本一次性获取所有缓存内容
var luaScript = @"
local results = {}
for i, key in ipairs(KEYS) do
local data = redis.call('HGETALL', key)
results[i] = {key, data}
end
return results";
var oneMinuteAmmerterResult = FreeRedisProvider.FreeRedis.Eval(luaScript, oneMinutekeyList); // 传递 KEYS
if (oneMinuteAmmerterResult == null)
{
_logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理时没有获取到缓存信息,-102");
return;
}
// 解析结果(结果为嵌套数组)
var parsedResults = new Dictionary<string, Dictionary<string, string>>();
if (oneMinuteAmmerterResult is object[] arr)
{
foreach (object[] item in arr)
{
string key = (string)item[0];
object[] fieldsAndValues = (object[])item[1];
var dict = new Dictionary<string, string>();
for (int i = 0; i < fieldsAndValues.Length; i += 2)
{
string field = (string)fieldsAndValues[i];
string value = (string)fieldsAndValues[i + 1];
dict[field] = value;
}
parsedResults[key] = dict;
}
}
_logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理完成");
}
/// <summary>
/// 5分钟采集电表数据
/// </summary>
/// <returns></returns>
public virtual Task AmmeterScheduledMeterFiveMinuteReading()
{
throw new NotImplementedException($"{nameof(AmmeterScheduledMeterFiveMinuteReading)}请根据不同系统类型进行实现");
}
/// <summary>
/// 15分钟采集电表数据
/// </summary>
/// <returns></returns>
public virtual Task AmmeterScheduledMeterFifteenMinuteReading()
{
throw new NotImplementedException($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)}请根据不同系统类型进行实现");
}
#endregion
#region
/// <summary> /// <summary>
/// 获取水表信息 /// 获取水表信息
/// </summary> /// </summary>
/// <param name="gatherCode">采集端Code</param> /// <param name="gatherCode">采集端Code</param>
/// <returns></returns> /// <returns></returns>
public virtual async Task<List<WatermeterInfo>> GetWatermeterInfoList(string gatherCode = "") public virtual Task<List<WatermeterInfo>> GetWatermeterInfoList(string gatherCode = "")
{ {
throw new NotImplementedException($"{nameof(GetWatermeterInfoList)}请根据不同系统类型进行实现"); throw new NotImplementedException($"{nameof(GetWatermeterInfoList)}请根据不同系统类型进行实现");
} }
@ -85,55 +177,66 @@ namespace JiShe.CollectBus.Workers
var meterInfos = await GetWatermeterInfoList(gatherCode); var meterInfos = await GetWatermeterInfoList(gatherCode);
if (meterInfos == null || meterInfos.Count <= 0) if (meterInfos == null || meterInfos.Count <= 0)
{ {
throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化水表缓存数据时,水表数据为空"); throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化水表缓存数据时,水表数据为空");
} }
//根据采集频率分组
//将表计信息根据集中器分组 var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
var meterInfoGroup = meterInfos.GroupBy(x => x.FocusAddress).ToList(); foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
foreach (var item in meterInfoGroup)
{ {
if (string.IsNullOrWhiteSpace(item.Key)) //将表计信息根据集中器分组
var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
foreach (var item in meterInfoGroup)
{ {
continue; if (string.IsNullOrWhiteSpace(item.Key))
} {
continue;
}
var redisCacheKey = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy)}{item.Key}"; var redisCacheKey = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, itemTimeDensity.Key)}{item.Key}";
Dictionary<string, WatermeterInfo> keyValuePairs = new Dictionary<string, WatermeterInfo>(); Dictionary<string, WatermeterInfo> keyValuePairs = new Dictionary<string, WatermeterInfo>();
foreach (var subItem in item) foreach (var subItem in item)
{ {
keyValuePairs.TryAdd($"{subItem.ID}", subItem); keyValuePairs.TryAdd($"{subItem.ID}", subItem);
}
await FreeRedisProvider.FreeRedis.HSetAsync(redisCacheKey, keyValuePairs);
} }
await FreeRedisProvider.FreeRedis.HSetAsync(redisCacheKey, keyValuePairs);
} }
_logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据完成");
} }
/// <summary> /// <summary>
/// 1分钟采集表数据 /// 1分钟采集表数据
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
public virtual Task ScheduledMeterOneMinuteReading() public virtual async Task WatermeterScheduledMeterOneMinuteReading()
{ {
//获取缓存中的电表信息
var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 1)}*";
var oneMinuteList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList);
throw new NotImplementedException($"{nameof(ScheduledMeterOneMinuteReading)}请根据不同系统类型进行实现"); _logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集水表数据处理完成");
} }
/// <summary> /// <summary>
/// 5分钟采集电表数据 /// 5分钟采集电表数据
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
public virtual Task ScheduledMeterFiveMinuteReading() public virtual Task WatermeterScheduledMeterFiveMinuteReading()
{ {
throw new NotImplementedException($"{nameof(ScheduledMeterFiveMinuteReading)}请根据不同系统类型进行实现");
throw new NotImplementedException($"{nameof(WatermeterScheduledMeterFiveMinuteReading)}请根据不同系统类型进行实现");
} }
/// <summary> /// <summary>
/// 15分钟采集电表数据 /// 15分钟采集电表数据
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
public virtual Task ScheduledMeterFifteenMinuteReading() public virtual Task WatermeterScheduledMeterFifteenMinuteReading()
{ {
throw new NotImplementedException($"{nameof(ScheduledMeterFifteenMinuteReading)}请根据不同系统类型进行实现"); throw new NotImplementedException($"{nameof(WatermeterScheduledMeterFifteenMinuteReading)}请根据不同系统类型进行实现");
} }
#endregion
} }
} }

View File

@ -7,6 +7,7 @@ using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.Watermeter; using JiShe.CollectBus.Watermeter;
using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
@ -25,9 +26,12 @@ namespace JiShe.CollectBus.Workers
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
{ {
private readonly IRepository<Device, Guid> _deviceRepository; private readonly IRepository<Device, Guid> _deviceRepository;
public EnergySystemScheduledMeterReadingService(IRepository<Device, Guid> deviceRepository) private readonly ILogger<BasicScheduledMeterReadingService> _logger;
public EnergySystemScheduledMeterReadingService(IRepository<Device, Guid> deviceRepository, ILogger<BasicScheduledMeterReadingService> logger):base(logger)
{ {
this._deviceRepository = deviceRepository; this._deviceRepository = deviceRepository;
this._logger = logger;
} }
public sealed override string SystemType => SystemTypeConst.Energy; public sealed override string SystemType => SystemTypeConst.Energy;

View File

@ -26,6 +26,7 @@ namespace JiShe.CollectBus.Subscribers
private readonly ILogger<WorkerSubscriberAppService> _logger; private readonly ILogger<WorkerSubscriberAppService> _logger;
private readonly ITcpService _tcpService; private readonly ITcpService _tcpService;
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
private readonly IRepository<Device, Guid> _deviceRepository;
/// <summary> /// <summary>
@ -35,22 +36,27 @@ namespace JiShe.CollectBus.Subscribers
/// <param name="tcpService">The TCP service.</param> /// <param name="tcpService">The TCP service.</param>
/// <param name="serviceProvider">The service provider.</param> /// <param name="serviceProvider">The service provider.</param>
public WorkerSubscriberAppService(ILogger<WorkerSubscriberAppService> logger, public WorkerSubscriberAppService(ILogger<WorkerSubscriberAppService> logger,
ITcpService tcpService, IServiceProvider serviceProvider) ITcpService tcpService,
IRepository<Device, Guid> deviceRepository,
IServiceProvider serviceProvider)
{ {
_logger = logger; _logger = logger;
_tcpService = tcpService; _tcpService = tcpService;
_serviceProvider = serviceProvider; _serviceProvider = serviceProvider;
_deviceRepository = deviceRepository;
} }
#region
/// <summary> /// <summary>
/// 一分钟定时抄读任务消息消费订阅 /// 一分钟定时抄读任务消息消费订阅
/// </summary> /// </summary>
/// <param name="receivedMessage"></param> /// <param name="receivedMessage"></param>
/// <returns></returns> /// <returns></returns>
[HttpPost] [HttpPost]
[Route("oneminute/issued-event")] [Route("ammeter/oneminute/issued-event")]
[CapSubscribe(ProtocolConst.SubscriberWorkerOneMinuteIssuedEventName)] [CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task ScheduledMeterOneMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage) public async Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
{ {
_logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理"); _logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin"); var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
@ -60,7 +66,12 @@ namespace JiShe.CollectBus.Subscribers
} }
else else
{ {
await _tcpService.SendAsync(receivedMessage.ClientId, receivedMessage.Message); var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
if (device != null)
{
await _tcpService.SendAsync(device.ClientId, receivedMessage.Message);
}
} }
} }
@ -70,9 +81,9 @@ namespace JiShe.CollectBus.Subscribers
/// <param name="receivedMessage"></param> /// <param name="receivedMessage"></param>
/// <returns></returns> /// <returns></returns>
[HttpPost] [HttpPost]
[Route("fiveminute/issued-event")] [Route("ammeter/fiveminute/issued-event")]
[CapSubscribe(ProtocolConst.SubscriberWorkerOneMinuteIssuedEventName)] [CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task ScheduledMeterFiveMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage) public async Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
{ {
_logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理"); _logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin"); var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
@ -82,7 +93,12 @@ namespace JiShe.CollectBus.Subscribers
} }
else else
{ {
await _tcpService.SendAsync(receivedMessage.ClientId, receivedMessage.Message); var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
if (device != null)
{
await _tcpService.SendAsync(device.ClientId, receivedMessage.Message);
}
} }
} }
@ -92,9 +108,9 @@ namespace JiShe.CollectBus.Subscribers
/// <param name="receivedMessage"></param> /// <param name="receivedMessage"></param>
/// <returns></returns> /// <returns></returns>
[HttpPost] [HttpPost]
[Route("fifteenminute/issued-event")] [Route("ammeter/fifteenminute/issued-event")]
[CapSubscribe(ProtocolConst.SubscriberWorkerOneMinuteIssuedEventName)] [CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task ScheduledMeterFifteenMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage) public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
{ {
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理"); _logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin"); var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
@ -104,8 +120,96 @@ namespace JiShe.CollectBus.Subscribers
} }
else else
{ {
await _tcpService.SendAsync(receivedMessage.ClientId, receivedMessage.Message); var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
if (device != null)
{
await _tcpService.SendAsync(device.ClientId, receivedMessage.Message);
}
} }
} }
#endregion
#region
/// <summary>
/// 一分钟定时抄读任务消息消费订阅
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[HttpPost]
[Route("watermeter/oneminute/issued-event")]
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
{
_logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null)
{
_logger.LogError("【1分钟采集电表数据下行消息消费队列开始处理】协议不存在");
}
else
{
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
if (device != null)
{
await _tcpService.SendAsync(device.ClientId, receivedMessage.Message);
}
}
}
/// <summary>
/// 5分钟采集电表数据下行消息消费订阅
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[HttpPost]
[Route("watermeter/fiveminute/issued-event")]
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
{
_logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null)
{
_logger.LogError("【5分钟采集电表数据下行消息消费队列开始处理】协议不存在");
}
else
{
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
if (device != null)
{
await _tcpService.SendAsync(device.ClientId, receivedMessage.Message);
}
}
}
/// <summary>
/// 15分钟采集电表数据下行消息消费订阅
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[HttpPost]
[Route("watermeter/fifteenminute/issued-event")]
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
{
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null)
{
_logger.LogError("【15分钟采集电表数据下行消息消费队列开始处理】协议不存在");
}
else
{
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
if (device != null)
{
await _tcpService.SendAsync(device.ClientId, receivedMessage.Message);
}
}
}
#endregion
} }
} }

View File

@ -13,14 +13,29 @@ namespace JiShe.CollectBus.Common.Consts
/// </summary> /// </summary>
public const string CacheBasicDirectoryKey = "CollectBus:"; public const string CacheBasicDirectoryKey = "CollectBus:";
/// <summary>
/// 1分钟采集间隔
/// </summary>
public const string OneMinuteAcquisitionTimeInterval = $"one";
/// <summary>
/// 5分钟采集间隔
/// </summary>
public const string FiveMinuteAcquisitionTimeInterval = $"Five";
/// <summary>
/// 15分钟采集间隔
/// </summary>
public const string FifteenMinuteAcquisitionTimeInterval = $"Fifteen";
/// <summary> /// <summary>
/// 缓存电表信息 /// 缓存电表信息
/// </summary> /// </summary>
public const string CacheAmmeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}"}:AmmeterInfo:"; public const string CacheAmmeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}"}:{"{1}"}:AmmeterInfo:";
/// <summary> /// <summary>
/// 缓存水表信息 /// 缓存水表信息
/// </summary> /// </summary>
public const string CacheWatermeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}"}:WatermeterInfo:"; public const string CacheWatermeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}"}:{"{1}"}:WatermeterInfo:";
} }
} }

View File

@ -17,15 +17,28 @@ namespace JiShe.CollectBus.Protocol.Contracts
/// <summary> /// <summary>
/// 1分钟采集电表数据下行消息主题 /// 1分钟采集电表数据下行消息主题
/// </summary> /// </summary>
public const string SubscriberWorkerOneMinuteIssuedEventName = "issued.oneminute.event"; public const string AmmeterSubscriberWorkerOneMinuteIssuedEventName = "issued.one.ammeter.event";
/// <summary> /// <summary>
/// 5分钟采集电表数据下行消息主题 /// 5分钟采集电表数据下行消息主题
/// </summary> /// </summary>
public const string SubscriberWorkerFiveMinuteIssuedEventName = "issued.fiveminute.event"; public const string AmmeterSubscriberWorkerFiveMinuteIssuedEventName = "issued.five.ammeter.event";
/// <summary> /// <summary>
/// 15分钟采集电表数据下行消息主题 /// 15分钟采集电表数据下行消息主题
/// </summary> /// </summary>
public const string SubscriberWorkerFifteenMinuteIssuedEventName = "issued.fifteenminute.event"; public const string AmmeterSubscriberWorkerFifteenMinuteIssuedEventName = "issued.fifteen.ammeter.event";
/// <summary>
/// 1分钟采集水表数据下行消息主题
/// </summary>
public const string WatermeterSubscriberWorkerOneMinuteIssuedEventName = "issued.one.watermeter.event";
/// <summary>
/// 5分钟采集水表数据下行消息主题
/// </summary>
public const string WatermeterSubscriberWorkerFiveMinuteIssuedEventName = "issued.five.watermeter.event";
/// <summary>
/// 15分钟采集水表数据下行消息主题
/// </summary>
public const string WatermeterSubscriberWorkerFifteenMinuteIssuedEventName = "issued.fifteen.watermeter.event";
} }
} }