Compare commits
No commits in common. "0924dcf9a5586aecc9c2074cf91aa84c712990b4" and "9a5cb565e95a44db38f68208d3c7097bf62b50b3" have entirely different histories.
0924dcf9a5
...
9a5cb565e9
@ -15,15 +15,13 @@ namespace JiShe.CollectBus.Workers
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public interface IScheduledMeterReadingService : IApplicationService
|
public interface IScheduledMeterReadingService : IApplicationService
|
||||||
{
|
{
|
||||||
|
|
||||||
#region 电表采集处理
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 获取电表信息
|
/// 获取电表信息
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="gatherCode">采集端Code</param>
|
/// <param name="gatherCode">采集端Code</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task<List<AmmeterInfo>> GetAmmeterInfoList(string gatherCode = "");
|
Task<List<AmmeterInfo>> GetAmmeterInfoList(string gatherCode = "");
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 初始化电表缓存数据
|
/// 初始化电表缓存数据
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@ -31,28 +29,6 @@ namespace JiShe.CollectBus.Workers
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task InitAmmeterCacheData(string gatherCode = "");
|
Task InitAmmeterCacheData(string gatherCode = "");
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 1分钟采集电表数据
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task AmmeterScheduledMeterOneMinuteReading();
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 5分钟采集电表数据
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task AmmeterScheduledMeterFiveMinuteReading();
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 15分钟采集电表数据
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task AmmeterScheduledMeterFifteenMinuteReading();
|
|
||||||
|
|
||||||
#endregion
|
|
||||||
|
|
||||||
|
|
||||||
#region 水表采集处理
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 获取水表信息
|
/// 获取水表信息
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@ -68,24 +44,21 @@ namespace JiShe.CollectBus.Workers
|
|||||||
Task InitWatermeterCacheData(string gatherCode = "");
|
Task InitWatermeterCacheData(string gatherCode = "");
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 1分钟采集水表数据
|
/// 1分钟采集电表数据
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task WatermeterScheduledMeterOneMinuteReading();
|
Task ScheduledMeterOneMinuteReading();
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 5分钟采集水表数据
|
/// 5分钟采集电表数据
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task WatermeterScheduledMeterFiveMinuteReading();
|
Task ScheduledMeterFiveMinuteReading();
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 15分钟采集水表数据
|
/// 15分钟采集电表数据
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task WatermeterScheduledMeterFifteenMinuteReading();
|
Task ScheduledMeterFifteenMinuteReading();
|
||||||
#endregion
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -10,45 +10,22 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public interface IWorkerSubscriberAppService : IApplicationService
|
public interface IWorkerSubscriberAppService : IApplicationService
|
||||||
{
|
{
|
||||||
|
|
||||||
#region 电表消息采集
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 1分钟采集电表数据下行消息消费订阅
|
/// 1分钟采集电表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
|
Task ScheduledMeterOneMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 5分钟采集电表数据下行消息消费订阅
|
/// 5分钟采集电表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
|
Task ScheduledMeterFiveMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 15分钟采集电表数据下行消息消费订阅
|
/// 15分钟采集电表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
|
Task ScheduledMeterFifteenMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
|
||||||
#endregion
|
|
||||||
|
|
||||||
#region 水表消息采集
|
|
||||||
/// <summary>
|
|
||||||
/// 1分钟采集水表数据下行消息消费订阅
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 5分钟采集水表数据下行消息消费订阅
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 15分钟采集水表数据下行消息消费订阅
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
|
|
||||||
#endregion
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,7 +2,6 @@
|
|||||||
using JiShe.CollectBus.Ammeters;
|
using JiShe.CollectBus.Ammeters;
|
||||||
using JiShe.CollectBus.Common.Consts;
|
using JiShe.CollectBus.Common.Consts;
|
||||||
using JiShe.CollectBus.Watermeter;
|
using JiShe.CollectBus.Watermeter;
|
||||||
using Microsoft.Extensions.Logging;
|
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
@ -18,19 +17,11 @@ namespace JiShe.CollectBus.Workers
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
|
public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
|
||||||
{
|
{
|
||||||
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
|
|
||||||
public BasicScheduledMeterReadingService(ILogger<BasicScheduledMeterReadingService> logger)
|
|
||||||
{
|
|
||||||
_logger = logger;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 系统类型
|
/// 系统类型
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public abstract string SystemType { get; }
|
public abstract string SystemType { get; }
|
||||||
|
|
||||||
#region 电表采集处理
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 获取电表信息
|
/// 获取电表信息
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@ -54,115 +45,32 @@ namespace JiShe.CollectBus.Workers
|
|||||||
throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化电表缓存数据时,电表数据为空");
|
throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化电表缓存数据时,电表数据为空");
|
||||||
}
|
}
|
||||||
|
|
||||||
//根据采集频率分组
|
//将表计信息根据集中器分组
|
||||||
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
|
var meterInfoGroup = meterInfos.GroupBy(x => x.FocusAddress).ToList();
|
||||||
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
|
foreach (var item in meterInfoGroup)
|
||||||
{
|
{
|
||||||
//将表计信息根据集中器分组
|
if (string.IsNullOrWhiteSpace(item.Key))
|
||||||
var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
|
|
||||||
foreach (var item in meterInfoGroup)
|
|
||||||
{
|
{
|
||||||
if (string.IsNullOrWhiteSpace(item.Key))
|
continue;
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
var redisCacheKey = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, itemTimeDensity.Key)}{item.Key}";
|
|
||||||
Dictionary<string, AmmeterInfo> keyValuePairs = new Dictionary<string, AmmeterInfo>();
|
|
||||||
foreach (var subItem in item)
|
|
||||||
{
|
|
||||||
|
|
||||||
keyValuePairs.TryAdd($"{subItem.ID}", subItem);
|
|
||||||
}
|
|
||||||
await FreeRedisProvider.FreeRedis.HSetAsync(redisCacheKey, keyValuePairs);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
_logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成");
|
var redisCacheKey = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy)}{item.Key}";
|
||||||
}
|
Dictionary<string, AmmeterInfo> keyValuePairs = new Dictionary<string, AmmeterInfo>();
|
||||||
|
foreach (var subItem in item)
|
||||||
/// <summary>
|
|
||||||
/// 1分钟采集电表数据
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
public virtual async Task AmmeterScheduledMeterOneMinuteReading()
|
|
||||||
{
|
|
||||||
//获取缓存中的电表信息
|
|
||||||
var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 1)}*";
|
|
||||||
var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList);
|
|
||||||
if (oneMinutekeyList == null || oneMinutekeyList.Length <=0)
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理时没有获取到缓存信息,-102");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
//通过lua脚本一次性获取所有缓存内容
|
|
||||||
var luaScript = @"
|
|
||||||
local results = {}
|
|
||||||
for i, key in ipairs(KEYS) do
|
|
||||||
local data = redis.call('HGETALL', key)
|
|
||||||
results[i] = {key, data}
|
|
||||||
end
|
|
||||||
return results";
|
|
||||||
var oneMinuteAmmerterResult = FreeRedisProvider.FreeRedis.Eval(luaScript, oneMinutekeyList); // 传递 KEYS
|
|
||||||
if (oneMinuteAmmerterResult == null)
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理时没有获取到缓存信息,-102");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 解析结果(结果为嵌套数组)
|
|
||||||
var parsedResults = new Dictionary<string, Dictionary<string, string>>();
|
|
||||||
if (oneMinuteAmmerterResult is object[] arr)
|
|
||||||
{
|
|
||||||
foreach (object[] item in arr)
|
|
||||||
{
|
{
|
||||||
string key = (string)item[0];
|
|
||||||
object[] fieldsAndValues = (object[])item[1];
|
|
||||||
|
|
||||||
var dict = new Dictionary<string, string>();
|
keyValuePairs.TryAdd($"{subItem.ID}", subItem);
|
||||||
for (int i = 0; i < fieldsAndValues.Length; i += 2)
|
|
||||||
{
|
|
||||||
string field = (string)fieldsAndValues[i];
|
|
||||||
string value = (string)fieldsAndValues[i + 1];
|
|
||||||
dict[field] = value;
|
|
||||||
}
|
|
||||||
parsedResults[key] = dict;
|
|
||||||
}
|
}
|
||||||
|
await FreeRedisProvider.FreeRedis.HSetAsync(redisCacheKey, keyValuePairs);
|
||||||
}
|
}
|
||||||
|
|
||||||
_logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理完成");
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 5分钟采集电表数据
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
public virtual Task AmmeterScheduledMeterFiveMinuteReading()
|
|
||||||
{
|
|
||||||
throw new NotImplementedException($"{nameof(AmmeterScheduledMeterFiveMinuteReading)}请根据不同系统类型进行实现");
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 15分钟采集电表数据
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
public virtual Task AmmeterScheduledMeterFifteenMinuteReading()
|
|
||||||
{
|
|
||||||
throw new NotImplementedException($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)}请根据不同系统类型进行实现");
|
|
||||||
}
|
|
||||||
#endregion
|
|
||||||
|
|
||||||
|
|
||||||
#region 水表采集处理
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 获取水表信息
|
/// 获取水表信息
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="gatherCode">采集端Code</param>
|
/// <param name="gatherCode">采集端Code</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public virtual Task<List<WatermeterInfo>> GetWatermeterInfoList(string gatherCode = "")
|
public virtual async Task<List<WatermeterInfo>> GetWatermeterInfoList(string gatherCode = "")
|
||||||
{
|
{
|
||||||
throw new NotImplementedException($"{nameof(GetWatermeterInfoList)}请根据不同系统类型进行实现");
|
throw new NotImplementedException($"{nameof(GetWatermeterInfoList)}请根据不同系统类型进行实现");
|
||||||
}
|
}
|
||||||
@ -177,66 +85,55 @@ namespace JiShe.CollectBus.Workers
|
|||||||
var meterInfos = await GetWatermeterInfoList(gatherCode);
|
var meterInfos = await GetWatermeterInfoList(gatherCode);
|
||||||
if (meterInfos == null || meterInfos.Count <= 0)
|
if (meterInfos == null || meterInfos.Count <= 0)
|
||||||
{
|
{
|
||||||
throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化水表缓存数据时,水表数据为空");
|
throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化水表缓存数据时,水表数据为空");
|
||||||
}
|
}
|
||||||
//根据采集频率分组
|
|
||||||
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
|
//将表计信息根据集中器分组
|
||||||
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
|
var meterInfoGroup = meterInfos.GroupBy(x => x.FocusAddress).ToList();
|
||||||
|
foreach (var item in meterInfoGroup)
|
||||||
{
|
{
|
||||||
//将表计信息根据集中器分组
|
if (string.IsNullOrWhiteSpace(item.Key))
|
||||||
var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
|
|
||||||
foreach (var item in meterInfoGroup)
|
|
||||||
{
|
{
|
||||||
if (string.IsNullOrWhiteSpace(item.Key))
|
continue;
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
var redisCacheKey = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, itemTimeDensity.Key)}{item.Key}";
|
|
||||||
Dictionary<string, WatermeterInfo> keyValuePairs = new Dictionary<string, WatermeterInfo>();
|
|
||||||
foreach (var subItem in item)
|
|
||||||
{
|
|
||||||
|
|
||||||
keyValuePairs.TryAdd($"{subItem.ID}", subItem);
|
|
||||||
}
|
|
||||||
await FreeRedisProvider.FreeRedis.HSetAsync(redisCacheKey, keyValuePairs);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var redisCacheKey = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy)}{item.Key}";
|
||||||
|
Dictionary<string, WatermeterInfo> keyValuePairs = new Dictionary<string, WatermeterInfo>();
|
||||||
|
foreach (var subItem in item)
|
||||||
|
{
|
||||||
|
|
||||||
|
keyValuePairs.TryAdd($"{subItem.ID}", subItem);
|
||||||
|
}
|
||||||
|
await FreeRedisProvider.FreeRedis.HSetAsync(redisCacheKey, keyValuePairs);
|
||||||
}
|
}
|
||||||
_logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据完成");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 1分钟采集水表数据
|
/// 1分钟采集电表数据
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public virtual async Task WatermeterScheduledMeterOneMinuteReading()
|
public virtual Task ScheduledMeterOneMinuteReading()
|
||||||
{
|
{
|
||||||
//获取缓存中的电表信息
|
|
||||||
var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 1)}*";
|
throw new NotImplementedException($"{nameof(ScheduledMeterOneMinuteReading)}请根据不同系统类型进行实现");
|
||||||
var oneMinuteList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList);
|
|
||||||
|
|
||||||
_logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集水表数据处理完成");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 5分钟采集电表数据
|
/// 5分钟采集电表数据
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public virtual Task WatermeterScheduledMeterFiveMinuteReading()
|
public virtual Task ScheduledMeterFiveMinuteReading()
|
||||||
{
|
{
|
||||||
|
throw new NotImplementedException($"{nameof(ScheduledMeterFiveMinuteReading)}请根据不同系统类型进行实现");
|
||||||
throw new NotImplementedException($"{nameof(WatermeterScheduledMeterFiveMinuteReading)}请根据不同系统类型进行实现");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 15分钟采集电表数据
|
/// 15分钟采集电表数据
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public virtual Task WatermeterScheduledMeterFifteenMinuteReading()
|
public virtual Task ScheduledMeterFifteenMinuteReading()
|
||||||
{
|
{
|
||||||
throw new NotImplementedException($"{nameof(WatermeterScheduledMeterFifteenMinuteReading)}请根据不同系统类型进行实现");
|
throw new NotImplementedException($"{nameof(ScheduledMeterFifteenMinuteReading)}请根据不同系统类型进行实现");
|
||||||
}
|
}
|
||||||
#endregion
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -7,7 +7,6 @@ using JiShe.CollectBus.FreeSql;
|
|||||||
using JiShe.CollectBus.Watermeter;
|
using JiShe.CollectBus.Watermeter;
|
||||||
using Microsoft.AspNetCore.Authorization;
|
using Microsoft.AspNetCore.Authorization;
|
||||||
using Microsoft.AspNetCore.Mvc;
|
using Microsoft.AspNetCore.Mvc;
|
||||||
using Microsoft.Extensions.Logging;
|
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
@ -26,12 +25,9 @@ namespace JiShe.CollectBus.Workers
|
|||||||
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
|
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
|
||||||
{
|
{
|
||||||
private readonly IRepository<Device, Guid> _deviceRepository;
|
private readonly IRepository<Device, Guid> _deviceRepository;
|
||||||
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
|
public EnergySystemScheduledMeterReadingService(IRepository<Device, Guid> deviceRepository)
|
||||||
|
|
||||||
public EnergySystemScheduledMeterReadingService(IRepository<Device, Guid> deviceRepository, ILogger<BasicScheduledMeterReadingService> logger):base(logger)
|
|
||||||
{
|
{
|
||||||
this._deviceRepository = deviceRepository;
|
this._deviceRepository = deviceRepository;
|
||||||
this._logger = logger;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public sealed override string SystemType => SystemTypeConst.Energy;
|
public sealed override string SystemType => SystemTypeConst.Energy;
|
||||||
|
|||||||
@ -26,7 +26,6 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
private readonly ILogger<WorkerSubscriberAppService> _logger;
|
private readonly ILogger<WorkerSubscriberAppService> _logger;
|
||||||
private readonly ITcpService _tcpService;
|
private readonly ITcpService _tcpService;
|
||||||
private readonly IServiceProvider _serviceProvider;
|
private readonly IServiceProvider _serviceProvider;
|
||||||
private readonly IRepository<Device, Guid> _deviceRepository;
|
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -36,27 +35,22 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// <param name="tcpService">The TCP service.</param>
|
/// <param name="tcpService">The TCP service.</param>
|
||||||
/// <param name="serviceProvider">The service provider.</param>
|
/// <param name="serviceProvider">The service provider.</param>
|
||||||
public WorkerSubscriberAppService(ILogger<WorkerSubscriberAppService> logger,
|
public WorkerSubscriberAppService(ILogger<WorkerSubscriberAppService> logger,
|
||||||
ITcpService tcpService,
|
ITcpService tcpService, IServiceProvider serviceProvider)
|
||||||
IRepository<Device, Guid> deviceRepository,
|
|
||||||
IServiceProvider serviceProvider)
|
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_tcpService = tcpService;
|
_tcpService = tcpService;
|
||||||
_serviceProvider = serviceProvider;
|
_serviceProvider = serviceProvider;
|
||||||
_deviceRepository = deviceRepository;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#region 电表消息采集
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 一分钟定时抄读任务消息消费订阅
|
/// 一分钟定时抄读任务消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="receivedMessage"></param>
|
/// <param name="receivedMessage"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
[HttpPost]
|
[HttpPost]
|
||||||
[Route("ammeter/oneminute/issued-event")]
|
[Route("oneminute/issued-event")]
|
||||||
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
|
[CapSubscribe(ProtocolConst.SubscriberWorkerOneMinuteIssuedEventName)]
|
||||||
public async Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
|
public async Task ScheduledMeterOneMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理");
|
_logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理");
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
@ -66,12 +60,7 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
|
await _tcpService.SendAsync(receivedMessage.ClientId, receivedMessage.Message);
|
||||||
if (device != null)
|
|
||||||
{
|
|
||||||
await _tcpService.SendAsync(device.ClientId, receivedMessage.Message);
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -81,9 +70,9 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// <param name="receivedMessage"></param>
|
/// <param name="receivedMessage"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
[HttpPost]
|
[HttpPost]
|
||||||
[Route("ammeter/fiveminute/issued-event")]
|
[Route("fiveminute/issued-event")]
|
||||||
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
|
[CapSubscribe(ProtocolConst.SubscriberWorkerOneMinuteIssuedEventName)]
|
||||||
public async Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
|
public async Task ScheduledMeterFiveMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理");
|
_logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理");
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
@ -93,12 +82,7 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
|
await _tcpService.SendAsync(receivedMessage.ClientId, receivedMessage.Message);
|
||||||
if (device != null)
|
|
||||||
{
|
|
||||||
await _tcpService.SendAsync(device.ClientId, receivedMessage.Message);
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -108,9 +92,9 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// <param name="receivedMessage"></param>
|
/// <param name="receivedMessage"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
[HttpPost]
|
[HttpPost]
|
||||||
[Route("ammeter/fifteenminute/issued-event")]
|
[Route("fifteenminute/issued-event")]
|
||||||
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
|
[CapSubscribe(ProtocolConst.SubscriberWorkerOneMinuteIssuedEventName)]
|
||||||
public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
|
public async Task ScheduledMeterFifteenMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
|
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
@ -120,96 +104,8 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
|
await _tcpService.SendAsync(receivedMessage.ClientId, receivedMessage.Message);
|
||||||
if (device != null)
|
|
||||||
{
|
|
||||||
await _tcpService.SendAsync(device.ClientId, receivedMessage.Message);
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endregion
|
|
||||||
|
|
||||||
#region 水表消息采集
|
|
||||||
/// <summary>
|
|
||||||
/// 一分钟定时抄读任务消息消费订阅
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="receivedMessage"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
[HttpPost]
|
|
||||||
[Route("watermeter/oneminute/issued-event")]
|
|
||||||
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName)]
|
|
||||||
public async Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理");
|
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
|
||||||
if (protocolPlugin == null)
|
|
||||||
{
|
|
||||||
_logger.LogError("【1分钟采集电表数据下行消息消费队列开始处理】协议不存在!");
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
|
|
||||||
if (device != null)
|
|
||||||
{
|
|
||||||
await _tcpService.SendAsync(device.ClientId, receivedMessage.Message);
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 5分钟采集电表数据下行消息消费订阅
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="receivedMessage"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
[HttpPost]
|
|
||||||
[Route("watermeter/fiveminute/issued-event")]
|
|
||||||
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName)]
|
|
||||||
public async Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理");
|
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
|
||||||
if (protocolPlugin == null)
|
|
||||||
{
|
|
||||||
_logger.LogError("【5分钟采集电表数据下行消息消费队列开始处理】协议不存在!");
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
|
|
||||||
if (device != null)
|
|
||||||
{
|
|
||||||
await _tcpService.SendAsync(device.ClientId, receivedMessage.Message);
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 15分钟采集电表数据下行消息消费订阅
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="receivedMessage"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
[HttpPost]
|
|
||||||
[Route("watermeter/fifteenminute/issued-event")]
|
|
||||||
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName)]
|
|
||||||
public async Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
|
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
|
||||||
if (protocolPlugin == null)
|
|
||||||
{
|
|
||||||
_logger.LogError("【15分钟采集电表数据下行消息消费队列开始处理】协议不存在!");
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo);
|
|
||||||
if (device != null)
|
|
||||||
{
|
|
||||||
await _tcpService.SendAsync(device.ClientId, receivedMessage.Message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endregion
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -13,29 +13,14 @@ namespace JiShe.CollectBus.Common.Consts
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public const string CacheBasicDirectoryKey = "CollectBus:";
|
public const string CacheBasicDirectoryKey = "CollectBus:";
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 1分钟采集间隔
|
|
||||||
/// </summary>
|
|
||||||
public const string OneMinuteAcquisitionTimeInterval = $"one";
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 5分钟采集间隔
|
|
||||||
/// </summary>
|
|
||||||
public const string FiveMinuteAcquisitionTimeInterval = $"Five";
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 15分钟采集间隔
|
|
||||||
/// </summary>
|
|
||||||
public const string FifteenMinuteAcquisitionTimeInterval = $"Fifteen";
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 缓存电表信息
|
/// 缓存电表信息
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public const string CacheAmmeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}"}:{"{1}"}:AmmeterInfo:";
|
public const string CacheAmmeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}"}:AmmeterInfo:";
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 缓存水表信息
|
/// 缓存水表信息
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public const string CacheWatermeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}"}:{"{1}"}:WatermeterInfo:";
|
public const string CacheWatermeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}"}:WatermeterInfo:";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,28 +17,15 @@ namespace JiShe.CollectBus.Protocol.Contracts
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// 1分钟采集电表数据下行消息主题
|
/// 1分钟采集电表数据下行消息主题
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public const string AmmeterSubscriberWorkerOneMinuteIssuedEventName = "issued.one.ammeter.event";
|
public const string SubscriberWorkerOneMinuteIssuedEventName = "issued.oneminute.event";
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 5分钟采集电表数据下行消息主题
|
/// 5分钟采集电表数据下行消息主题
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public const string AmmeterSubscriberWorkerFiveMinuteIssuedEventName = "issued.five.ammeter.event";
|
public const string SubscriberWorkerFiveMinuteIssuedEventName = "issued.fiveminute.event";
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 15分钟采集电表数据下行消息主题
|
/// 15分钟采集电表数据下行消息主题
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public const string AmmeterSubscriberWorkerFifteenMinuteIssuedEventName = "issued.fifteen.ammeter.event";
|
public const string SubscriberWorkerFifteenMinuteIssuedEventName = "issued.fifteenminute.event";
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 1分钟采集水表数据下行消息主题
|
|
||||||
/// </summary>
|
|
||||||
public const string WatermeterSubscriberWorkerOneMinuteIssuedEventName = "issued.one.watermeter.event";
|
|
||||||
/// <summary>
|
|
||||||
/// 5分钟采集水表数据下行消息主题
|
|
||||||
/// </summary>
|
|
||||||
public const string WatermeterSubscriberWorkerFiveMinuteIssuedEventName = "issued.five.watermeter.event";
|
|
||||||
/// <summary>
|
|
||||||
/// 15分钟采集水表数据下行消息主题
|
|
||||||
/// </summary>
|
|
||||||
public const string WatermeterSubscriberWorkerFifteenMinuteIssuedEventName = "issued.fifteen.watermeter.event";
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user