优化缓存电表的获取,以及分组均衡的控制。
This commit is contained in:
parent
1b1c4e5683
commit
2fdf5850c8
@ -77,22 +77,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
Task InitWatermeterCacheData(string gatherCode = "");
|
Task InitWatermeterCacheData(string gatherCode = "");
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 1分钟采集水表数据
|
/// 水表数据采集
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task WatermeterScheduledMeterOneMinuteReading();
|
Task WatermeterScheduledMeterAutoReading();
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 5分钟采集水表数据,只获取任务数据下发,不构建任务
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task WatermeterScheduledMeterFiveMinuteReading();
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 15分钟采集水表数据,只获取任务数据下发,不构建任务
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task WatermeterScheduledMeterFifteenMinuteReading();
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -39,19 +39,8 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// 1分钟采集水表数据下行消息消费订阅
|
/// 1分钟采集水表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 5分钟采集水表数据下行消息消费订阅
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 15分钟采集水表数据下行消息消费订阅
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
|
||||||
#endregion
|
#endregion
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,9 +1,13 @@
|
|||||||
using FreeRedis;
|
using FreeRedis;
|
||||||
using FreeSql;
|
using FreeSql;
|
||||||
|
using JiShe.CollectBus.Common.Consts;
|
||||||
|
using JiShe.CollectBus.Common.Enums;
|
||||||
using JiShe.CollectBus.FreeRedisProvider;
|
using JiShe.CollectBus.FreeRedisProvider;
|
||||||
using JiShe.CollectBus.FreeSql;
|
using JiShe.CollectBus.FreeSql;
|
||||||
using JiShe.CollectBus.Localization;
|
using JiShe.CollectBus.Localization;
|
||||||
|
using JiShe.CollectBus.Serializer;
|
||||||
using Microsoft.AspNetCore.Mvc;
|
using Microsoft.AspNetCore.Mvc;
|
||||||
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using Volo.Abp.Application.Services;
|
using Volo.Abp.Application.Services;
|
||||||
@ -20,5 +24,141 @@ public abstract class CollectBusAppService : ApplicationService
|
|||||||
{
|
{
|
||||||
LocalizationResource = typeof(CollectBusResource);
|
LocalizationResource = typeof(CollectBusResource);
|
||||||
ObjectMapperContext = typeof(CollectBusApplicationModule);
|
ObjectMapperContext = typeof(CollectBusApplicationModule);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Lua脚本批量获取缓存的表计信息
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="T">表信息数据对象</typeparam>
|
||||||
|
/// <param name="redisKeys">采集频率对应的缓存Key集合</param>
|
||||||
|
/// <param name="systemType"><see cref="SystemTypeConst"/> 系统类型</param>
|
||||||
|
/// <param name="serverTagName">服务器标识</param>
|
||||||
|
/// <param name="timeDensity">采集频率,1分钟、5分钟、15分钟</param>
|
||||||
|
/// <param name="meterType"><see cref="MeterTypeEnum"/> 表计类型</param>
|
||||||
|
/// <returns></returns>
|
||||||
|
protected async Task<Dictionary<string, Dictionary<string, T>>> GetMeterRedisCacheDictionaryData<T>(string[] redisKeys, string systemType, string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class
|
||||||
|
{
|
||||||
|
if (redisKeys == null || redisKeys.Length <=0 || string.IsNullOrWhiteSpace(systemType) || string.IsNullOrWhiteSpace(serverTagName) || string.IsNullOrWhiteSpace(timeDensity))
|
||||||
|
{
|
||||||
|
throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,参数异常,-101");
|
||||||
|
}
|
||||||
|
|
||||||
|
//通过lua脚本一次性获取所有缓存内容
|
||||||
|
var luaScript = @"
|
||||||
|
local results = {}
|
||||||
|
for i, key in ipairs(KEYS) do
|
||||||
|
local data = redis.call('HGETALL', key)
|
||||||
|
results[i] = {key, data}
|
||||||
|
end
|
||||||
|
return results";
|
||||||
|
var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, redisKeys); //传递 KEYS
|
||||||
|
if (merterResult == null)
|
||||||
|
{
|
||||||
|
throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,没有获取到数据,-102");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 解析结果(结果为嵌套数组)
|
||||||
|
var meterInfos = new Dictionary<string, Dictionary<string, T>>(); ;
|
||||||
|
if (merterResult is object[] arr)
|
||||||
|
{
|
||||||
|
foreach (object[] item in arr)
|
||||||
|
{
|
||||||
|
string key = (string)item[0];//集中器地址对应的Redis缓存Key
|
||||||
|
object[] fieldsAndValues = (object[])item[1];//缓存Key对应的Hash表数据集合
|
||||||
|
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, systemType, serverTagName, meterType, timeDensity)}";
|
||||||
|
string focusAddress = key.Replace(redisCacheKey, "");//集中器地址
|
||||||
|
|
||||||
|
var meterHashs = new Dictionary<string, T>();
|
||||||
|
for (int i = 0; i < fieldsAndValues.Length; i += 2)
|
||||||
|
{
|
||||||
|
string meterld = (string)fieldsAndValues[i];//表ID
|
||||||
|
string meterStr = (string)fieldsAndValues[i + 1];//表详情数据
|
||||||
|
|
||||||
|
T meterInfo = default!;
|
||||||
|
if (!string.IsNullOrWhiteSpace(meterStr))
|
||||||
|
{
|
||||||
|
meterInfo = meterStr.Deserialize<T>()!;
|
||||||
|
}
|
||||||
|
if (meterInfo != null)
|
||||||
|
{
|
||||||
|
meterHashs[meterld] = meterInfo;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息集中器缓存{key}数据的{meterld}处理异常,-102");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
meterInfos[focusAddress] = meterHashs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return meterInfos;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Lua脚本批量获取缓存的表计信息
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="T">表信息数据对象</typeparam>
|
||||||
|
/// <param name="redisKeys">采集频率对应的缓存Key集合</param>
|
||||||
|
/// <param name="systemType"><see cref="SystemTypeConst"/> 系统类型</param>
|
||||||
|
/// <param name="serverTagName">服务器标识</param>
|
||||||
|
/// <param name="timeDensity">采集频率,1分钟、5分钟、15分钟</param>
|
||||||
|
/// <param name="meterType"><see cref="MeterTypeEnum"/> 表计类型</param>
|
||||||
|
/// <returns></returns>
|
||||||
|
protected async Task<List<T>> GetMeterRedisCacheListData<T>(string[] redisKeys,string systemType,string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class
|
||||||
|
{
|
||||||
|
if (redisKeys == null || redisKeys.Length <= 0 || string.IsNullOrWhiteSpace(systemType) || string.IsNullOrWhiteSpace(serverTagName) || string.IsNullOrWhiteSpace(timeDensity))
|
||||||
|
{
|
||||||
|
throw new Exception($"{nameof(GetMeterRedisCacheListData)} 获取缓存的表计信息失败,参数异常,-101");
|
||||||
|
}
|
||||||
|
|
||||||
|
//通过lua脚本一次性获取所有缓存内容
|
||||||
|
var luaScript = @"
|
||||||
|
local results = {}
|
||||||
|
for i, key in ipairs(KEYS) do
|
||||||
|
local data = redis.call('HGETALL', key)
|
||||||
|
results[i] = {key, data}
|
||||||
|
end
|
||||||
|
return results";
|
||||||
|
var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, redisKeys); //传递 KEYS
|
||||||
|
if (merterResult == null)
|
||||||
|
{
|
||||||
|
throw new Exception($"{nameof(GetMeterRedisCacheListData)} 获取缓存的表计信息失败,没有获取到数据,-102");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 解析结果(结果为嵌套数组)
|
||||||
|
var meterInfos = new List<T>(); ;
|
||||||
|
if (merterResult is object[] arr)
|
||||||
|
{
|
||||||
|
foreach (object[] item in arr)
|
||||||
|
{
|
||||||
|
string key = (string)item[0];//集中器地址对应的Redis缓存Key
|
||||||
|
object[] fieldsAndValues = (object[])item[1];//缓存Key对应的Hash表数据集合
|
||||||
|
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, systemType, serverTagName, meterType, timeDensity)}";
|
||||||
|
string focusAddress = key.Replace(redisCacheKey, "");//集中器地址
|
||||||
|
|
||||||
|
for (int i = 0; i < fieldsAndValues.Length; i += 2)
|
||||||
|
{
|
||||||
|
string meterld = (string)fieldsAndValues[i];//表ID
|
||||||
|
string meterStr = (string)fieldsAndValues[i + 1];//表详情数据
|
||||||
|
|
||||||
|
T meterInfo = default!;
|
||||||
|
if (!string.IsNullOrWhiteSpace(meterStr))
|
||||||
|
{
|
||||||
|
meterInfo = meterStr.Deserialize<T>()!;
|
||||||
|
}
|
||||||
|
if (meterInfo != null)
|
||||||
|
{
|
||||||
|
meterInfos.Add(meterInfo);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
throw new Exception($"{nameof(GetMeterRedisCacheListData)} 获取缓存的表计信息集中器缓存{key}数据的{meterld}处理异常,-103");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return meterInfos;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -59,7 +59,7 @@ public class CollectBusApplicationModule : AbpModule
|
|||||||
//默认初始化表计信息
|
//默认初始化表计信息
|
||||||
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
|
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
|
||||||
dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
|
dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
|
||||||
dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
|
//dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
|
||||||
|
|
||||||
//初始化主题信息
|
//初始化主题信息
|
||||||
var kafkaAdminClient = context.ServiceProvider.GetRequiredService<IAdminClientService>();
|
var kafkaAdminClient = context.ServiceProvider.GetRequiredService<IAdminClientService>();
|
||||||
|
|||||||
@ -1,4 +1,5 @@
|
|||||||
using System;
|
using System;
|
||||||
|
using System.Linq;
|
||||||
using System.Runtime.CompilerServices;
|
using System.Runtime.CompilerServices;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using DeviceDetectorNET.Parser.Device;
|
using DeviceDetectorNET.Parser.Device;
|
||||||
@ -141,7 +142,15 @@ namespace JiShe.CollectBus.Plugins
|
|||||||
|
|
||||||
await client.ResetIdAsync(deviceNo);
|
await client.ResetIdAsync(deviceNo);
|
||||||
|
|
||||||
var entity = await _deviceRepository.FindAsync(a => a.Number == deviceNo);
|
var deviceInfoList= await _deviceRepository.GetListAsync(a => a.Number == deviceNo);
|
||||||
|
if (deviceInfoList != null && deviceInfoList.Count > 1)
|
||||||
|
{
|
||||||
|
//todo 推送集中器编号重复预警
|
||||||
|
_logger.LogError($"集中器编号:{deviceNo},存在多个集中器,请检查集中器编号是否重复");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var entity = deviceInfoList?.FirstOrDefault(a => a.Number == deviceNo);
|
||||||
if (entity == null)
|
if (entity == null)
|
||||||
{
|
{
|
||||||
await _deviceRepository.InsertAsync(new Device(deviceNo, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online));
|
await _deviceRepository.InsertAsync(new Device(deviceNo, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online));
|
||||||
@ -171,7 +180,15 @@ namespace JiShe.CollectBus.Plugins
|
|||||||
string clientId = deviceNo;
|
string clientId = deviceNo;
|
||||||
string oldClientId = $"{client.Id}";
|
string oldClientId = $"{client.Id}";
|
||||||
|
|
||||||
var entity = await _deviceRepository.FindAsync(a => a.Number == deviceNo);
|
var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == deviceNo);
|
||||||
|
if (deviceInfoList != null && deviceInfoList.Count > 1)
|
||||||
|
{
|
||||||
|
//todo 推送集中器编号重复预警
|
||||||
|
_logger.LogError($"集中器编号:{deviceNo},存在多个集中器,请检查集中器编号是否重复");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var entity = deviceInfoList?.FirstOrDefault(a => a.Number == deviceNo);
|
||||||
if (entity == null) //没有登录帧的设备,只有心跳帧
|
if (entity == null) //没有登录帧的设备,只有心跳帧
|
||||||
{
|
{
|
||||||
await client.ResetIdAsync(clientId);
|
await client.ResetIdAsync(clientId);
|
||||||
|
|||||||
@ -17,6 +17,9 @@ using JiShe.CollectBus.Common.Helpers;
|
|||||||
using JiShe.CollectBus.IotSystems.AFNEntity;
|
using JiShe.CollectBus.IotSystems.AFNEntity;
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using JiShe.CollectBus.Common.Consts;
|
||||||
|
using JiShe.CollectBus.Common.Enums;
|
||||||
|
using System.Diagnostics.Metrics;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Samples;
|
namespace JiShe.CollectBus.Samples;
|
||||||
|
|
||||||
@ -108,21 +111,49 @@ public class SampleAppService : CollectBusAppService, ISampleAppService
|
|||||||
[HttpGet]
|
[HttpGet]
|
||||||
public async Task TestDeviceGroupBalanceControl(int deviceCount = 200000)
|
public async Task TestDeviceGroupBalanceControl(int deviceCount = 200000)
|
||||||
{
|
{
|
||||||
var deviceList = new List<string>();
|
//var deviceList = new List<string>();
|
||||||
for (int i = 0; i < deviceCount; i++)
|
//for (int i = 0; i < deviceCount; i++)
|
||||||
|
//{
|
||||||
|
// deviceList.Add($"Device_{Guid.NewGuid()}");
|
||||||
|
//}
|
||||||
|
|
||||||
|
//// 初始化缓存
|
||||||
|
//DeviceGroupBalanceControl.InitializeCache(deviceList);
|
||||||
|
|
||||||
|
var timeDensity = "15";
|
||||||
|
//获取缓存中的电表信息
|
||||||
|
var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, "Energy", "JiSheCollectBus", MeterTypeEnum.Ammeter.ToString(), timeDensity)}*";
|
||||||
|
|
||||||
|
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
||||||
|
var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, "Energy", "JiSheCollectBus", timeDensity, MeterTypeEnum.Ammeter);
|
||||||
|
List<string> focusAddressDataLista = new List<string>();
|
||||||
|
foreach (var item in meterInfos)
|
||||||
{
|
{
|
||||||
deviceList.Add($"Device_{Guid.NewGuid()}");
|
focusAddressDataLista.Add(item.FocusAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 初始化缓存
|
DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
|
||||||
DeviceGroupBalanceControl.InitializeCache(deviceList);
|
|
||||||
|
|
||||||
// 打印分布统计
|
// 打印分布统计
|
||||||
DeviceGroupBalanceControl.PrintDistributionStats();
|
DeviceGroupBalanceControl.PrintDistributionStats();
|
||||||
|
|
||||||
await Task.CompletedTask;
|
await Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 测试设备分组均衡控制算法获取分组Id
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="deviceAddress"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
[HttpGet]
|
||||||
|
public async Task TestGetDeviceGroupBalanceControl(string deviceAddress)
|
||||||
|
{
|
||||||
|
var groupId = DeviceGroupBalanceControl.GetDeviceGroupId(deviceAddress);
|
||||||
|
Console.WriteLine(groupId);
|
||||||
|
|
||||||
|
await Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 测试单个测点数据项
|
/// 测试单个测点数据项
|
||||||
|
|||||||
@ -1,4 +1,5 @@
|
|||||||
using DotNetCore.CAP;
|
using DeviceDetectorNET.Class.Client;
|
||||||
|
using DotNetCore.CAP;
|
||||||
using JiShe.CollectBus.Ammeters;
|
using JiShe.CollectBus.Ammeters;
|
||||||
using JiShe.CollectBus.Common.BuildSendDatas;
|
using JiShe.CollectBus.Common.BuildSendDatas;
|
||||||
using JiShe.CollectBus.Common.Consts;
|
using JiShe.CollectBus.Common.Consts;
|
||||||
@ -18,6 +19,7 @@ using System;
|
|||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.ScheduledMeterReading
|
namespace JiShe.CollectBus.ScheduledMeterReading
|
||||||
@ -32,6 +34,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
private readonly IIoTDBProvider _dbProvider;
|
private readonly IIoTDBProvider _dbProvider;
|
||||||
private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
|
private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
|
||||||
|
|
||||||
|
|
||||||
public BasicScheduledMeterReadingService(
|
public BasicScheduledMeterReadingService(
|
||||||
ILogger<BasicScheduledMeterReadingService> logger,
|
ILogger<BasicScheduledMeterReadingService> logger,
|
||||||
ICapPublisher producerBus,
|
ICapPublisher producerBus,
|
||||||
@ -79,6 +82,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public virtual async Task CreateToBeIssueTasks()
|
public virtual async Task CreateToBeIssueTasks()
|
||||||
{
|
{
|
||||||
|
|
||||||
|
//创建指定数量的线程,
|
||||||
|
|
||||||
var redisCacheKey = $"{RedisConst.CacheBasicDirectoryKey}{SystemType}:{ServerTagName}:TaskInfo:*";
|
var redisCacheKey = $"{RedisConst.CacheBasicDirectoryKey}{SystemType}:{ServerTagName}:TaskInfo:*";
|
||||||
var taskInfos = await FreeRedisProvider.Instance.KeysAsync(redisCacheKey);
|
var taskInfos = await FreeRedisProvider.Instance.KeysAsync(redisCacheKey);
|
||||||
if (taskInfos == null || taskInfos.Length <= 0)
|
if (taskInfos == null || taskInfos.Length <= 0)
|
||||||
@ -94,22 +100,24 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
{
|
{
|
||||||
_logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}没有缓存数据,102");
|
_logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}没有缓存数据,102");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
//检查任务时间节点,由于定时任务10秒钟运行一次,需要判定当前时间是否在任务时间节点内,不在则跳过
|
|
||||||
if (!IsGennerateCmd(tasksToBeIssueModel.NextTaskTime))
|
|
||||||
{
|
|
||||||
_logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
//item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBus,tempArryay[1]=>SystemTypeConst,tempArryay[2]=>TaskInfo,tempArryay[3]=>表计类别,tempArryay[4]=>采集频率
|
//item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBus,tempArryay[1]=>SystemTypeConst,tempArryay[2]=>TaskInfo,tempArryay[3]=>表计类别,tempArryay[4]=>采集频率
|
||||||
var tempArryay = item.Split(":");
|
var tempArryay = item.Split(":");
|
||||||
string meteryType = tempArryay[3];//表计类别
|
string meteryType = tempArryay[3];//表计类别
|
||||||
int timeDensity = Convert.ToInt32(tempArryay[4]);//采集频率
|
int timeDensity = Convert.ToInt32(tempArryay[4]);//采集频率
|
||||||
|
|
||||||
|
//检查任务时间节点,由于定时任务10秒钟运行一次,需要判定当前时间是否在任务时间节点内,不在则跳过
|
||||||
|
if (!IsTaskTime(tasksToBeIssueModel.NextTaskTime, timeDensity))
|
||||||
|
{
|
||||||
|
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
//获取缓存中的电表信息
|
//获取缓存中的电表信息
|
||||||
var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, meteryType, timeDensity)}*";
|
var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, meteryType, timeDensity)}*";
|
||||||
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
||||||
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
|
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
|
||||||
{
|
{
|
||||||
@ -117,10 +125,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var meterTypes = EnumExtensions.ToEnumDictionary<MeterTypeEnum>();
|
||||||
|
|
||||||
if (meteryType == MeterTypeEnum.Ammeter.ToString())
|
if (meteryType == MeterTypeEnum.Ammeter.ToString())
|
||||||
{
|
{
|
||||||
// 解析结果(结果为嵌套数组)
|
// 解析结果(结果为嵌套数组)
|
||||||
var meterInfos = await GetMeterRedisCacheData<AmmeterInfo>(oneMinutekeyList, $"{timeDensity}", meteryType);
|
var meterInfos = await GetMeterRedisCacheDictionaryData<AmmeterInfo>(oneMinutekeyList, SystemType, ServerTagName, $"{timeDensity}", meterTypes[meteryType]);
|
||||||
if (meterInfos == null || meterInfos.Count <= 0)
|
if (meterInfos == null || meterInfos.Count <= 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
|
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
|
||||||
@ -140,6 +150,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
|
|
||||||
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成");
|
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成");
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
//根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。
|
//根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。
|
||||||
tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.AddMinutes(timeDensity);
|
tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.AddMinutes(timeDensity);
|
||||||
await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
|
await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
|
||||||
@ -165,7 +177,26 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
|
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
|
||||||
{
|
{
|
||||||
|
#if DEBUG
|
||||||
|
var timeDensity = "15";
|
||||||
|
//获取缓存中的电表信息
|
||||||
|
var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}*";
|
||||||
|
|
||||||
|
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
||||||
|
var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter);
|
||||||
|
List<string> focusAddressDataLista = new List<string>();
|
||||||
|
foreach (var item in meterInfos)
|
||||||
|
{
|
||||||
|
focusAddressDataLista.Add(item.FocusAddress);
|
||||||
|
}
|
||||||
|
|
||||||
|
DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
|
||||||
|
return;
|
||||||
|
#else
|
||||||
var meterInfos = await GetAmmeterInfoList(gatherCode);
|
var meterInfos = await GetAmmeterInfoList(gatherCode);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
if (meterInfos == null || meterInfos.Count <= 0)
|
if (meterInfos == null || meterInfos.Count <= 0)
|
||||||
{
|
{
|
||||||
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空");
|
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空");
|
||||||
@ -199,7 +230,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
|
|
||||||
#if DEBUG
|
#if DEBUG
|
||||||
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
|
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
|
||||||
await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
|
//await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
|
||||||
#else
|
#else
|
||||||
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
|
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
|
||||||
await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
|
await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
|
||||||
@ -299,7 +330,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
}
|
}
|
||||||
|
|
||||||
//获取下发任务缓存数据
|
//获取下发任务缓存数据
|
||||||
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(oneMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString());
|
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(oneMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
|
||||||
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
|
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
|
||||||
@ -363,7 +394,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
}
|
}
|
||||||
|
|
||||||
//获取下发任务缓存数据
|
//获取下发任务缓存数据
|
||||||
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.Ammeter).ToString());
|
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fiveMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
|
||||||
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
|
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
|
||||||
@ -427,7 +458,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
}
|
}
|
||||||
|
|
||||||
//获取下发任务缓存数据
|
//获取下发任务缓存数据
|
||||||
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString());
|
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
|
||||||
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
|
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
|
||||||
@ -713,7 +744,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
AFN = aFN,
|
AFN = aFN,
|
||||||
Fn = fn,
|
Fn = fn,
|
||||||
ItemCode = tempItem,
|
ItemCode = tempItem,
|
||||||
TaskMark = CommonHelper.GetTaskMark((int)aFN, fn,ammeter.MeteringCode),
|
TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeter.MeteringCode),
|
||||||
ManualOrNot = false,
|
ManualOrNot = false,
|
||||||
Pn = ammeter.MeteringCode,
|
Pn = ammeter.MeteringCode,
|
||||||
IssuedMessageId = GuidGenerator.Create().ToString(),
|
IssuedMessageId = GuidGenerator.Create().ToString(),
|
||||||
@ -799,10 +830,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 1分钟采集水表数据
|
/// 水表数据采集
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public virtual async Task WatermeterScheduledMeterOneMinuteReading()
|
public virtual async Task WatermeterScheduledMeterAutoReading()
|
||||||
{
|
{
|
||||||
//获取缓存中的水表信息
|
//获取缓存中的水表信息
|
||||||
int timeDensity = 1;
|
int timeDensity = 1;
|
||||||
@ -810,15 +841,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
||||||
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
|
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101");
|
_logger.LogError($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
//获取下发任务缓存数据
|
//获取下发任务缓存数据
|
||||||
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString());
|
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(oneMinutekeyList,SystemType,ServerTagName ,timeDensity.ToString(), MeterTypeEnum.WaterMeter);
|
||||||
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
|
_logger.LogError($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -836,7 +867,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
FocusAddress = ammerterItem.Value.FocusAddress,
|
FocusAddress = ammerterItem.Value.FocusAddress,
|
||||||
TimeDensity = timeDensity.ToString(),
|
TimeDensity = timeDensity.ToString(),
|
||||||
};
|
};
|
||||||
await _producerBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
|
await _producerBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempMsg);
|
||||||
|
|
||||||
//_ = _producerBus.Publish(tempMsg);
|
//_ = _producerBus.Publish(tempMsg);
|
||||||
|
|
||||||
@ -856,211 +887,44 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
//await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter);
|
//await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter);
|
||||||
|
|
||||||
|
|
||||||
_logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理完成");
|
_logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成");
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 5分钟采集水表数据
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
public virtual async Task WatermeterScheduledMeterFiveMinuteReading()
|
|
||||||
{
|
|
||||||
|
|
||||||
//获取缓存中的电表信息
|
|
||||||
int timeDensity = 5;
|
|
||||||
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.WaterMeter);
|
|
||||||
var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
|
||||||
if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0)
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
//获取下发任务缓存数据
|
|
||||||
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString());
|
|
||||||
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
|
|
||||||
|
|
||||||
//将取出的缓存任务数据发送到Kafka消息队列中
|
|
||||||
foreach (var focusItem in meterTaskInfos)
|
|
||||||
{
|
|
||||||
foreach (var ammerterItem in focusItem.Value)
|
|
||||||
{
|
|
||||||
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
|
|
||||||
{
|
|
||||||
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
|
|
||||||
MessageId = ammerterItem.Value.IssuedMessageId,
|
|
||||||
FocusAddress = ammerterItem.Value.FocusAddress,
|
|
||||||
TimeDensity = timeDensity.ToString(),
|
|
||||||
};
|
|
||||||
await _producerBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
|
|
||||||
|
|
||||||
//_ = _producerBus.Publish(tempMsg);
|
|
||||||
|
|
||||||
|
|
||||||
meterTaskInfosList.Add(ammerterItem.Value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
|
||||||
{
|
|
||||||
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
|
|
||||||
}
|
|
||||||
|
|
||||||
////删除任务数据
|
|
||||||
//await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList);
|
|
||||||
|
|
||||||
////缓存下一个时间的任务
|
|
||||||
//await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter);
|
|
||||||
|
|
||||||
|
|
||||||
_logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集水表数据处理完成");
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 15分钟采集水表数据
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
public virtual async Task WatermeterScheduledMeterFifteenMinuteReading()
|
|
||||||
{
|
|
||||||
//获取缓存中的电表信息
|
|
||||||
int timeDensity = 15;
|
|
||||||
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.WaterMeter);
|
|
||||||
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
|
||||||
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
//获取下发任务缓存数据
|
|
||||||
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.WaterMeter.ToString());
|
|
||||||
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
|
|
||||||
|
|
||||||
//将取出的缓存任务数据发送到Kafka消息队列中
|
|
||||||
foreach (var focusItem in meterTaskInfos)
|
|
||||||
{
|
|
||||||
foreach (var ammerterItem in focusItem.Value)
|
|
||||||
{
|
|
||||||
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
|
|
||||||
{
|
|
||||||
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
|
|
||||||
MessageId = ammerterItem.Value.IssuedMessageId,
|
|
||||||
FocusAddress = ammerterItem.Value.FocusAddress,
|
|
||||||
TimeDensity = timeDensity.ToString(),
|
|
||||||
};
|
|
||||||
//await _producerBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
|
|
||||||
|
|
||||||
//_ = _producerBus.Publish(tempMsg);
|
|
||||||
|
|
||||||
|
|
||||||
meterTaskInfosList.Add(ammerterItem.Value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
|
||||||
{
|
|
||||||
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
|
|
||||||
}
|
|
||||||
|
|
||||||
////删除任务数据
|
|
||||||
//await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList);
|
|
||||||
|
|
||||||
////缓存下一个时间的任务
|
|
||||||
//await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter);
|
|
||||||
|
|
||||||
|
|
||||||
_logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集水表数据处理完成");
|
|
||||||
}
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
|
|
||||||
#region 公共处理方法
|
#region 公共处理方法
|
||||||
/// <summary>
|
|
||||||
/// Lua脚本批量获取缓存的表计信息
|
|
||||||
/// </summary>
|
|
||||||
/// <typeparam name="T">表信息数据对象</typeparam>
|
|
||||||
/// <param name="redisKeys">采集频率对应的缓存Key集合</param>
|
|
||||||
/// <param name="timeDensity">采集频率,1分钟、5分钟、15分钟</param>
|
|
||||||
/// <param name="meterType">表计类型</param>
|
|
||||||
/// <returns></returns>
|
|
||||||
private async Task<Dictionary<string, Dictionary<string, T>>> GetMeterRedisCacheData<T>(string[] redisKeys, string timeDensity, string meterType) where T : class
|
|
||||||
{
|
|
||||||
//通过lua脚本一次性获取所有缓存内容
|
|
||||||
var luaScript = @"
|
|
||||||
local results = {}
|
|
||||||
for i, key in ipairs(KEYS) do
|
|
||||||
local data = redis.call('HGETALL', key)
|
|
||||||
results[i] = {key, data}
|
|
||||||
end
|
|
||||||
return results";
|
|
||||||
var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, redisKeys); //传递 KEYS
|
|
||||||
if (merterResult == null)
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(GetMeterRedisCacheData)} 定时任务采集表数据处理时没有获取到缓存信息,-102");
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 解析结果(结果为嵌套数组)
|
|
||||||
var meterInfos = new Dictionary<string, Dictionary<string, T>>(); ;
|
|
||||||
if (merterResult is object[] arr)
|
|
||||||
{
|
|
||||||
foreach (object[] item in arr)
|
|
||||||
{
|
|
||||||
string key = (string)item[0];//集中器地址对应的Redis缓存Key
|
|
||||||
object[] fieldsAndValues = (object[])item[1];//缓存Key对应的Hash表数据集合
|
|
||||||
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, meterType, timeDensity)}";
|
|
||||||
string focusAddress = key.Replace(redisCacheKey, "");//集中器地址
|
|
||||||
|
|
||||||
var meterHashs = new Dictionary<string, T>();
|
|
||||||
for (int i = 0; i < fieldsAndValues.Length; i += 2)
|
|
||||||
{
|
|
||||||
string meterld = (string)fieldsAndValues[i];//表ID
|
|
||||||
string meterStr = (string)fieldsAndValues[i + 1];//表详情数据
|
|
||||||
|
|
||||||
T meterInfo = default!;
|
|
||||||
if (!string.IsNullOrWhiteSpace(meterStr))
|
|
||||||
{
|
|
||||||
meterInfo = meterStr.Deserialize<T>()!;
|
|
||||||
}
|
|
||||||
if (meterInfo != null)
|
|
||||||
{
|
|
||||||
meterHashs[meterld] = meterInfo;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
_logger.LogInformation($"{nameof(GetMeterRedisCacheData)} 定时任务采集表数据处理时集中器缓存{key}数据的{meterld}处理异常");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
meterInfos[focusAddress] = meterHashs;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return meterInfos;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 指定时间对比当前时间
|
/// 判断是否需要生成采集指令
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="lastTime"></param>
|
/// <param name="nextTaskTime"></param>
|
||||||
/// <param name="subtrahend"></param>
|
/// <param name="timeDensity"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private bool IsGennerateCmd(DateTime lastTime, int subtrahend = 0)
|
private bool IsTaskTime(DateTime nextTaskTime, int timeDensity = 0)
|
||||||
{
|
{
|
||||||
if (DateTime.Now.AddDays(subtrahend) >= lastTime)//当前时间减去一天,大于等于最后在线时间,不再生成该集中器下表生成采集指令
|
if (DateTime.Now.AddMinutes(timeDensity) >= nextTaskTime)
|
||||||
return false;
|
{
|
||||||
return true;
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
///// <summary>
|
||||||
|
///// 指定时间对比当前时间
|
||||||
|
///// </summary>
|
||||||
|
///// <param name="lastTime"></param>
|
||||||
|
///// <param name="subtrahend"></param>
|
||||||
|
///// <returns></returns>
|
||||||
|
//private bool IsGennerateCmd(DateTime lastTime, int subtrahend = 0)
|
||||||
|
//{
|
||||||
|
// if (DateTime.Now.AddDays(subtrahend) >= lastTime)//当前时间减去一天,大于等于最后在线时间,不再生成该集中器下表生成采集指令
|
||||||
|
// return false;
|
||||||
|
// return true;
|
||||||
|
//}
|
||||||
|
|
||||||
///// <summary>
|
///// <summary>
|
||||||
///// 缓存下一个时间的任务
|
///// 缓存下一个时间的任务
|
||||||
///// </summary>
|
///// </summary>
|
||||||
@ -1091,6 +955,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
{
|
{
|
||||||
return $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, meterType, timeDensity)}*";
|
return $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, meterType, timeDensity)}*";
|
||||||
}
|
}
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -71,37 +71,37 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
public override async Task<List<AmmeterInfo>> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890")
|
public override async Task<List<AmmeterInfo>> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890")
|
||||||
{
|
{
|
||||||
|
|
||||||
List<AmmeterInfo> ammeterInfos = new List<AmmeterInfo>();
|
//List<AmmeterInfo> ammeterInfos = new List<AmmeterInfo>();
|
||||||
ammeterInfos.Add(new AmmeterInfo()
|
//ammeterInfos.Add(new AmmeterInfo()
|
||||||
{
|
//{
|
||||||
Baudrate = 2400,
|
// Baudrate = 2400,
|
||||||
FocusAddress = "402440506",
|
// FocusAddress = "402440506",
|
||||||
Name = "张家祠工务(三相电表)",
|
// Name = "张家祠工务(三相电表)",
|
||||||
FocusID = 95780,
|
// FocusID = 95780,
|
||||||
DatabaseBusiID = 1,
|
// DatabaseBusiID = 1,
|
||||||
MeteringCode = 1,
|
// MeteringCode = 1,
|
||||||
AmmerterAddress = "402410040506",
|
// AmmerterAddress = "402410040506",
|
||||||
ID = 127035,
|
// ID = 127035,
|
||||||
TypeName = 3,
|
// TypeName = 3,
|
||||||
DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
|
// DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
|
||||||
TimeDensity = 15,
|
// TimeDensity = 15,
|
||||||
});
|
//});
|
||||||
ammeterInfos.Add(new AmmeterInfo()
|
//ammeterInfos.Add(new AmmeterInfo()
|
||||||
{
|
//{
|
||||||
Baudrate = 2400,
|
// Baudrate = 2400,
|
||||||
FocusAddress = "542400504",
|
// FocusAddress = "542400504",
|
||||||
Name = "五号配(长芦二所四排)(单相电表)",
|
// Name = "五号配(长芦二所四排)(单相电表)",
|
||||||
FocusID = 69280,
|
// FocusID = 69280,
|
||||||
DatabaseBusiID = 1,
|
// DatabaseBusiID = 1,
|
||||||
MeteringCode = 2,
|
// MeteringCode = 2,
|
||||||
AmmerterAddress = "542410000504",
|
// AmmerterAddress = "542410000504",
|
||||||
ID = 95594,
|
// ID = 95594,
|
||||||
TypeName = 1,
|
// TypeName = 1,
|
||||||
DataTypes = "581,589,592,597,601",
|
// DataTypes = "581,589,592,597,601",
|
||||||
TimeDensity = 15,
|
// TimeDensity = 15,
|
||||||
});
|
//});
|
||||||
|
|
||||||
return ammeterInfos;
|
//return ammeterInfos;
|
||||||
|
|
||||||
string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
|
string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
|
||||||
FROM TB_GatherInfo(NOLOCK) AS A
|
FROM TB_GatherInfo(NOLOCK) AS A
|
||||||
@ -111,10 +111,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
WHERE 1=1 and C.Special = 0 ";
|
WHERE 1=1 and C.Special = 0 ";
|
||||||
//TODO 记得移除特殊表过滤
|
//TODO 记得移除特殊表过滤
|
||||||
|
|
||||||
if (!string.IsNullOrWhiteSpace(gatherCode))
|
//if (!string.IsNullOrWhiteSpace(gatherCode))
|
||||||
{
|
//{
|
||||||
sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
|
// sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
|
||||||
}
|
//}
|
||||||
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
|
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
|
||||||
.Ado
|
.Ado
|
||||||
.QueryAsync<AmmeterInfo>(sql);
|
.QueryAsync<AmmeterInfo>(sql);
|
||||||
@ -187,6 +187,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
deviceList.Add($"Device_{Guid.NewGuid()}");
|
deviceList.Add($"Device_{Guid.NewGuid()}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 初始化缓存
|
||||||
|
DeviceGroupBalanceControl.InitializeCache(deviceList);
|
||||||
|
|
||||||
// 打印分布统计
|
// 打印分布统计
|
||||||
DeviceGroupBalanceControl.PrintDistributionStats();
|
DeviceGroupBalanceControl.PrintDistributionStats();
|
||||||
|
|||||||
@ -147,69 +147,16 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
#region 水表消息采集
|
#region 水表消息采集
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 1分钟采集水表数据下行消息消费订阅
|
/// 水表数据下行消息消费订阅
|
||||||
/// </summary>
|
|
||||||
/// <param name="receivedMessage"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
[HttpPost]
|
|
||||||
[Route("watermeter/oneminute/issued-event")]
|
|
||||||
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName)]
|
|
||||||
public async Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("1分钟采集水表数据下行消息消费队列开始处理");
|
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
|
||||||
if (protocolPlugin == null)
|
|
||||||
{
|
|
||||||
_logger.LogError("【1分钟采集水表数据下行消息消费队列开始处理】协议不存在!");
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
|
|
||||||
if (device != null)
|
|
||||||
{
|
|
||||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 5分钟采集水表数据下行消息消费订阅
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="receivedMessage"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
[HttpPost]
|
|
||||||
[Route("watermeter/fiveminute/issued-event")]
|
|
||||||
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerFiveMinuteIssuedEventName)]
|
|
||||||
public async Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("5分钟采集水表数据下行消息消费队列开始处理");
|
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
|
||||||
if (protocolPlugin == null)
|
|
||||||
{
|
|
||||||
_logger.LogError("【5分钟采集水表数据下行消息消费队列开始处理】协议不存在!");
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
|
|
||||||
if (device != null)
|
|
||||||
{
|
|
||||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 15分钟采集水表数据下行消息消费订阅
|
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="receivedMessage"></param>
|
/// <param name="receivedMessage"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
[HttpPost]
|
[HttpPost]
|
||||||
[Route("watermeter/fifteenminute/issued-event")]
|
[Route("watermeter/fifteenminute/issued-event")]
|
||||||
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerFifteenMinuteIssuedEventName)]
|
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
|
||||||
public async Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
public async Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("15分钟采集水表数据下行消息消费队列开始处理");
|
_logger.LogInformation("15分钟采集水表数据下行消息消费队列开始处理");
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
using JiShe.CollectBus.FreeRedisProvider;
|
using JiShe.CollectBus.FreeRedisProvider;
|
||||||
using System;
|
using System;
|
||||||
|
using System.Collections.Concurrent;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
@ -12,38 +13,98 @@ namespace JiShe.CollectBus.Common.Helpers
|
|||||||
/// 设备组负载控制
|
/// 设备组负载控制
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public class DeviceGroupBalanceControl
|
public class DeviceGroupBalanceControl
|
||||||
{
|
{
|
||||||
/// <summary>
|
private static readonly object _syncRoot = new object();
|
||||||
/// 分组集合
|
|
||||||
/// </summary>
|
private static volatile CacheState _currentCache;
|
||||||
private static List<string>[] _cachedGroups;
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 设备分组关系映射
|
/// 使用ConcurrentDictionary保证线程安全的设备分组映射
|
||||||
/// </summary>
|
/// </summary>
|
||||||
private static Dictionary<string, int> _balancedMapping;
|
private sealed class CacheState
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 初始化缓存并强制均衡
|
|
||||||
/// </summary>
|
|
||||||
public static void InitializeCache(List<string> deviceList,int groupCount = 50)
|
|
||||||
{
|
{
|
||||||
// 步骤1: 生成均衡映射表
|
public readonly ConcurrentDictionary<string, int> BalancedMapping;
|
||||||
_balancedMapping = CreateBalancedMapping(deviceList, groupCount);
|
public readonly List<string>[] CachedGroups;
|
||||||
|
|
||||||
// 步骤2: 根据映射表填充分组
|
public CacheState(int groupCount)
|
||||||
_cachedGroups = new List<string>[groupCount];
|
|
||||||
for (int i = 0; i < groupCount; i++)
|
|
||||||
{
|
{
|
||||||
_cachedGroups[i] = new List<string>(capacity: deviceList.Count / groupCount + 1);
|
BalancedMapping = new ConcurrentDictionary<string, int>();
|
||||||
|
CachedGroups = new List<string>[groupCount];
|
||||||
|
for (int i = 0; i < groupCount; i++)
|
||||||
|
{
|
||||||
|
CachedGroups[i] = new List<string>();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 初始化或增量更新缓存
|
||||||
|
/// </summary>
|
||||||
|
public static void InitializeCache(List<string> deviceList, int groupCount = 60)
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
// 首次初始化
|
||||||
|
if (_currentCache == null)
|
||||||
|
{
|
||||||
|
var newCache = new CacheState(groupCount);
|
||||||
|
UpdateCacheWithDevices(newCache, deviceList, groupCount);
|
||||||
|
_currentCache = newCache;
|
||||||
|
}
|
||||||
|
// 后续增量更新
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (_currentCache.CachedGroups.Length != groupCount)
|
||||||
|
throw new ArgumentException("Group count cannot change after initial initialization");
|
||||||
|
|
||||||
|
var clonedCache = CloneExistingCache();
|
||||||
|
UpdateCacheWithDevices(clonedCache, deviceList, groupCount);
|
||||||
|
_currentCache = clonedCache;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 带锁的缓存克隆(写入时复制)
|
||||||
|
/// </summary>
|
||||||
|
private static CacheState CloneExistingCache()
|
||||||
|
{
|
||||||
|
var oldCache = _currentCache;
|
||||||
|
var newCache = new CacheState(oldCache.CachedGroups.Length);
|
||||||
|
|
||||||
|
// 复制已有映射
|
||||||
|
foreach (var kvp in oldCache.BalancedMapping)
|
||||||
|
{
|
||||||
|
newCache.BalancedMapping.TryAdd(kvp.Key, kvp.Value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 复制分组数据
|
||||||
|
for (int i = 0; i < oldCache.CachedGroups.Length; i++)
|
||||||
|
{
|
||||||
|
newCache.CachedGroups[i].AddRange(oldCache.CachedGroups[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
return newCache;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 更新设备到缓存
|
||||||
|
/// </summary>
|
||||||
|
private static void UpdateCacheWithDevices(CacheState cache, List<string> deviceList, int groupCount)
|
||||||
|
{
|
||||||
foreach (var deviceId in deviceList)
|
foreach (var deviceId in deviceList)
|
||||||
{
|
{
|
||||||
int groupId = _balancedMapping[deviceId];
|
// 原子操作:如果设备不存在则计算分组
|
||||||
_cachedGroups[groupId].Add(deviceId);
|
cache.BalancedMapping.GetOrAdd(deviceId, id =>
|
||||||
}
|
{
|
||||||
|
int groupId = GetGroupId(id, groupCount);
|
||||||
|
lock (cache.CachedGroups[groupId])
|
||||||
|
{
|
||||||
|
cache.CachedGroups[groupId].Add(id);
|
||||||
|
}
|
||||||
|
return groupId;
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -51,11 +112,11 @@ namespace JiShe.CollectBus.Common.Helpers
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public static List<string> GetGroup(string deviceId)
|
public static List<string> GetGroup(string deviceId)
|
||||||
{
|
{
|
||||||
if (_balancedMapping == null || _cachedGroups == null)
|
var cache = _currentCache;
|
||||||
|
if (cache == null)
|
||||||
throw new InvalidOperationException("缓存未初始化");
|
throw new InvalidOperationException("缓存未初始化");
|
||||||
|
|
||||||
int groupId = _balancedMapping[deviceId];
|
return cache.CachedGroups[cache.BalancedMapping[deviceId]];
|
||||||
return _cachedGroups[groupId];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -63,10 +124,11 @@ namespace JiShe.CollectBus.Common.Helpers
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public static int GetDeviceGroupId(string deviceId)
|
public static int GetDeviceGroupId(string deviceId)
|
||||||
{
|
{
|
||||||
if (_balancedMapping == null || _cachedGroups == null)
|
var cache = _currentCache;
|
||||||
|
if (cache == null)
|
||||||
throw new InvalidOperationException("缓存未初始化");
|
throw new InvalidOperationException("缓存未初始化");
|
||||||
|
|
||||||
return _balancedMapping[deviceId];
|
return cache.BalancedMapping[deviceId];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -162,7 +224,14 @@ namespace JiShe.CollectBus.Common.Helpers
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public static void PrintDistributionStats()
|
public static void PrintDistributionStats()
|
||||||
{
|
{
|
||||||
var stats = _cachedGroups
|
var cache = _currentCache;
|
||||||
|
if (cache == null)
|
||||||
|
{
|
||||||
|
Console.WriteLine("缓存未初始化");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var stats = cache.CachedGroups
|
||||||
.Select((group, idx) => new { GroupId = idx, Count = group.Count })
|
.Select((group, idx) => new { GroupId = idx, Count = group.Count })
|
||||||
.OrderBy(x => x.GroupId);
|
.OrderBy(x => x.GroupId);
|
||||||
|
|
||||||
|
|||||||
@ -69,6 +69,30 @@ namespace JiShe.CollectBus.Kafka.AdminClient
|
|||||||
return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic));
|
return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 判断Kafka主题是否存在
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="topic">主题名称</param>
|
||||||
|
/// <param name="numPartitions">副本数量,不能高于Brokers数量</param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public async Task<bool> CheckTopicAsync(string topic,int numPartitions)
|
||||||
|
{
|
||||||
|
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(5));
|
||||||
|
if(numPartitions > metadata.Brokers.Count)
|
||||||
|
{
|
||||||
|
throw new Exception($"{nameof(CheckTopicAsync)} 主题检查时,副本数量大于了节点数量。") ;
|
||||||
|
}
|
||||||
|
|
||||||
|
return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic));
|
||||||
|
}
|
||||||
|
|
||||||
|
//// <summary>
|
||||||
|
/// 创建Kafka主题
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="topic">主题名称</param>
|
||||||
|
/// <param name="numPartitions">主题分区数量</param>
|
||||||
|
/// <param name="replicationFactor">副本数量,不能高于Brokers数量</param>
|
||||||
|
/// <returns></returns>
|
||||||
public async Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor)
|
public async Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor)
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -96,17 +120,31 @@ namespace JiShe.CollectBus.Kafka.AdminClient
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 删除Kafka主题
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="topic"></param>
|
||||||
|
/// <returns></returns>
|
||||||
public async Task DeleteTopicAsync(string topic)
|
public async Task DeleteTopicAsync(string topic)
|
||||||
{
|
{
|
||||||
await Instance.DeleteTopicsAsync(new[] { topic });
|
await Instance.DeleteTopicsAsync(new[] { topic });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 获取Kafka主题列表
|
||||||
|
/// </summary>
|
||||||
|
/// <returns></returns>
|
||||||
public async Task<List<string>> ListTopicsAsync()
|
public async Task<List<string>> ListTopicsAsync()
|
||||||
{
|
{
|
||||||
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
|
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
|
||||||
return new List<string>(metadata.Topics.Select(t => t.Topic));
|
return new List<string>(metadata.Topics.Select(t => t.Topic));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 判断Kafka主题是否存在
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="topic"></param>
|
||||||
|
/// <returns></returns>
|
||||||
public async Task<bool> TopicExistsAsync(string topic)
|
public async Task<bool> TopicExistsAsync(string topic)
|
||||||
{
|
{
|
||||||
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
|
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
|
||||||
|
|||||||
@ -8,9 +8,33 @@ namespace JiShe.CollectBus.Kafka.AdminClient
|
|||||||
{
|
{
|
||||||
public interface IAdminClientService
|
public interface IAdminClientService
|
||||||
{
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 创建Kafka主题
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="topic">主题名称</param>
|
||||||
|
/// <param name="numPartitions">主题分区数量</param>
|
||||||
|
/// <param name="replicationFactor">副本数量,不能高于Brokers数量</param>
|
||||||
|
/// <returns></returns>
|
||||||
Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor);
|
Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 删除Kafka主题
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="topic"></param>
|
||||||
|
/// <returns></returns>
|
||||||
Task DeleteTopicAsync(string topic);
|
Task DeleteTopicAsync(string topic);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 获取Kafka主题列表
|
||||||
|
/// </summary>
|
||||||
|
/// <returns></returns>
|
||||||
Task<List<string>> ListTopicsAsync();
|
Task<List<string>> ListTopicsAsync();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 判断Kafka主题是否存在
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="topic"></param>
|
||||||
|
/// <returns></returns>
|
||||||
Task<bool> TopicExistsAsync(string topic);
|
Task<bool> TopicExistsAsync(string topic);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -49,7 +49,6 @@ namespace JiShe.CollectBus.Protocol.Contracts
|
|||||||
|
|
||||||
//动态上报主题,需根据协议的AFN功能码动态获取
|
//动态上报主题,需根据协议的AFN功能码动态获取
|
||||||
var afnList = EnumExtensions.ToNameValueDictionary<AFN>();
|
var afnList = EnumExtensions.ToNameValueDictionary<AFN>();
|
||||||
|
|
||||||
//需要排除的AFN功能码
|
//需要排除的AFN功能码
|
||||||
var excludeItems = new List<int>() { 6, 7, 8,15 };
|
var excludeItems = new List<int>() { 6, 7, 8,15 };
|
||||||
|
|
||||||
|
|||||||
@ -70,22 +70,9 @@ namespace JiShe.CollectBus.Protocol.Contracts
|
|||||||
|
|
||||||
#region 水表消息主题
|
#region 水表消息主题
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 1分钟采集水表数据下行消息主题
|
/// 水表数据下行消息主题,由于水表采集频率不高,所以一个主题就够
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public const string WatermeterSubscriberWorkerOneMinuteIssuedEventName = "issued.auto.one.watermeter.event";
|
public const string WatermeterSubscriberWorkerAutoReadingIssuedEventName = "issued.auto.reading.watermeter.event";
|
||||||
/// <summary>
|
|
||||||
/// 5分钟采集水表数据下行消息主题
|
|
||||||
/// </summary>
|
|
||||||
public const string WatermeterSubscriberWorkerFiveMinuteIssuedEventName = "issued.auto.five.watermeter.event";
|
|
||||||
/// <summary>
|
|
||||||
/// 15分钟采集水表数据下行消息主题
|
|
||||||
/// </summary>
|
|
||||||
public const string WatermeterSubscriberWorkerFifteenMinuteIssuedEventName = "issued.auto.fifteen.watermeter.event";
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 其他采集数据下行消息主题,日冻结,月冻结、集中器版本号等
|
|
||||||
/// </summary>
|
|
||||||
public const string WatermeterSubscriberWorkerOtherIssuedEventName = "issued.auto.other.watermeter.event";
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 水表自动阀控
|
/// 水表自动阀控
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user