dev #2

Merged
admin merged 176 commits from dev into master 2025-04-18 01:31:49 +00:00
40 changed files with 2378 additions and 665 deletions
Showing only changes of commit e9cd38bd64 - Show all commits

1
.gitignore vendored
View File

@ -401,3 +401,4 @@ FodyWeavers.xsd
# ABP Studio
**/.abpstudio/
/src/JiShe.CollectBus.Host/Plugins/*.dll
JiShe.CollectBus.Kafka.Test

View File

@ -39,6 +39,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.T
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{82E4562A-3A7F-4372-8D42-8AE41BA56C04}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -113,6 +115,10 @@ Global
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU
{82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Debug|Any CPU.Build.0 = Debug|Any CPU
{82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Release|Any CPU.ActiveCfg = Release|Any CPU
{82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -135,6 +141,7 @@ Global
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{443B4549-0AC0-4493-8F3E-49C83225DD76} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{82E4562A-3A7F-4372-8D42-8AE41BA56C04} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}

View File

@ -77,22 +77,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
Task InitWatermeterCacheData(string gatherCode = "");
/// <summary>
/// 1分钟采集水表数据
/// 水表数据采集
/// </summary>
/// <returns></returns>
Task WatermeterScheduledMeterOneMinuteReading();
Task WatermeterScheduledMeterAutoReading();
/// <summary>
/// 5分钟采集水表数据只获取任务数据下发不构建任务
/// </summary>
/// <returns></returns>
Task WatermeterScheduledMeterFiveMinuteReading();
/// <summary>
/// 15分钟采集水表数据只获取任务数据下发不构建任务
/// </summary>
/// <returns></returns>
Task WatermeterScheduledMeterFifteenMinuteReading();
#endregion

View File

@ -39,19 +39,8 @@ namespace JiShe.CollectBus.Subscribers
/// 1分钟采集水表数据下行消息消费订阅
/// </summary>
/// <returns></returns>
Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
/// <summary>
/// 5分钟采集水表数据下行消息消费订阅
/// </summary>
/// <returns></returns>
Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
/// <summary>
/// 15分钟采集水表数据下行消息消费订阅
/// </summary>
/// <returns></returns>
Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
#endregion
}
}

View File

@ -1,10 +1,14 @@
using FreeRedis;
using FreeSql;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.FreeRedisProvider;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.Localization;
using Microsoft.AspNetCore.Mvc;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Volo.Abp.Application.Services;
@ -16,9 +20,205 @@ public abstract class CollectBusAppService : ApplicationService
public IFreeSqlProvider SqlProvider => LazyServiceProvider.LazyGetRequiredService<IFreeSqlProvider>();
protected IFreeRedisProvider FreeRedisProvider => LazyServiceProvider.LazyGetService<IFreeRedisProvider>()!;
protected CollectBusAppService()
{
LocalizationResource = typeof(CollectBusResource);
ObjectMapperContext = typeof(CollectBusApplicationModule);
}
/// <summary>
/// Lua脚本批量获取缓存的表计信息
/// </summary>
/// <typeparam name="T">表信息数据对象</typeparam>
/// <param name="redisKeys">采集频率对应的缓存Key集合</param>
/// <param name="systemType"><see cref="SystemTypeConst"/> 系统类型</param>
/// <param name="serverTagName">服务器标识</param>
/// <param name="timeDensity">采集频率1分钟、5分钟、15分钟</param>
/// <param name="meterType"><see cref="MeterTypeEnum"/> 表计类型</param>
/// <returns></returns>
protected async Task<Dictionary<string, Dictionary<string, T>>> GetMeterRedisCacheDictionaryData<T>(string[] redisKeys, string systemType, string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class
{
if (redisKeys == null || redisKeys.Length <= 0 || string.IsNullOrWhiteSpace(systemType) || string.IsNullOrWhiteSpace(serverTagName) || string.IsNullOrWhiteSpace(timeDensity))
{
throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,参数异常,-101");
}
var meterInfos = new Dictionary<string, Dictionary<string, T>>();
var luaScript = @"
local results = {}
for i, key in ipairs(KEYS) do
local data = redis.call('HGETALL', key)
results[i] = {key, data}
end
return results";
// 分页参数每页处理10000个键
int pageSize = 10000;
int totalPages = (int)Math.Ceiling(redisKeys.Length / (double)pageSize);
for (int page = 0; page < totalPages; page++)
{
// 分页获取当前批次的键
var batchKeys = redisKeys
.Skip(page * pageSize)
.Take(pageSize)
.ToArray();
// 执行Lua脚本获取当前批次数据
var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, batchKeys);
if (merterResult == null)
{
throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,第 {page + 1} 页数据未返回,-102");
}
// 解析当前批次的结果
if (merterResult is object[] arr)
{
foreach (object[] item in arr)
{
string key = (string)item[0];
object[] fieldsAndValues = (object[])item[1];
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, systemType, serverTagName, meterType, timeDensity)}";
string focusAddress = key.Replace(redisCacheKey, "");
var meterHashs = new Dictionary<string, T>();
for (int i = 0; i < fieldsAndValues.Length; i += 2)
{
string meterId = (string)fieldsAndValues[i];
string meterStr = (string)fieldsAndValues[i + 1];
T meterInfo = default!;
if (!string.IsNullOrWhiteSpace(meterStr))
{
meterInfo = meterStr.Deserialize<T>()!;
}
if (meterInfo != null)
{
meterHashs[meterId] = meterInfo;
}
else
{
throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 缓存表计数据异常,集中器 {key} 的表计 {meterId} 解析失败,-103");
}
}
// 合并到总结果若存在重复key则覆盖
if (meterInfos.ContainsKey(focusAddress))
{
foreach (var kvp in meterHashs)
{
meterInfos[focusAddress][kvp.Key] = kvp.Value;
}
}
else
{
meterInfos[focusAddress] = meterHashs;
}
}
}
else
{
throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 第 {page + 1} 页数据解析失败,返回类型不符,-104");
}
}
return meterInfos;
}
/// <summary>
/// Lua脚本批量获取缓存的表计信息
/// </summary>
/// <typeparam name="T">表信息数据对象</typeparam>
/// <param name="redisKeys">采集频率对应的缓存Key集合</param>
/// <param name="systemType"><see cref="SystemTypeConst"/> 系统类型</param>
/// <param name="serverTagName">服务器标识</param>
/// <param name="timeDensity">采集频率1分钟、5分钟、15分钟</param>
/// <param name="meterType"><see cref="MeterTypeEnum"/> 表计类型</param>
/// <returns></returns>
protected async Task<List<T>> GetMeterRedisCacheListData<T>(string[] redisKeys, string systemType, string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class
{
if (redisKeys == null || redisKeys.Length <= 0 ||
string.IsNullOrWhiteSpace(systemType) ||
string.IsNullOrWhiteSpace(serverTagName) ||
string.IsNullOrWhiteSpace(timeDensity))
{
throw new Exception($"{nameof(GetMeterRedisCacheListData)} 参数异常,-101");
}
var meterInfos = new List<T>();
var luaScript = @"
local results = {}
for i, key in ipairs(KEYS) do
local data = redis.call('HGETALL', key)
results[i] = {key, data}
end
return results";
// 分页参数每页10000个键
int pageSize = 10000;
int totalPages = (int)Math.Ceiling(redisKeys.Length / (double)pageSize);
for (int page = 0; page < totalPages; page++)
{
// 分页获取当前批次键
var batchKeys = redisKeys
.Skip(page * pageSize)
.Take(pageSize)
.ToArray();
// 执行Lua脚本获取当前页数据
var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, batchKeys);
if (merterResult == null)
{
throw new Exception($"{nameof(GetMeterRedisCacheListData)} 第 {page + 1} 页数据未返回,-102");
}
// 解析当前页结果
if (merterResult is object[] arr)
{
foreach (object[] item in arr)
{
string key = (string)item[0];
object[] fieldsAndValues = (object[])item[1];
var redisCacheKey = string.Format(
RedisConst.CacheMeterInfoKey,
systemType,
serverTagName,
meterType,
timeDensity
);
string focusAddress = key.Replace(redisCacheKey, "");
for (int i = 0; i < fieldsAndValues.Length; i += 2)
{
string meterId = (string)fieldsAndValues[i];
string meterStr = (string)fieldsAndValues[i + 1];
T meterInfo = default!;
if (!string.IsNullOrWhiteSpace(meterStr))
{
meterInfo = meterStr.Deserialize<T>()!;
}
if (meterInfo != null)
{
meterInfos.Add(meterInfo);
}
else
{
throw new Exception(
$"{nameof(GetMeterRedisCacheListData)} 表计 {meterId} 解析失败(页 {page + 1}-103"
);
}
}
}
}
else
{
throw new Exception($"{nameof(GetMeterRedisCacheListData)} 第 {page + 1} 页数据格式错误,-104");
}
}
return meterInfos;
}
}

View File

@ -64,7 +64,7 @@ public class CollectBusApplicationModule : AbpModule
//默认初始化表计信息
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
await dbContext.InitAmmeterCacheData();
await dbContext.InitWatermeterCacheData();
//await dbContext.InitWatermeterCacheData();
//初始化主题信息
var kafkaAdminClient = context.ServiceProvider.GetRequiredService<IAdminClientService>();

View File

@ -1,4 +1,5 @@
using System;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using DeviceDetectorNET.Parser.Device;
@ -141,7 +142,15 @@ namespace JiShe.CollectBus.Plugins
await client.ResetIdAsync(deviceNo);
var entity = await _deviceRepository.FindAsync(a => a.Number == deviceNo);
var deviceInfoList= await _deviceRepository.GetListAsync(a => a.Number == deviceNo);
if (deviceInfoList != null && deviceInfoList.Count > 1)
{
//todo 推送集中器编号重复预警
_logger.LogError($"集中器编号:{deviceNo},存在多个集中器,请检查集中器编号是否重复");
return;
}
var entity = deviceInfoList?.FirstOrDefault(a => a.Number == deviceNo);
if (entity == null)
{
await _deviceRepository.InsertAsync(new Device(deviceNo, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online));
@ -171,7 +180,15 @@ namespace JiShe.CollectBus.Plugins
string clientId = deviceNo;
string oldClientId = $"{client.Id}";
var entity = await _deviceRepository.FindAsync(a => a.Number == deviceNo);
var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == deviceNo);
if (deviceInfoList != null && deviceInfoList.Count > 1)
{
//todo 推送集中器编号重复预警
_logger.LogError($"集中器编号:{deviceNo},存在多个集中器,请检查集中器编号是否重复");
return;
}
var entity = deviceInfoList?.FirstOrDefault(a => a.Number == deviceNo);
if (entity == null) //没有登录帧的设备,只有心跳帧
{
await client.ResetIdAsync(clientId);

View File

@ -13,14 +13,20 @@ using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;
using JiShe.CollectBus.IoTDBProvider.Context;
using Microsoft.Extensions.Logging;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.IotSystems.AFNEntity;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using System.Diagnostics.Metrics;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Kafka.Attributes;
using System.Text.Json;
using JiShe.CollectBus.Kafka;
namespace JiShe.CollectBus.Samples;
public class SampleAppService : CollectBusAppService, ISampleAppService
public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaSubscribe
{
private readonly ILogger<SampleAppService> _logger;
private readonly IIoTDBProvider _iotDBProvider;
@ -108,14 +114,28 @@ public class SampleAppService : CollectBusAppService, ISampleAppService
[HttpGet]
public async Task TestDeviceGroupBalanceControl(int deviceCount = 200000)
{
var deviceList = new List<string>();
for (int i = 0; i < deviceCount; i++)
//var deviceList = new List<string>();
//for (int i = 0; i < deviceCount; i++)
//{
// deviceList.Add($"Device_{Guid.NewGuid()}");
//}
//// 初始化缓存
//DeviceGroupBalanceControl.InitializeCache(deviceList);
var timeDensity = "15";
//获取缓存中的电表信息
var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, "Energy", "JiSheCollectBus", MeterTypeEnum.Ammeter.ToString(), timeDensity)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, "Energy", "JiSheCollectBus", timeDensity, MeterTypeEnum.Ammeter);
List<string> focusAddressDataLista = new List<string>();
foreach (var item in meterInfos)
{
deviceList.Add($"Device_{Guid.NewGuid()}");
focusAddressDataLista.Add(item.FocusAddress);
}
// 初始化缓存
DeviceGroupBalanceControl.InitializeCache(deviceList);
DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
// 打印分布统计
DeviceGroupBalanceControl.PrintDistributionStats();
@ -123,6 +143,20 @@ public class SampleAppService : CollectBusAppService, ISampleAppService
await Task.CompletedTask;
}
/// <summary>
/// 测试设备分组均衡控制算法获取分组Id
/// </summary>
/// <param name="deviceAddress"></param>
/// <returns></returns>
[HttpGet]
public async Task TestGetDeviceGroupBalanceControl(string deviceAddress)
{
var groupId = DeviceGroupBalanceControl.GetDeviceGroupId(deviceAddress);
Console.WriteLine(groupId);
await Task.CompletedTask;
}
/// <summary>
/// 测试单个测点数据项
@ -180,4 +214,12 @@ public class SampleAppService : CollectBusAppService, ISampleAppService
var aa = LazyServiceProvider.GetKeyedService<IProtocolPlugin>("TestProtocolPlugin");
return aa == null;
}
[KafkaSubscribe(["test-topic"])]
public async Task<bool> KafkaSubscribeAsync(string obj)
{
_logger.LogWarning($"收到订阅消息: {obj}");
return await Task.FromResult(true);
}
}

View File

@ -2,6 +2,7 @@
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
@ -10,9 +11,9 @@ using JiShe.CollectBus.IoTDBProvider;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using JiShe.CollectBus.Serializer;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
@ -31,17 +32,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading
private readonly ICapPublisher _producerBus;
private readonly IIoTDBProvider _dbProvider;
private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
private readonly IProducerService _producerService;
public BasicScheduledMeterReadingService(
ILogger<BasicScheduledMeterReadingService> logger,
ICapPublisher producerBus,
IMeterReadingRecordRepository meterReadingRecordRepository,
IProducerService producerService,
IIoTDBProvider dbProvider)
{
_producerBus = producerBus;
_logger = logger;
_dbProvider = dbProvider;
_meterReadingRecordRepository = meterReadingRecordRepository;
_producerService = producerService;
}
/// <summary>
@ -96,20 +101,26 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue;
}
//检查任务时间节点由于定时任务10秒钟运行一次需要判定当前时间是否在任务时间节点内不在则跳过
if (!IsGennerateCmd(tasksToBeIssueModel.NextTaskTime))
//item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBustempArryay[1]=>SystemTypeConsttempArryay[2]=>TaskInfotempArryay[3]=>表计类别tempArryay[4]=>采集频率
var tempArryay = item.Split(":");
string meteryType = tempArryay[4];//表计类别
int timeDensity = Convert.ToInt32(tempArryay[5]);//采集频率
if (timeDensity > 15)
{
_logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103");
timeDensity = 15;
}
//检查任务时间节点由于定时任务10秒钟运行一次需要判定当前时间是否在任务时间节点内不在则跳过
if (!IsTaskTime(tasksToBeIssueModel.NextTaskTime, timeDensity))
{
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103");
continue;
}
//item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBustempArryay[1]=>SystemTypeConsttempArryay[2]=>TaskInfotempArryay[3]=>表计类别tempArryay[4]=>采集频率
var tempArryay = item.Split(":");
string meteryType = tempArryay[3];//表计类别
int timeDensity = Convert.ToInt32(tempArryay[4]);//采集频率
//获取缓存中的电表信息
var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, meteryType, timeDensity)}*";
//获取缓存中的表信息
var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, meteryType, timeDensity)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
{
@ -117,16 +128,45 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return;
}
var meterTypes = EnumExtensions.ToEnumDictionary<MeterTypeEnum>();
if (meteryType == MeterTypeEnum.Ammeter.ToString())
{
// 解析结果(结果为嵌套数组)
var meterInfos = await GetMeterRedisCacheData<AmmeterInfo>(oneMinutekeyList, $"{timeDensity}", meteryType);
var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, SystemType, ServerTagName, $"{timeDensity}", meterTypes[meteryType]);
if (meterInfos == null || meterInfos.Count <= 0)
{
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
return;
}
await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos);
//await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos);
var timer = Stopwatch.StartNew();
//处理数据
//await DeviceGroupBalanceControl.ProcessGenericListAsync(
// items: meterInfos,
// deviceIdSelector: data => data.FocusAddress,
// processor: (data, threadId) =>
// {
// _ = AmmerterCreatePublishTask(timeDensity, data);
// }
//);
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos,
deviceIdSelector: data => data.FocusAddress,
processor: data =>
{
_ = AmmerterCreatePublishTask(timeDensity, data);
}
);
timer.Stop();
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},{oneMinutekeyList.Length}");
}
else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
{
@ -140,6 +180,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成");
//根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。
tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.AddMinutes(timeDensity);
await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
@ -165,7 +207,26 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns>
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{
#if DEBUG
var timeDensity = "15";
//获取缓存中的电表信息
var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter);
List<string> focusAddressDataLista = new List<string>();
foreach (var item in meterInfos)
{
focusAddressDataLista.Add(item.FocusAddress);
}
DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
return;
#else
var meterInfos = await GetAmmeterInfoList(gatherCode);
#endif
if (meterInfos == null || meterInfos.Count <= 0)
{
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空");
@ -199,7 +260,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
#if DEBUG
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
//await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
#else
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
@ -299,7 +360,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(oneMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString());
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(oneMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
@ -363,7 +424,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.Ammeter).ToString());
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fiveMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
@ -427,7 +488,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString());
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
@ -461,11 +522,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
}
////删除任务数据
//await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList);
////缓存下一个时间的任务
//await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter);
stopwatch.Stop();
@ -532,7 +588,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据分组创建线程批处理集中器
foreach (var group in focusHashGroups)
{
await AmmerterCreatePublishTask(timeDensity, group.Value);
await AmmerterCreatePublishTask2(timeDensity, group.Value);
}
}
catch (Exception)
@ -542,13 +598,274 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
}
/// <summary>
/// 电表创建发布任务
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="ammeterInfo">集中器号hash分组的集中器集合数据</param>
/// <returns></returns>
private async Task AmmerterCreatePublishTask(int timeDensity
, AmmeterInfo ammeterInfo)
{
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
var currentTime = DateTime.Now;
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
//构建缓存任务key依然 表计类型+采集频率+集中器地址存hash类型
var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{ammeterInfo.FocusAddress}";
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{
// _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
return;
}
//载波的不处理
if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
{
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102");
return;
}
if (ammeterInfo.State.Equals(2))
{
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理");
return;
}
////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器
//if (!IsGennerateCmd(ammeter.LastTime, -1))
//{
// _logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name},采集时间:{ammeter.LastTime}已超过1天未在线不生成指令");
// continue;
//}
if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode))
{
// _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空");
return;
}
if (string.IsNullOrWhiteSpace(ammeterInfo.Address))
{
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空");
return;
}
if (Convert.ToInt32(ammeterInfo.Address) > 65535)
{
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535");
return;
}
if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33)
{
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})");
return;
}
List<string> tempCodes = ammeterInfo.ItemCodes.Deserialize<List<string>>()!;
//TODO:自动上报数据只主动采集1类数据。
if (ammeterInfo.AutomaticReport.Equals(1))
{
var tempSubCodes = new List<string>();
if (tempCodes.Contains("0C_49"))
{
tempSubCodes.Add("0C_49");
}
if (tempSubCodes.Contains("0C_149"))
{
tempSubCodes.Add("0C_149");
}
if (ammeterInfo.ItemCodes.Contains("10_97"))
{
tempSubCodes.Add("10_97");
}
if (tempSubCodes == null || tempSubCodes.Count <= 0)
{
//_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空");
return;
}
else
{
tempCodes = tempSubCodes;
}
}
Dictionary<string, MeterReadingRecords> keyValuePairs = new Dictionary<string, MeterReadingRecords>();
foreach (var tempItem in tempCodes)
{
//排除已发送日冻结和月冻结采集项配置
if (DayFreezeCodes.Contains(tempItem))
{
continue;
}
if (MonthFreezeCodes.Contains(tempItem))
{
continue;
}
var itemCodeArr = tempItem.Split('_');
var aFNStr = itemCodeArr[0];
var aFN = (AFN)aFNStr.HexToDec();
var fn = int.Parse(itemCodeArr[1]);
byte[] dataInfos = null;
if (ammeterInfo.AutomaticReport.Equals(1) && aFN == AFN.)
{
//实时数据
dataInfos = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(ammeterInfo.FocusAddress, ammeterInfo.MeteringCode, (ATypeOfDataItems)fn);
}
else
{
string methonCode = $"AFN{aFNStr}_Fn_Send";
//特殊表暂不处理
if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
, out var handler))
{
dataInfos = handler(new TelemetryPacketRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Fn = fn,
Pn = ammeterInfo.MeteringCode
});
}
else
{
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。");
continue;
}
}
//TODO:特殊表
if (dataInfos == null || dataInfos.Length <= 0)
{
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。");
continue;
}
var meterReadingRecords = new MeterReadingRecords()
{
ProjectID = ammeterInfo.ProjectID,
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
PendingCopyReadTime = pendingCopyReadTime,
CreationTime = currentTime,
MeterAddress = ammeterInfo.AmmerterAddress,
MeterId = ammeterInfo.ID,
MeterType = MeterTypeEnum.Ammeter,
FocusAddress = ammeterInfo.FocusAddress,
FocusID = ammeterInfo.FocusID,
AFN = aFN,
Fn = fn,
ItemCode = tempItem,
TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode),
ManualOrNot = false,
Pn = ammeterInfo.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(dataInfos),
};
meterReadingRecords.CreateDataId(GuidGenerator.Create());
keyValuePairs.TryAdd($"{ammeterInfo.ID}_{tempItem}", meterReadingRecords);
}
//TimeSpan timeSpan = TimeSpan.FromMicroseconds(5);
//await Task.Delay(timeSpan);
//return keyValuePairs;
// await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
using (var pipe = FreeRedisProvider.Instance.StartPipe())
{
pipe.HSet(redisCacheKey, keyValuePairs);
object[] ret = pipe.EndPipe();
}
await Task.CompletedTask;
}
/// <summary>
/// Kafka 推送消息
/// </summary>
/// <param name="topicName">主题名称</param>
/// <param name="taskRecord">任务记录</param>
/// <returns></returns>
private async Task KafkaProducerIssuedMessage(string topicName,
MeterReadingRecords taskRecord)
{
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
{
throw new Exception($"{nameof(KafkaProducerIssuedMessage)} 推送消息失败,参数异常,-101");
}
int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress);
await _producerService.ProduceAsync(topicName, partition, taskRecord);
}
private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType)
{
var currentDateTime = DateTime.Now;
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, meterType);
//FreeRedisProvider.Instance.key()
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
return;
}
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), meterType);
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
return;
}
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
{
foreach (var ammerterItem in focusItem.Value)
{
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
//_ = _producerBus.Publish(tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
}
}
/// <summary>
/// 电表创建发布任务
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="focusGroup">集中器号hash分组的集中器集合数据</param>
/// <returns></returns>
private async Task AmmerterCreatePublishTask(int timeDensity
private async Task AmmerterCreatePublishTask2(int timeDensity
, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup)
{
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
@ -713,7 +1030,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
AFN = aFN,
Fn = fn,
ItemCode = tempItem,
TaskMark = CommonHelper.GetTaskMark((int)aFN, fn,ammeter.MeteringCode),
TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeter.MeteringCode),
ManualOrNot = false,
Pn = ammeter.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(),
@ -799,10 +1116,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
/// <summary>
/// 1分钟采集水表数据
/// 水表数据采集
/// </summary>
/// <returns></returns>
public virtual async Task WatermeterScheduledMeterOneMinuteReading()
public virtual async Task WatermeterScheduledMeterAutoReading()
{
//获取缓存中的水表信息
int timeDensity = 1;
@ -810,15 +1127,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
{
_logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101");
_logger.LogError($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101");
return;
}
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString());
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(oneMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.WaterMeter);
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
_logger.LogError($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
return;
}
@ -836,7 +1153,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
await _producerBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
//await _producerBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempMsg);
//_ = _producerBus.Publish(tempMsg);
@ -856,211 +1173,44 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter);
_logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理完成");
_logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成");
}
/// <summary>
/// 5分钟采集水表数据
/// </summary>
/// <returns></returns>
public virtual async Task WatermeterScheduledMeterFiveMinuteReading()
{
//获取缓存中的电表信息
int timeDensity = 5;
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.WaterMeter);
var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101");
return;
}
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString());
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
return;
}
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
{
foreach (var ammerterItem in focusItem.Value)
{
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
await _producerBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
//_ = _producerBus.Publish(tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
}
////删除任务数据
//await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList);
////缓存下一个时间的任务
//await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter);
_logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集水表数据处理完成");
}
/// <summary>
/// 15分钟采集水表数据
/// </summary>
/// <returns></returns>
public virtual async Task WatermeterScheduledMeterFifteenMinuteReading()
{
//获取缓存中的电表信息
int timeDensity = 15;
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.WaterMeter);
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101");
return;
}
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheData<MeterReadingRecords>(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.WaterMeter.ToString());
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
return;
}
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
{
foreach (var ammerterItem in focusItem.Value)
{
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
//await _producerBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
//_ = _producerBus.Publish(tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
}
////删除任务数据
//await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList);
////缓存下一个时间的任务
//await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter);
_logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集水表数据处理完成");
}
#endregion
#region
/// <summary>
/// Lua脚本批量获取缓存的表计信息
/// </summary>
/// <typeparam name="T">表信息数据对象</typeparam>
/// <param name="redisKeys">采集频率对应的缓存Key集合</param>
/// <param name="timeDensity">采集频率1分钟、5分钟、15分钟</param>
/// <param name="meterType">表计类型</param>
/// <returns></returns>
private async Task<Dictionary<string, Dictionary<string, T>>> GetMeterRedisCacheData<T>(string[] redisKeys, string timeDensity, string meterType) where T : class
{
//通过lua脚本一次性获取所有缓存内容
var luaScript = @"
local results = {}
for i, key in ipairs(KEYS) do
local data = redis.call('HGETALL', key)
results[i] = {key, data}
end
return results";
var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, redisKeys); //传递 KEYS
if (merterResult == null)
{
_logger.LogError($"{nameof(GetMeterRedisCacheData)} 定时任务采集表数据处理时没有获取到缓存信息,-102");
return null;
}
// 解析结果(结果为嵌套数组)
var meterInfos = new Dictionary<string, Dictionary<string, T>>(); ;
if (merterResult is object[] arr)
{
foreach (object[] item in arr)
{
string key = (string)item[0];//集中器地址对应的Redis缓存Key
object[] fieldsAndValues = (object[])item[1];//缓存Key对应的Hash表数据集合
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, meterType, timeDensity)}";
string focusAddress = key.Replace(redisCacheKey, "");//集中器地址
var meterHashs = new Dictionary<string, T>();
for (int i = 0; i < fieldsAndValues.Length; i += 2)
{
string meterld = (string)fieldsAndValues[i];//表ID
string meterStr = (string)fieldsAndValues[i + 1];//表详情数据
T meterInfo = default!;
if (!string.IsNullOrWhiteSpace(meterStr))
{
meterInfo = meterStr.Deserialize<T>()!;
}
if (meterInfo != null)
{
meterHashs[meterld] = meterInfo;
}
else
{
_logger.LogInformation($"{nameof(GetMeterRedisCacheData)} 定时任务采集表数据处理时集中器缓存{key}数据的{meterld}处理异常");
}
}
meterInfos[focusAddress] = meterHashs;
}
}
return meterInfos;
}
/// <summary>
/// 指定时间对比当前时间
/// 判断是否需要生成采集指令
/// </summary>
/// <param name="lastTime"></param>
/// <param name="subtrahend"></param>
/// <param name="nextTaskTime"></param>
/// <param name="timeDensity"></param>
/// <returns></returns>
private bool IsGennerateCmd(DateTime lastTime, int subtrahend = 0)
private bool IsTaskTime(DateTime nextTaskTime, int timeDensity = 0)
{
if (DateTime.Now.AddMinutes(timeDensity) >= nextTaskTime)
{
if (DateTime.Now.AddDays(subtrahend) >= lastTime)//当前时间减去一天,大于等于最后在线时间,不再生成该集中器下表生成采集指令
return false;
return true;
}
return false;
}
///// <summary>
///// 指定时间对比当前时间
///// </summary>
///// <param name="lastTime"></param>
///// <param name="subtrahend"></param>
///// <returns></returns>
//private bool IsGennerateCmd(DateTime lastTime, int subtrahend = 0)
//{
// if (DateTime.Now.AddDays(subtrahend) >= lastTime)//当前时间减去一天,大于等于最后在线时间,不再生成该集中器下表生成采集指令
// return false;
// return true;
//}
///// <summary>
///// 缓存下一个时间的任务
///// </summary>
@ -1091,6 +1241,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
return $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, meterType, timeDensity)}*";
}
#endregion
}

View File

@ -1,9 +1,11 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.Kafka;
using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.GatherItem;
@ -12,6 +14,7 @@ using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Repository;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using MassTransit;
@ -33,7 +36,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
string serverTagName = string.Empty;
public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger,
ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration) : base(logger, producerBus, meterReadingRecordRepository, dbProvider)
ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, producerBus, meterReadingRecordRepository, producerService,dbProvider)
{
serverTagName = configuration.GetValue<string>(CommonConst.ServerTagName)!;
}
@ -71,37 +74,37 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public override async Task<List<AmmeterInfo>> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890")
{
List<AmmeterInfo> ammeterInfos = new List<AmmeterInfo>();
ammeterInfos.Add(new AmmeterInfo()
{
Baudrate = 2400,
FocusAddress = "402440506",
Name = "张家祠工务(三相电表)",
FocusID = 95780,
DatabaseBusiID = 1,
MeteringCode = 1,
AmmerterAddress = "402410040506",
ID = 127035,
TypeName = 3,
DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
TimeDensity = 15,
});
ammeterInfos.Add(new AmmeterInfo()
{
Baudrate = 2400,
FocusAddress = "542400504",
Name = "五号配(长芦二所四排)(单相电表)",
FocusID = 69280,
DatabaseBusiID = 1,
MeteringCode = 2,
AmmerterAddress = "542410000504",
ID = 95594,
TypeName = 1,
DataTypes = "581,589,592,597,601",
TimeDensity = 15,
});
//List<AmmeterInfo> ammeterInfos = new List<AmmeterInfo>();
//ammeterInfos.Add(new AmmeterInfo()
//{
// Baudrate = 2400,
// FocusAddress = "402440506",
// Name = "张家祠工务(三相电表)",
// FocusID = 95780,
// DatabaseBusiID = 1,
// MeteringCode = 1,
// AmmerterAddress = "402410040506",
// ID = 127035,
// TypeName = 3,
// DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
// TimeDensity = 15,
//});
//ammeterInfos.Add(new AmmeterInfo()
//{
// Baudrate = 2400,
// FocusAddress = "542400504",
// Name = "五号配(长芦二所四排)(单相电表)",
// FocusID = 69280,
// DatabaseBusiID = 1,
// MeteringCode = 2,
// AmmerterAddress = "542410000504",
// ID = 95594,
// TypeName = 1,
// DataTypes = "581,589,592,597,601",
// TimeDensity = 15,
//});
return ammeterInfos;
//return ammeterInfos;
string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
FROM TB_GatherInfo(NOLOCK) AS A
@ -111,10 +114,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
WHERE 1=1 and C.Special = 0 ";
//TODO 记得移除特殊表过滤
if (!string.IsNullOrWhiteSpace(gatherCode))
{
sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
}
//if (!string.IsNullOrWhiteSpace(gatherCode))
//{
// sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
//}
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
.Ado
.QueryAsync<AmmeterInfo>(sql);
@ -187,6 +190,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
deviceList.Add($"Device_{Guid.NewGuid()}");
}
// 初始化缓存
DeviceGroupBalanceControl.InitializeCache(deviceList);
// 打印分布统计
DeviceGroupBalanceControl.PrintDistributionStats();

View File

@ -1,5 +1,6 @@
using DotNetCore.CAP;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IoTDBProvider;
using JiShe.CollectBus.IotSystems.Devices;
@ -9,7 +10,6 @@ using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using JiShe.CollectBus.Serializer;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;

View File

@ -147,69 +147,16 @@ namespace JiShe.CollectBus.Subscribers
#endregion
#region
/// <summary>
/// 1分钟采集水表数据下行消息消费订阅
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[HttpPost]
[Route("watermeter/oneminute/issued-event")]
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
_logger.LogInformation("1分钟采集水表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null)
{
_logger.LogError("【1分钟采集水表数据下行消息消费队列开始处理】协议不存在");
}
else
{
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
if (device != null)
{
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
}
}
}
/// <summary>
/// 5分钟采集水表数据下行消息消费订阅
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[HttpPost]
[Route("watermeter/fiveminute/issued-event")]
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerFiveMinuteIssuedEventName)]
public async Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
_logger.LogInformation("5分钟采集水表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null)
{
_logger.LogError("【5分钟采集水表数据下行消息消费队列开始处理】协议不存在");
}
else
{
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
if (device != null)
{
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
}
}
}
/// <summary>
/// 15分钟采集水表数据下行消息消费订阅
/// 水表数据下行消息消费订阅
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[HttpPost]
[Route("watermeter/fifteenminute/issued-event")]
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerFifteenMinuteIssuedEventName)]
public async Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
public async Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
_logger.LogInformation("15分钟采集水表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");

View File

@ -34,7 +34,7 @@ namespace JiShe.CollectBus.Workers
public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken())
{
await _scheduledMeterReadingService.CreateToBeIssueTasks();
// await _scheduledMeterReadingService.CreateToBeIssueTasks();
}
}
}

View File

@ -0,0 +1,426 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.Common.DeviceBalanceControl
{
/// <summary>
/// 设备组负载控制
/// </summary>
public class DeviceGroupBalanceControl
{
private static readonly object _syncRoot = new object();
private static volatile CacheState _currentCache;
/// <summary>
/// 使用ConcurrentDictionary保证线程安全的设备分组映射
/// </summary>
private sealed class CacheState
{
public readonly ConcurrentDictionary<string, int> BalancedMapping;
public readonly List<string>[] CachedGroups;
public CacheState(int groupCount)
{
BalancedMapping = new ConcurrentDictionary<string, int>();
CachedGroups = new List<string>[groupCount];
for (int i = 0; i < groupCount; i++)
{
CachedGroups[i] = new List<string>();
}
}
}
/// <summary>
/// 初始化或增量更新缓存
/// </summary>
public static void InitializeCache(List<string> deviceList, int groupCount = 30)
{
if (deviceList == null || deviceList.Count <= 0)
{
throw new ArgumentException($"{nameof(InitializeCache)} 设备分组初始化失败,设备数据为空");
}
if (groupCount > 60 || groupCount <= 0)
{
groupCount = 60;
}
lock (_syncRoot)
{
// 首次初始化
if (_currentCache == null)
{
var newCache = new CacheState(groupCount);
UpdateCacheWithDevices(newCache, deviceList, groupCount);
_currentCache = newCache;
}
// 后续增量更新
else
{
if (_currentCache.CachedGroups.Length != groupCount)
{
throw new ArgumentException($"{nameof(InitializeCache)} 设备分组初始化完成以后,分组数量不能更改");
}
var clonedCache = CloneExistingCache();
UpdateCacheWithDevices(clonedCache, deviceList, groupCount);
_currentCache = clonedCache;
}
}
}
/// <summary>
/// 带锁的缓存克隆(写入时复制)
/// </summary>
private static CacheState CloneExistingCache()
{
var oldCache = _currentCache;
var newCache = new CacheState(oldCache.CachedGroups.Length);
// 复制已有映射
foreach (var kvp in oldCache.BalancedMapping)
{
newCache.BalancedMapping.TryAdd(kvp.Key, kvp.Value);
}
// 复制分组数据
for (int i = 0; i < oldCache.CachedGroups.Length; i++)
{
newCache.CachedGroups[i].AddRange(oldCache.CachedGroups[i]);
}
return newCache;
}
/// <summary>
/// 更新设备到缓存
/// </summary>
private static void UpdateCacheWithDevices(CacheState cache, List<string> deviceList, int groupCount)
{
foreach (var deviceId in deviceList)
{
// 原子操作:如果设备不存在则计算分组
cache.BalancedMapping.GetOrAdd(deviceId, id =>
{
int groupId = GetGroupId(id, groupCount);
lock (cache.CachedGroups[groupId])
{
cache.CachedGroups[groupId].Add(id);
}
return groupId;
});
}
}
/// <summary>
/// 并行处理泛型数据集(支持动态线程分配)
/// </summary>
/// <typeparam name="T">已经分组的设备信息</typeparam>
/// <param name="items">部分或者全部的已经分组的设备集合</param>
/// <param name="deviceIdSelector">从泛型对象提取deviceId</param>
/// <param name="processor">处理委托参数当前对象线程ID</param>
/// <param name="maxThreads">可选线程限制</param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
public static async Task ProcessGenericListAsync<T>(
List<T> items, Func<T, string> deviceIdSelector, Action<T, int> processor, int? maxThreads = null)
{
var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化");
// 创建分组任务队列
var groupQueues = new ConcurrentQueue<T>[cache.CachedGroups.Length];
for (int i = 0; i < groupQueues.Length; i++)
{
groupQueues[i] = new ConcurrentQueue<T>();
}
// 阶段1分发数据到分组队列
Parallel.ForEach(items, item =>
{
var deviceId = deviceIdSelector(item);
if (cache.BalancedMapping.TryGetValue(deviceId, out int groupId))
{
groupQueues[groupId].Enqueue(item);
}
});
if ((maxThreads.HasValue && maxThreads.Value > cache.CachedGroups.Length) || maxThreads.HasValue == false)
{
maxThreads = cache.CachedGroups.Length;
}
// 阶段2并行处理队列
var options = new ParallelOptions
{
MaxDegreeOfParallelism = maxThreads.Value,
};
TimeSpan timeSpan = TimeSpan.FromMicroseconds(5);
await Task.Run(() =>
{
Parallel.For(0, cache.CachedGroups.Length, options, async groupId =>
{
var queue = groupQueues[groupId];
while (queue.TryDequeue(out T item))
{
await Task.Delay(timeSpan);
processor(item, Thread.CurrentThread.ManagedThreadId);
}
});
});
}
/// <summary>
/// 智能节流处理CPU友好型
/// </summary>
/// <typeparam name="T">已经分组的设备信息</typeparam>
/// <param name="items">部分或者全部的已经分组的设备集合</param>
/// <param name="deviceIdSelector">从泛型对象提取deviceId</param>
/// <param name="processor">处理委托参数当前对象线程ID</param>
/// <param name="maxConcurrency">可选最佳并发度</param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
public static async Task ProcessWithThrottleAsync<T>(
List<T> items,
Func<T, string> deviceIdSelector,
Action<T> processor,
int? maxConcurrency = null)
{
var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化");
// 自动计算最佳并发度
int recommendedThreads = CalculateOptimalThreadCount();
if ((maxConcurrency.HasValue && maxConcurrency.Value > cache.CachedGroups.Length) || maxConcurrency.HasValue == false)
{
maxConcurrency = cache.CachedGroups.Length;
}
int actualThreads = maxConcurrency ?? recommendedThreads;
// 创建节流器
using var throttler = new SemaphoreSlim(initialCount: actualThreads);
// 使用LongRunning避免线程池饥饿
var tasks = items.Select(async item =>
{
await throttler.WaitAsync();
try
{
var deviceId = deviceIdSelector(item);
if (cache.BalancedMapping.TryGetValue(deviceId, out int groupId))
{
// 分组级处理(保持顺序性)
await ProcessItemAsync(item, processor, groupId);
}
}
finally
{
throttler.Release();
}
});
await Task.WhenAll(tasks);
}
/// <summary>
/// 自动计算最优线程数
/// </summary>
private static int CalculateOptimalThreadCount()
{
int coreCount = Environment.ProcessorCount;
return Math.Min(
coreCount * 2, // 超线程优化
_currentCache?.CachedGroups.Length ?? 60
);
}
/// <summary>
/// 分组异步处理(带节流)
/// </summary>
private static async Task ProcessItemAsync<T>(T item, Action<T> processor, int groupId)
{
// 使用内存缓存降低CPU负载
await Task.Yield(); // 立即释放当前线程
// 分组处理上下文
var context = ExecutionContext.Capture();
ThreadPool.QueueUserWorkItem(_ =>
{
ExecutionContext.Run(context!, state =>
{
processor(item);
}, null);
});
}
/// <summary>
/// 通过 deviceId 获取所在的分组集合
/// </summary>
public static List<string> GetGroup(string deviceId)
{
var cache = _currentCache;
if (cache == null)
throw new InvalidOperationException("缓存未初始化");
return cache.CachedGroups[cache.BalancedMapping[deviceId]];
}
/// <summary>
/// 通过 deviceId 获取分组Id
/// </summary>
public static int GetDeviceGroupId(string deviceId)
{
var cache = _currentCache;
if (cache == null)
throw new InvalidOperationException("缓存未初始化");
return cache.BalancedMapping[deviceId];
}
/// <summary>
/// 创建均衡映射表
/// </summary>
/// <param name="deviceList">数据集合</param>
/// <param name="groupCount">分组数量</param>
/// <param name="maxDeviation">允许的最大偏差百分比</param>
/// <returns></returns>
public static Dictionary<string, int> CreateBalancedMapping(List<string> deviceList, int groupCount, int maxDeviation = 5)
{
var mapping = new Dictionary<string, int>();
int targetPerGroup = deviceList.Count / groupCount;
int maxAllowed = (int)(targetPerGroup * (1 + maxDeviation / 100.0));
// 初始化分组计数器
int[] groupCounters = new int[groupCount];
foreach (var deviceId in deviceList)
{
int preferredGroup = GetGroupId(deviceId, groupCount);
// 如果首选分组未满,直接分配
if (groupCounters[preferredGroup] < maxAllowed)
{
mapping[deviceId] = preferredGroup;
groupCounters[preferredGroup]++;
}
else
{
// 寻找当前最空闲的分组
int fallbackGroup = Array.IndexOf(groupCounters, groupCounters.Min());
mapping[deviceId] = fallbackGroup;
groupCounters[fallbackGroup]++;
}
}
return mapping;
}
/// <summary>
/// 分析分组分布
/// </summary>
/// <param name="deviceList"></param>
/// <param name="groupCount"></param>
/// <returns></returns>
public static Dictionary<int, int> AnalyzeDistribution(List<string> deviceList, int groupCount)
{
Dictionary<int, int> distribution = new Dictionary<int, int>();
foreach (var deviceId in deviceList)
{
int groupId = GetGroupId(deviceId, groupCount);
distribution[groupId] = distribution.TryGetValue(groupId, out var count) ? count + 1 : 1;
}
return distribution;
}
/// <summary>
/// 获取设备ID对应的分组ID
/// </summary>
/// <param name="deviceId"></param>
/// <param name="groupCount"></param>
/// <returns></returns>
public static int GetGroupId(string deviceId, int groupCount)
{
int hash = Fnv1aHash(deviceId);
// 双重取模确保分布均匀
return (hash % groupCount + groupCount) % groupCount;
}
/// <summary>
/// FNV-1a哈希算法
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
public static int Fnv1aHash(string input)
{
const uint fnvPrime = 16777619;
const uint fnvOffsetBasis = 2166136261;
uint hash = fnvOffsetBasis;
foreach (char c in input)
{
hash ^= (byte)c;
hash *= fnvPrime;
}
return (int)hash;
}
/// <summary>
/// CRC16算法实现
/// </summary>
/// <param name="bytes"></param>
/// <returns></returns>
public static ushort CRC16Hash(byte[] bytes)
{
ushort crc = 0xFFFF;
for (int i = 0; i < bytes.Length; i++)
{
crc ^= bytes[i];
for (int j = 0; j < 8; j++)
{
if ((crc & 0x0001) == 1)
{
crc = (ushort)((crc >> 1) ^ 0xA001);
}
else
{
crc >>= 1;
}
}
}
return crc;
}
/// <summary>
/// 打印分组统计数据
/// </summary>
public static void PrintDistributionStats()
{
var cache = _currentCache;
if (cache == null)
{
Console.WriteLine("缓存未初始化");
return;
}
var stats = cache.CachedGroups
.Select((group, idx) => new { GroupId = idx, Count = group.Count })
.OrderBy(x => x.GroupId);
Console.WriteLine("分组数据量统计:");
foreach (var stat in stats)
{
Console.WriteLine($"Group {stat.GroupId}: {stat.Count} 条数据");
}
}
}
}

View File

@ -181,5 +181,24 @@ namespace JiShe.CollectBus.Common.Extensions
return $"{dateTime:yyyyMMddHH}";
#endif
}
/// <summary>
/// 获取当前时间毫秒级时间戳
/// </summary>
/// <returns></returns>
public static long GetCurrentTimeMillis()
{
return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
}
/// <summary>
/// 将Unix时间戳转换为日期时间
/// </summary>
/// <param name="millis"></param>
/// <returns></returns>
public static DateTime FromUnixMillis(long millis)
{
return DateTimeOffset.FromUnixTimeMilliseconds(millis).DateTime;
}
}
}

View File

@ -64,5 +64,30 @@ namespace JiShe.CollectBus.Common.Extensions
? source.Where(predicate)
: source;
}
/// <summary>
/// 分批
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="source"></param>
/// <param name="batchSize"></param>
/// <returns></returns>
public static IEnumerable<IEnumerable<T>> Batch<T>(
this IEnumerable<T> source,
int batchSize)
{
var buffer = new List<T>(batchSize);
foreach (var item in source)
{
buffer.Add(item);
if (buffer.Count == batchSize)
{
yield return buffer;
buffer = new List<T>(batchSize);
}
}
if (buffer.Count > 0)
yield return buffer;
}
}
}

View File

@ -7,7 +7,7 @@ using System.Text.Json.Serialization;
using System.Text.Json;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Serializer
namespace JiShe.CollectBus.Common.Helpers
{
/// <summary>
/// json帮助类

View File

@ -1,177 +0,0 @@
using JiShe.CollectBus.FreeRedisProvider;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.Common.Helpers
{
/// <summary>
/// 设备组负载控制
/// </summary>
public class DeviceGroupBalanceControl
{
/// <summary>
/// 分组集合
/// </summary>
private static List<string>[] _cachedGroups;
/// <summary>
/// 设备分组关系映射
/// </summary>
private static Dictionary<string, int> _balancedMapping;
/// <summary>
/// 初始化缓存并强制均衡
/// </summary>
public static void InitializeCache(List<string> deviceList,int groupCount = 50)
{
// 步骤1: 生成均衡映射表
_balancedMapping = CreateBalancedMapping(deviceList, groupCount);
// 步骤2: 根据映射表填充分组
_cachedGroups = new List<string>[groupCount];
for (int i = 0; i < groupCount; i++)
{
_cachedGroups[i] = new List<string>(capacity: deviceList.Count / groupCount + 1);
}
foreach (var deviceId in deviceList)
{
int groupId = _balancedMapping[deviceId];
_cachedGroups[groupId].Add(deviceId);
}
}
/// <summary>
/// 通过 deviceId 获取所在的分组集合
/// </summary>
public static List<string> GetGroup(string deviceId)
{
if (_balancedMapping == null || _cachedGroups == null)
throw new InvalidOperationException("缓存未初始化");
int groupId = _balancedMapping[deviceId];
return _cachedGroups[groupId];
}
/// <summary>
/// 通过 deviceId 获取分组Id
/// </summary>
public static int GetDeviceGroupId(string deviceId)
{
if (_balancedMapping == null || _cachedGroups == null)
throw new InvalidOperationException("缓存未初始化");
return _balancedMapping[deviceId];
}
/// <summary>
/// 创建均衡映射表
/// </summary>
/// <param name="deviceList">数据集合</param>
/// <param name="groupCount">分组数量</param>
/// <param name="maxDeviation">允许的最大偏差百分比</param>
/// <returns></returns>
public static Dictionary<string, int> CreateBalancedMapping(List<string> deviceList, int groupCount, int maxDeviation = 5)
{
var mapping = new Dictionary<string, int>();
int targetPerGroup = deviceList.Count / groupCount;
int maxAllowed = (int)(targetPerGroup * (1 + maxDeviation / 100.0));
// 初始化分组计数器
int[] groupCounters = new int[groupCount];
foreach (var deviceId in deviceList)
{
int preferredGroup = GetGroupId(deviceId, groupCount);
// 如果首选分组未满,直接分配
if (groupCounters[preferredGroup] < maxAllowed)
{
mapping[deviceId] = preferredGroup;
groupCounters[preferredGroup]++;
}
else
{
// 寻找当前最空闲的分组
int fallbackGroup = Array.IndexOf(groupCounters, groupCounters.Min());
mapping[deviceId] = fallbackGroup;
groupCounters[fallbackGroup]++;
}
}
return mapping;
}
/// <summary>
/// 分析分组分布
/// </summary>
/// <param name="deviceList"></param>
/// <param name="groupCount"></param>
/// <returns></returns>
public static Dictionary<int, int> AnalyzeDistribution(List<string> deviceList, int groupCount)
{
Dictionary<int, int> distribution = new Dictionary<int, int>();
foreach (var deviceId in deviceList)
{
int groupId = GetGroupId(deviceId, groupCount);
distribution[groupId] = distribution.TryGetValue(groupId, out var count) ? count + 1 : 1;
}
return distribution;
}
/// <summary>
/// 获取设备ID对应的分组ID
/// </summary>
/// <param name="deviceId"></param>
/// <param name="groupCount"></param>
/// <returns></returns>
public static int GetGroupId(string deviceId, int groupCount)
{
int hash = Fnv1aHash(deviceId);
// 双重取模确保分布均匀
return (hash % groupCount + groupCount) % groupCount;
}
/// <summary>
/// FNV-1a哈希算法
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
public static int Fnv1aHash(string input)
{
const uint fnvPrime = 16777619;
const uint fnvOffsetBasis = 2166136261;
uint hash = fnvOffsetBasis;
foreach (char c in input)
{
hash ^= (byte)c;
hash *= fnvPrime;
}
return (int)hash;
}
/// <summary>
/// 打印分组统计数据
/// </summary>
public static void PrintDistributionStats()
{
var stats = _cachedGroups
.Select((group, idx) => new { GroupId = idx, Count = group.Count })
.OrderBy(x => x.GroupId);
Console.WriteLine("分组数据量统计:");
foreach (var stat in stats)
{
Console.WriteLine($"Group {stat.GroupId}: {stat.Count} 条数据");
}
}
}
}

View File

@ -28,8 +28,4 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Models
{
public class GlobalPagedResult<T>
{
/// <summary>
/// 数据集合
/// </summary>
public List<T> Items { get; set; }
/// <summary>
/// 是否有下一页
/// </summary>
public bool HasNext { get; set; }
/// <summary>
/// 下一页的分页索引
/// </summary>
public long? NextScore { get; set; }
/// <summary>
/// 下一页的分页索引
/// </summary>
public string NextMember { get; set; }
}
}

View File

@ -4,18 +4,28 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider
namespace JiShe.CollectBus.Common.Models
{
/// <summary>
/// 查询结果
/// </summary>
/// <typeparam name="T"></typeparam>
public class PagedResult<T>
public class BusPagedResult<T>
{
/// <summary>
/// 总条数
/// </summary>
public int TotalCount { get; set; }
public long TotalCount { get; set; }
/// <summary>
/// 当前页码
/// </summary>
public int PageIndex { get; set; }
/// <summary>
/// 每页条数
/// </summary>
public int PageSize { get; set; }
/// <summary>
/// 数据集合

View File

@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Models
{
/// <summary>
/// 设备缓存基础模型
/// </summary>
public class DeviceCacheBasicModel
{
/// <summary>
/// 集中器Id
/// </summary>
public int FocusId { get; set; }
/// <summary>
/// 表Id
/// </summary>
public int MeterId { get; set; }
}
}

View File

@ -1,17 +1,10 @@
using FreeRedis;
using JetBrains.Annotations;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.FreeRedisProvider.Options;
using JiShe.CollectBus.Serializer;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Text.Encodings.Web;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.FreeRedisProvider
@ -31,7 +24,7 @@ namespace JiShe.CollectBus.FreeRedisProvider
GetInstance();
}
public RedisClient Instance { get; set; } = new (string.Empty);
public RedisClient Instance { get; set; } = new(string.Empty);
/// <summary>
/// 获取 FreeRedis 客户端
@ -47,5 +40,443 @@ namespace JiShe.CollectBus.FreeRedisProvider
Instance.Notice += (s, e) => Trace.WriteLine(e.Log);
return Instance;
}
//public async Task AddMeterZSetCacheData<T>(string redisCacheKey, string redisCacheIndexKey, decimal score, T data)
//{
// if (score < 0 || data == null || string.IsNullOrWhiteSpace(redisCacheKey) || string.IsNullOrWhiteSpace(redisCacheIndexKey))
// {
// throw new Exception($"{nameof(AddMeterZSetCacheData)} 参数异常,-101");
// }
// // 生成唯一member标识
// var member = data.Serialize();
// // 计算score范围
// decimal dataScore = (long)score << 32;
// //// 事务操作
// //using (var tran = FreeRedisProvider.Instance.Multi())
// //{
// // await tran.ZAddAsync(cacheKey, score,member);
// // await tran.SAddAsync($"cat_index:{categoryId}", member);
// // object[] ret = tran.Exec();
// //}
// using (var pipe = Instance.StartPipe())
// {
// pipe.ZAdd(redisCacheKey, dataScore, member);
// pipe.SAdd(redisCacheIndexKey, member);
// object[] ret = pipe.EndPipe();
// }
// await Task.CompletedTask;
//}
//public async Task<BusPagedResult<T>> GetMeterZSetPagedData<T>(
//string redisCacheKey,
//string redisCacheIndexKey,
//decimal score,
//int pageSize = 10,
//int pageIndex = 1)
//{
// if (score < 0 || string.IsNullOrWhiteSpace(redisCacheKey) || string.IsNullOrWhiteSpace(redisCacheIndexKey))
// {
// throw new Exception($"{nameof(GetMeterZSetPagedData)} 参数异常,-101");
// }
// // 计算score范围
// decimal minScore = (long)score << 32;
// decimal maxScore = ((long)score + 1) << 32;
// // 分页参数
// int start = (pageIndex - 1) * pageSize;
// // 查询主数据
// var members = await Instance.ZRevRangeByScoreAsync(
// redisCacheKey,
// maxScore,
// minScore,
// offset: start,
// count: pageSize
// );
// if (members == null)
// {
// throw new Exception($"{nameof(GetMeterZSetPagedData)} 获取缓存的信息失败,第 {pageIndex + 1} 页数据未返回,-102");
// }
// // 查询总数
// var total = await Instance.ZCountAsync(redisCacheKey, minScore, maxScore);
// return new BusPagedResult<T>
// {
// Items = members.Select(m =>
// BusJsonSerializer.Deserialize<T>(m)!).ToList(),
// TotalCount = total,
// PageIndex = pageIndex,
// PageSize = pageSize
// };
//}
///// <summary>
///// 删除数据示例
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisCacheKey">分类</param>
///// <param name="redisCacheIndexKey"></param>
///// <param name="data"></param>
///// <returns></returns>
//public async Task RemoveMeterZSetData<T>(
//string redisCacheKey,
//string redisCacheIndexKey,
//T data)
//{
// // 查询需要删除的member
// var members = await Instance.SMembersAsync(redisCacheIndexKey);
// var target = members.FirstOrDefault(m =>
// BusJsonSerializer.Deserialize<T>(m) == data);//泛型此处该如何处理?
// if (target != null)
// {
// using (var trans = Instance.Multi())
// {
// trans.ZRem(redisCacheKey, target);
// trans.SRem(redisCacheIndexKey, target);
// trans.Exec();
// }
// }
// await Task.CompletedTask;
//}
public async Task AddMeterZSetCacheData<T>(
string redisCacheKey,
string redisCacheIndexKey,
int categoryId, // 新增分类ID参数
T data,
DateTimeOffset? timestamp = null)
{
// 参数校验增强
if (data == null || string.IsNullOrWhiteSpace(redisCacheKey)
|| string.IsNullOrWhiteSpace(redisCacheIndexKey))
{
throw new ArgumentException("Invalid parameters");
}
// 生成唯一member标识带数据指纹
var member = $"{categoryId}:{Guid.NewGuid()}";
var serializedData = data.Serialize();
// 计算组合score分类ID + 时间戳)
var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
long scoreValue = ((long)categoryId << 32) | (uint)actualTimestamp.Ticks;
//全局索引写入
long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
// 使用事务保证原子性
using (var trans = Instance.Multi())
{
// 主数据存储Hash
trans.HSet(redisCacheKey, member, serializedData);
// 排序索引使用ZSET
trans.ZAdd($"{redisCacheKey}_scores", scoreValue, member);
// 分类索引
trans.SAdd(redisCacheIndexKey, member);
//全局索引
trans.ZAdd("global_data_all", globalScore, member);
var results = trans.Exec();
if (results == null || results.Length <= 0)
throw new Exception("Transaction failed");
}
await Task.CompletedTask;
}
public async Task BatchAddMeterData<T>(
string redisCacheKey,
string indexKey,
IEnumerable<T> items) where T : DeviceCacheBasicModel
{
const int BATCH_SIZE = 1000; // 每批1000条
var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2);
//foreach (var batch in items.Batch(BATCH_SIZE))
//{
// await semaphore.WaitAsync();
// _ = Task.Run(async () =>
// {
// using (var pipe = FreeRedisProvider.Instance.StartPipe())
// {
// foreach (var item in batch)
// {
// var member = $"{item.CategoryId}:{Guid.NewGuid()}";
// long score = ((long)item.CategoryId << 32) | (uint)item.Timestamp.Ticks;
// // Hash主数据
// pipe.HSet(redisCacheKey, member, item.Data.Serialize());
// // 分类索引
// pipe.ZAdd($"{redisCacheKey}_scores", score, member);
// // 全局索引
// pipe.ZAdd("global_data_all", item.Timestamp.ToUnixTimeMilliseconds(), member);
// // 分类快速索引
// pipe.SAdd(indexKey, member);
// }
// pipe.EndPipe();
// }
// semaphore.Release();
// });
//}
await Task.CompletedTask;
}
public async Task UpdateMeterData<T>(
string redisCacheKey,
string oldCategoryIndexKey,
string newCategoryIndexKey,
string memberId, // 唯一标识(格式:"分类ID:GUID"
T newData,
int? newCategoryId = null,
DateTimeOffset? newTimestamp = null)
{
// 参数校验
if (string.IsNullOrWhiteSpace(memberId))
throw new ArgumentException("Invalid member ID");
var luaScript = @"
local mainKey = KEYS[1]
local scoreKey = KEYS[2]
local oldIndex = KEYS[3]
local newIndex = KEYS[4]
local member = ARGV[1]
local newData = ARGV[2]
local newScore = ARGV[3]
--
if redis.call('HEXISTS', mainKey, member) == 0 then
return 0
end
--
redis.call('HSET', mainKey, member, newData)
--
if newScore ~= '' then
--
redis.call('SREM', oldIndex, member)
--
redis.call('ZADD', scoreKey, newScore, member)
--
redis.call('SADD', newIndex, member)
end
return 1
";
// 计算新score当分类或时间变化时
long? newScoreValue = null;
if (newCategoryId.HasValue || newTimestamp.HasValue)
{
var parts = memberId.Split(':');
var oldCategoryId = int.Parse(parts[0]);
var actualCategoryId = newCategoryId ?? oldCategoryId;
var actualTimestamp = newTimestamp ?? DateTimeOffset.UtcNow;
newScoreValue = ((long)actualCategoryId << 32) | (uint)actualTimestamp.Ticks;
}
var result = await Instance.EvalAsync(luaScript,
new[]
{
redisCacheKey,
$"{redisCacheKey}_scores",
oldCategoryIndexKey,
newCategoryIndexKey
},
new[]
{
memberId,
newData.Serialize(),
newScoreValue?.ToString() ?? ""
});
// 如果时间戳变化则更新全局索引
if (newTimestamp.HasValue)
{
long newGlobalScore = newTimestamp.Value.ToUnixTimeMilliseconds();
await Instance.ZAddAsync("global_data_all", newGlobalScore, memberId);
}
if ((int)result == 0)
throw new KeyNotFoundException("指定数据不存在");
}
public async Task<BusPagedResult<T>> GetMeterZSetPagedData<T>(
string redisCacheKey,
string redisCacheIndexKey,
int categoryId,
int pageSize = 10,
int pageIndex = 1,
bool descending = true)
{
// 计算score范围
long minScore = (long)categoryId << 32;
long maxScore = ((long)categoryId + 1) << 32;
// 分页参数计算
int start = (pageIndex - 1) * pageSize;
// 获取排序后的member列表
var members = descending
? await Instance.ZRevRangeByScoreAsync(
$"{redisCacheKey}_scores",
maxScore,
minScore,
start,
pageSize)
: await Instance.ZRangeByScoreAsync(
$"{redisCacheKey}_scores",
minScore,
maxScore,
start,
pageSize);
// 批量获取实际数据
var dataTasks = members.Select(m =>
Instance.HGetAsync<T>(redisCacheKey, m)).ToArray();
await Task.WhenAll(dataTasks);
// 总数统计优化
var total = await Instance.ZCountAsync(
$"{redisCacheKey}_scores",
minScore,
maxScore);
return new BusPagedResult<T>
{
Items = dataTasks.Select(t => t.Result).ToList(),
TotalCount = total,
PageIndex = pageIndex,
PageSize = pageSize
};
}
public async Task RemoveMeterZSetData<T>(
string redisCacheKey,
string redisCacheIndexKey,
string uniqueId) // 改为基于唯一标识删除
{
// 原子操作
var luaScript = @"
local mainKey = KEYS[1]
local scoreKey = KEYS[2]
local indexKey = KEYS[3]
local member = ARGV[1]
redis.call('HDEL', mainKey, member)
redis.call('ZREM', scoreKey, member)
redis.call('SREM', indexKey, member)
return 1
";
var keys = new[]
{
redisCacheKey,
$"{redisCacheKey}_scores",
redisCacheIndexKey
};
var result = await Instance.EvalAsync(luaScript,
keys,
new[] { uniqueId });
if ((int)result != 1)
throw new Exception("删除操作失败");
}
public async Task<GlobalPagedResult<T>> GetGlobalPagedData<T>(
string redisCacheKey,
int pageSize = 10,
long? lastScore = null,
string lastMember = null,
bool descending = true)
{
const string zsetKey = "global_data_all";
// 分页参数处理
var (startScore, excludeMember) = descending
? (lastScore ?? long.MaxValue, lastMember)
: (lastScore ?? 0, lastMember);
// 获取成员列表
string[] members;
if (descending)
{
members = await Instance.ZRevRangeByScoreAsync(
zsetKey,
max: startScore,
min: 0,
offset: 0,
count: pageSize + 1);
}
else
{
members = await Instance.ZRangeByScoreAsync(
zsetKey,
min: startScore,
max: long.MaxValue,
offset: 0,
count: pageSize + 1);
}
// 处理分页结果
bool hasNext = members.Length > pageSize;
var actualMembers = members.Take(pageSize).ToArray();
// 批量获取数据(优化版本)
var dataTasks = actualMembers
.Select(m => Instance.HGetAsync<T>(redisCacheKey, m))
.ToArray();
await Task.WhenAll(dataTasks);
// 获取下一页游标
(long? nextScore, string nextMember) = actualMembers.Any()
? await GetNextCursor(zsetKey, actualMembers.Last(), descending)
: (null, null);
return new GlobalPagedResult<T>
{
Items = dataTasks.Select(t => t.Result).ToList(),
HasNext = hasNext,
NextScore = nextScore,
NextMember = nextMember
};
}
private async Task<(long? score, string member)> GetNextCursor(
string zsetKey,
string lastMember,
bool descending)
{
var score = await Instance.ZScoreAsync(zsetKey, lastMember);
return (score.HasValue ? (long)score.Value : null, lastMember);
}
}
}

View File

@ -9,4 +9,7 @@
<PackageReference Include="FreeRedis" Version="1.3.6" />
<PackageReference Include="Volo.Abp" Version="8.3.3" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
</ItemGroup>
</Project>

View File

@ -163,6 +163,7 @@
overflow-y: hidden;
color: #555;
} */
.caption {
padding: 9px;
overflow-y: hidden;

View File

@ -1,8 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using JiShe.CollectBus.Common.Models;
namespace JiShe.CollectBus.IoTDBProvider
{
@ -48,6 +44,6 @@ namespace JiShe.CollectBus.IoTDBProvider
/// <typeparam name="T"></typeparam>
/// <param name="options"></param>
/// <returns></returns>
Task<PagedResult<T>> QueryAsync<T>(QueryOptions options) where T : IoTEntity, new();
Task<BusPagedResult<T>> QueryAsync<T>(QueryOptions options) where T : IoTEntity, new();
}
}

View File

@ -1,22 +1,14 @@
using Apache.IoTDB;
using Apache.IoTDB.DataStructure;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IoTDBProvider.Context;
using JiShe.CollectBus.IoTDBProvider.Interface;
using JiShe.CollectBus.IoTDBProvider.Provider;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using static Thrift.Protocol.Utilities.TJSONProtocolConstants;
namespace JiShe.CollectBus.IoTDBProvider
{
@ -118,12 +110,12 @@ namespace JiShe.CollectBus.IoTDBProvider
/// <typeparam name="T"></typeparam>
/// <param name="options"></param>
/// <returns></returns>
public async Task<PagedResult<T>> QueryAsync<T>(QueryOptions options) where T : IoTEntity, new()
public async Task<BusPagedResult<T>> QueryAsync<T>(QueryOptions options) where T : IoTEntity, new()
{
var query = BuildQuerySQL<T>(options);
var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query);
var result = new PagedResult<T>
var result = new BusPagedResult<T>
{
TotalCount = await GetTotalCount<T>(options),
Items = ParseResults<T>(sessionDataSet, options.PageSize)

View File

@ -42,7 +42,8 @@ namespace JiShe.CollectBus.Kafka.AdminClient
/// <returns></returns>
public IAdminClient GetInstance(IConfiguration configuration)
{
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
ArgumentNullException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]);
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!);
var adminClientConfig = new AdminClientConfig()
{
BootstrapServers = configuration["Kafka:BootstrapServers"],
@ -69,6 +70,30 @@ namespace JiShe.CollectBus.Kafka.AdminClient
return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic));
}
/// <summary>
/// 判断Kafka主题是否存在
/// </summary>
/// <param name="topic">主题名称</param>
/// <param name="numPartitions">副本数量不能高于Brokers数量</param>
/// <returns></returns>
public async Task<bool> CheckTopicAsync(string topic,int numPartitions)
{
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(5));
if(numPartitions > metadata.Brokers.Count)
{
throw new Exception($"{nameof(CheckTopicAsync)} 主题检查时,副本数量大于了节点数量。") ;
}
return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic));
}
//// <summary>
/// 创建Kafka主题
/// </summary>
/// <param name="topic">主题名称</param>
/// <param name="numPartitions">主题分区数量</param>
/// <param name="replicationFactor">副本数量不能高于Brokers数量</param>
/// <returns></returns>
public async Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor)
{
@ -96,23 +121,68 @@ namespace JiShe.CollectBus.Kafka.AdminClient
}
}
/// <summary>
/// 删除Kafka主题
/// </summary>
/// <param name="topic"></param>
/// <returns></returns>
public async Task DeleteTopicAsync(string topic)
{
await Instance.DeleteTopicsAsync(new[] { topic });
}
/// <summary>
/// 获取Kafka主题列表
/// </summary>
/// <returns></returns>
public async Task<List<string>> ListTopicsAsync()
{
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
return new List<string>(metadata.Topics.Select(t => t.Topic));
}
/// <summary>
/// 判断Kafka主题是否存在
/// </summary>
/// <param name="topic"></param>
/// <returns></returns>
public async Task<bool> TopicExistsAsync(string topic)
{
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
return metadata.Topics.Any(t => t.Topic == topic);
}
/// <summary>
/// 检测分区是否存在
/// </summary>
/// <param name="topic"></param>
/// <param name="partitions"></param>
/// <returns></returns>
public Dictionary<int, bool> CheckPartitionsExists(string topic, int[] partitions)
{
var result = new Dictionary<int, bool>();
var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10));
if (metadata.Topics.Count == 0)
return partitions.ToDictionary(p => p, p => false);
var existingPartitions = metadata.Topics[0].Partitions.Select(p => p.PartitionId).ToHashSet();
return partitions.ToDictionary(p => p, p => existingPartitions.Contains(p));
}
/// <summary>
/// 检测分区是否存在
/// </summary>
/// <param name="topic"></param>
/// <param name="targetPartition"></param>
/// <returns></returns>
public bool CheckPartitionsExist(string topic, int targetPartition)
{
var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10));
if (metadata.Topics.Count == 0)
return false;
var partitions = metadata.Topics[0].Partitions;
return partitions.Any(p => p.PartitionId == targetPartition);
}
public void Dispose()
{
Instance?.Dispose();

View File

@ -8,9 +8,49 @@ namespace JiShe.CollectBus.Kafka.AdminClient
{
public interface IAdminClientService
{
/// <summary>
/// 创建Kafka主题
/// </summary>
/// <param name="topic">主题名称</param>
/// <param name="numPartitions">主题分区数量</param>
/// <param name="replicationFactor">副本数量不能高于Brokers数量</param>
/// <returns></returns>
Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor);
/// <summary>
/// 删除Kafka主题
/// </summary>
/// <param name="topic"></param>
/// <returns></returns>
Task DeleteTopicAsync(string topic);
/// <summary>
/// 获取Kafka主题列表
/// </summary>
/// <returns></returns>
Task<List<string>> ListTopicsAsync();
/// <summary>
/// 判断Kafka主题是否存在
/// </summary>
/// <param name="topic"></param>
/// <returns></returns>
Task<bool> TopicExistsAsync(string topic);
/// <summary>
/// 检测分区是否存在
/// </summary>
/// <param name="topic"></param>
/// <param name="partitions"></param>
/// <returns></returns>
Dictionary<int, bool> CheckPartitionsExists(string topic, int[] partitions);
/// <summary>
/// 检测分区是否存在
/// </summary>
/// <param name="topic"></param>
/// <param name="targetPartition"></param>
/// <returns></returns>
bool CheckPartitionsExist(string topic, int targetPartition);
}
}

View File

@ -0,0 +1,53 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka.Attributes
{
[AttributeUsage(AttributeTargets.Method)]
public class KafkaSubscribeAttribute : Attribute
{
/// <summary>
/// 订阅的主题
/// </summary>
public string[] Topics { get; set; }
/// <summary>
/// 分区
/// </summary>
public int Partition { get; set; } = -1;
/// <summary>
/// 消费者组
/// </summary>
public string GroupId { get; set; }
public KafkaSubscribeAttribute(string[] topics, string groupId = "default")
{
this.Topics = topics;
this.GroupId = groupId;
}
public KafkaSubscribeAttribute(string topic, string groupId = "default")
{
this.Topics = new string[] { topic };
this.GroupId = groupId;
}
public KafkaSubscribeAttribute(string[] topics, int partition, string groupId = "default")
{
this.Topics = topics;
this.Partition = partition;
this.GroupId = groupId;
}
public KafkaSubscribeAttribute(string topic, int partition, string groupId = "default")
{
this.Topics = new string[] { topic };
this.Partition = partition;
this.GroupId = groupId;
}
}
}

View File

@ -1,8 +1,14 @@
using Confluent.Kafka;
using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Producer;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using System.Reflection;
using Volo.Abp;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Modularity;
using static Confluent.Kafka.ConfigPropertyNames;
namespace JiShe.CollectBus.Kafka
{
@ -10,6 +16,17 @@ namespace JiShe.CollectBus.Kafka
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
// 注册Producer
context.Services.AddTransient<IProducerService, ProducerService>();
// 注册Consumer
context.Services.AddTransient<IConsumerService, ConsumerService>();
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
var app = context.GetApplicationBuilder();
app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application"));
}
}
}

View File

@ -6,76 +6,205 @@ using JiShe.CollectBus.Kafka.Attributes;
using Volo.Abp.DependencyInjection;
using JiShe.CollectBus.Kafka.AdminClient;
using static Confluent.Kafka.ConfigPropertyNames;
using System.Collections.Concurrent;
using System.Text.RegularExpressions;
using NUglify.Html;
namespace JiShe.CollectBus.Kafka.Consumer
{
public abstract class ConsumerService<TKey, TValue> : IConsumerService<TKey, TValue>, IDisposable, ISingletonDependency
public class ConsumerService : IConsumerService, IDisposable
{
private readonly ILogger<ConsumerService<TKey, TValue>> _logger;
private CancellationTokenSource _cancellationTokenSource;
private readonly ILogger<ConsumerService> _logger;
private readonly IConfiguration _configuration;
private readonly ConcurrentDictionary<Type, (object Consumer, CancellationTokenSource CTS)>
_consumerStore = new();
protected ConsumerService(IConfiguration configuration, ILogger<ConsumerService<TKey, TValue>> logger)
public ConsumerService(IConfiguration configuration, ILogger<ConsumerService> logger)
{
_configuration = configuration;
_logger = logger;
GetInstance(configuration);
}
#region private
public IConsumer<TKey, TValue> Instance { get; set; } = default;
/// <summary>
/// 创建消费者
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <returns></returns>
private IConsumer<TKey, TValue> CreateConsumer<TKey, TValue>(string? groupId = null) where TKey : notnull where TValue : class
{
var config = BuildConsumerConfig(groupId);
return new ConsumerBuilder<TKey, TValue>(config)
.SetErrorHandler((_, e) => _logger.LogError($"消费者错误: {e.Reason}"))
.Build();
}
public IConsumer<TKey, TValue> GetInstance(IConfiguration configuration)
private ConsumerConfig BuildConsumerConfig(string? groupId = null)
{
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
var consumerConfig = new ConsumerConfig
var enableAuth = bool.Parse(_configuration["Kafka:EnableAuthorization"]!);
var config = new ConsumerConfig
{
BootstrapServers = configuration["Kafka:BootstrapServers"],
AutoOffsetReset = AutoOffsetReset.Earliest
BootstrapServers = _configuration["Kafka:BootstrapServers"],
GroupId = groupId ?? "default",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false // 禁止AutoCommit
};
if (enableAuthorization)
if (enableAuth)
{
consumerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
consumerConfig.SaslMechanism = SaslMechanism.Plain;
consumerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
consumerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
}
Instance = new ConsumerBuilder<TKey, TValue>(consumerConfig).Build();
return Instance;
config.SecurityProtocol = SecurityProtocol.SaslPlaintext;
config.SaslMechanism = SaslMechanism.Plain;
config.SaslUsername = _configuration["Kafka:SaslUserName"];
config.SaslPassword = _configuration["Kafka:SaslPassword"];
}
public async Task SubscribeAsync(string topic, Func<TKey, TValue, Task> messageHandler)
{
_cancellationTokenSource = new CancellationTokenSource();
Instance.Subscribe(topic);
return config;
}
#endregion
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="messageHandler"></param>
/// <returns></returns>
public async Task SubscribeAsync<TKey, TValue>(string topic, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
{
await SubscribeAsync<TKey, TValue>(new[] { topic }, messageHandler, groupId);
}
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="messageHandler"></param>
/// <returns></returns>
public async Task SubscribeAsync<TValue>(string topic, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class
{
await SubscribeAsync<TValue>(new[] { topic }, messageHandler,groupId);
}
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <param name="topics"></param>
/// <param name="messageHandler"></param>
/// <returns></returns>
public async Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
{
var consumerKey = typeof((TKey, TValue));
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<TKey, TValue>(groupId),
cts
)).Consumer as IConsumer<TKey, TValue>;
consumer!.Subscribe(topics);
_ = Task.Run(async () =>
{
while (!cts.IsCancellationRequested)
{
try
{
while (!_cancellationTokenSource.Token.IsCancellationRequested)
var result = consumer.Consume(cts.Token);
bool sucess= await messageHandler(result.Message.Key, result.Message.Value);
if (sucess)
{
var result = Instance.Consume(_cancellationTokenSource.Token);
if (result != null)
{
await messageHandler(result.Message.Key, result.Message.Value);
consumer.Commit(result); // 手动提交
}
}
}
catch (OperationCanceledException)
catch (ConsumeException ex)
{
Instance.Close();
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
}
}
});
await Task.CompletedTask;
}
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <param name="topics"></param>
/// <param name="messageHandler"></param>
/// <returns></returns>
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
{
var consumerKey = typeof((Null, TValue));
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _=>
(
CreateConsumer<Null, TValue>(groupId),
cts
)).Consumer as IConsumer<Null, TValue>;
consumer!.Subscribe(topics);
_ = Task.Run(async () =>
{
while (!cts.IsCancellationRequested)
{
try
{
var result = consumer.Consume(cts.Token);
bool sucess = await messageHandler(result.Message.Value);
if (sucess)
consumer.Commit(result); // 手动提交
}
catch (ConsumeException ex)
{
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
}
}
});
await Task.CompletedTask;
}
/// <summary>
/// 取消消息订阅
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
public void Unsubscribe<TKey, TValue>() where TKey : notnull where TValue : class
{
var consumerKey = typeof((TKey, TValue));
if (_consumerStore.TryRemove(consumerKey, out var entry))
{
entry.CTS.Cancel();
(entry.Consumer as IDisposable)?.Dispose();
entry.CTS.Dispose();
}
}
public void Unsubscribe()
{
_cancellationTokenSource?.Cancel();
Instance?.Unsubscribe();
}
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
Unsubscribe();
Instance?.Dispose();
_cancellationTokenSource?.Dispose();
foreach (var entry in _consumerStore.Values)
{
entry.CTS.Cancel();
(entry.Consumer as IDisposable)?.Dispose();
entry.CTS.Dispose();
}
_consumerStore.Clear();
}
}
}

View File

@ -6,10 +6,32 @@ using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka.Consumer
{
public interface IConsumerService<TKey, TValue>
public interface IConsumerService
{
Task SubscribeAsync(string topic, Func<TKey, TValue, Task> messageHandler);
void Unsubscribe();
void Dispose();
Task SubscribeAsync<TKey, TValue>(string topic, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId=null) where TKey : notnull where TValue : class;
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="messageHandler"></param>
/// <returns></returns>
Task SubscribeAsync<TValue>(string topic, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class;
Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId) where TKey : notnull where TValue : class;
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <param name="topics"></param>
/// <param name="messageHandler"></param>
/// <returns></returns>
Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class;
void Unsubscribe<TKey, TValue>() where TKey : notnull where TValue : class;
}
}

View File

@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka
{
public interface IKafkaSubscribe
{
}
}

View File

@ -8,7 +8,12 @@
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.9.0" />
<PackageReference Include="Volo.Abp" Version="8.3.3" />
<PackageReference Include="Volo.Abp.AspNetCore" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,132 @@
using Confluent.Kafka;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Consumer;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using static Confluent.Kafka.ConfigPropertyNames;
namespace JiShe.CollectBus.Kafka
{
public static class KafkaSubcribesExtensions
{
/// <summary>
/// 添加Kafka订阅
/// </summary>
/// <param name="app"></param>
/// <param name="assembly"></param>
public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly)
{
var subscribeTypes = assembly.GetTypes()
.Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
.ToList();
if (subscribeTypes.Count == 0) return;
var provider = app.ApplicationServices;
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
lifetime.ApplicationStarted.Register(() =>
{
foreach (var subscribeType in subscribeTypes)
{
var subscribes = provider.GetServices(subscribeType).ToList();
subscribes.ForEach(subscribe => {
if(subscribe is IKafkaSubscribe)
{
BuildKafkaSubscriber(subscribe, provider);
}
});
}
});
}
/// <summary>
/// 构建Kafka订阅
/// </summary>
/// <param name="subscribe"></param>
/// <param name="provider"></param>
private static void BuildKafkaSubscriber(object subscribe, IServiceProvider provider)
{
var subscribedMethods = subscribe.GetType().GetMethods()
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
.Where(x => x.Attribute != null)
.ToArray();
foreach (var sub in subscribedMethods)
{
Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe));
}
}
/// <summary>
/// 启动后台消费线程
/// </summary>
/// <param name="config"></param>
/// <param name="attr"></param>
/// <param name="method"></param>
/// <param name="consumerInstance"></param>
/// <returns></returns>
private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe)
{
var consumerService = provider.GetRequiredService<IConsumerService>();
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
await consumerService.SubscribeAsync<string>(attr.Topics, async (message) =>
{
try
{
// 处理消息
return await ProcessMessageAsync(message, method, subscribe);
}
catch (ConsumeException ex)
{
// 处理消费错误
logger.LogError($"kafka消费异常:{ex.Message}");
}
return await Task.FromResult(false);
});
}
/// <summary>
/// 处理消息
/// </summary>
/// <param name="message"></param>
/// <param name="method"></param>
/// <param name="subscribe"></param>
/// <returns></returns>
private static async Task<bool> ProcessMessageAsync(string message, MethodInfo method, object subscribe)
{
var parameters = method.GetParameters();
if (parameters.Length != 1)
return true;
var paramType = parameters[0].ParameterType;
var messageObj = paramType == typeof(string)? message: JsonConvert.DeserializeObject(message, paramType);
if (method.ReturnType == typeof(Task))
{
object? result = await (Task<bool>)method.Invoke(subscribe, new[] { messageObj })!;
if (result is bool success)
return success;
}
else
{
object? result = method.Invoke(subscribe, new[] { messageObj });
if (result is bool success)
return success;
}
return false;
}
}
}

View File

@ -7,10 +7,14 @@ using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka.Producer
{
public interface IProducerService<TKey, TValue>
public interface IProducerService
{
Task ProduceAsync(string topic, TKey key, TValue value);
Task ProduceAsync(string topic, TValue value);
void Dispose();
Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value) where TKey : notnull where TValue : class;
Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class;
Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value, int? partition, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) where TKey : notnull where TValue : class;
Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class;
}
}

View File

@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@ -11,63 +12,180 @@ using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.Kafka.Producer
{
public class ProducerService<TKey, TValue> : IProducerService<TKey, TValue>, IDisposable,ITransientDependency
public class ProducerService: IProducerService, IDisposable
{
private readonly ILogger<ProducerService> _logger;
private readonly IConfiguration _configuration;
private readonly ConcurrentDictionary<Tuple<Type, Type>, object> _producerCache = new();
private readonly ILogger<ProducerService<TKey, TValue>> _logger;
protected ProducerService(IConfiguration configuration, ILogger<ProducerService<TKey, TValue>> logger)
public ProducerService(IConfiguration configuration,ILogger<ProducerService> logger)
{
_configuration = configuration;
_logger = logger;
GetInstance(configuration);
}
public IProducer<TKey, TValue> Instance { get; set; } = default;
public IProducer<TKey, TValue> GetInstance(IConfiguration configuration)
#region private
/// <summary>
/// 创建生产者实例
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <returns></returns>
private IProducer<TKey, TValue> GetProducer<TKey, TValue>()
{
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
var consumerConfig = new ProducerConfig
var typeKey = Tuple.Create(typeof(TKey), typeof(TValue))!;
return (IProducer<TKey, TValue>)_producerCache.GetOrAdd(typeKey, _ =>
{
BootstrapServers = configuration["Kafka:BootstrapServers"],
AllowAutoCreateTopics = true
var config = BuildProducerConfig();
return new ProducerBuilder<TKey, TValue>(config)
.SetLogHandler((_, msg) => _logger.Log(ConvertLogLevel(msg.Level), msg.Message))
.Build();
});
}
/// <summary>
/// 配置
/// </summary>
/// <returns></returns>
private ProducerConfig BuildProducerConfig()
{
var enableAuth = bool.Parse(_configuration["Kafka:EnableAuthorization"]!);
var config = new ProducerConfig
{
BootstrapServers = _configuration["Kafka:BootstrapServers"],
AllowAutoCreateTopics = true,
QueueBufferingMaxKbytes = 2_097_151, // 修改缓冲区最大为2GB默认为1GB
CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4其他gzip/snappy/zstd
BatchSize = 32_768, // 修改批次大小为32K
LingerMs = 20, // 修改等待时间为20ms
Acks = Acks.All, // 表明只有所有副本Broker都收到消息才算提交成功, 可以 Acks.Leader
MessageSendMaxRetries = 50, // 消息发送失败最大重试50次
};
if (enableAuthorization)
if (enableAuth)
{
consumerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
consumerConfig.SaslMechanism = SaslMechanism.Plain;
consumerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
consumerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
config.SecurityProtocol = SecurityProtocol.SaslPlaintext;
config.SaslMechanism = SaslMechanism.Plain;
config.SaslUsername = _configuration["Kafka:SaslUserName"];
config.SaslPassword = _configuration["Kafka:SaslPassword"];
}
Instance = new ProducerBuilder<TKey, TValue>(consumerConfig).Build();
return Instance;
return config;
}
public async Task ProduceAsync(string topic, TKey key, TValue value)
private static LogLevel ConvertLogLevel(SyslogLevel level) => level switch
{
SyslogLevel.Emergency => LogLevel.Critical,
SyslogLevel.Alert => LogLevel.Critical,
SyslogLevel.Critical => LogLevel.Critical,
SyslogLevel.Error => LogLevel.Error,
SyslogLevel.Warning => LogLevel.Warning,
SyslogLevel.Notice => LogLevel.Information,
SyslogLevel.Info => LogLevel.Information,
SyslogLevel.Debug => LogLevel.Debug,
_ => LogLevel.None
};
#endregion
/// <summary>
/// 发布消息
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="key"></param>
/// <param name="value"></param>
/// <returns></returns>
public async Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value)where TKey : notnull where TValue : class
{
var producer = GetProducer<TKey, TValue>();
await producer.ProduceAsync(topic, new Message<TKey, TValue> { Key = key, Value = value });
}
/// <summary>
/// 发布消息
/// </summary>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="value"></param>
/// <returns></returns>
public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class
{
var producer = GetProducer<Null, TValue>();
await producer.ProduceAsync(topic, new Message<Null, TValue> { Value = value });
}
/// <summary>
/// 发布消息
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="key"></param>
/// <param name="value"></param>
/// <param name="partition"></param>
/// <param name="deliveryHandler"></param>
/// <returns></returns>
public async Task ProduceAsync<TKey, TValue>(string topic,TKey key,TValue value,int? partition=null, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null)where TKey : notnull where TValue : class
{
var message = new Message<TKey, TValue>
{
Key = key,
Value = value
};
var producer = GetProducer<TKey, TValue>();
if (partition.HasValue)
{
var topicPartition = new TopicPartition(topic, partition.Value);
producer.Produce(topicPartition, message, deliveryHandler);
}
else
{
producer.Produce(topic, message, deliveryHandler);
}
await Task.CompletedTask;
await Instance.ProduceAsync(topic, message);
}
public async Task ProduceAsync(string topic, TValue value)
/// <summary>
/// 发布消息
/// </summary>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="key"></param>
/// <param name="value"></param>
/// <param name="partition"></param>
/// <param name="deliveryHandler"></param>
/// <returns></returns>
public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class
{
var message = new Message<TKey, TValue>
var message = new Message<Null, TValue>
{
Value = value
};
await Instance.ProduceAsync(topic, message);
var producer = GetProducer<Null, TValue>();
if (partition.HasValue)
{
var topicPartition = new TopicPartition(topic, partition.Value);
producer.Produce(topicPartition, message, deliveryHandler);
}
else
{
producer.Produce(topic, message, deliveryHandler);
}
await Task.CompletedTask;
}
public void Dispose()
{
Instance?.Dispose();
foreach (var producer in _producerCache.Values.OfType<IDisposable>())
{
producer.Dispose();
}
_producerCache.Clear();
}
}
}

View File

@ -49,7 +49,6 @@ namespace JiShe.CollectBus.Protocol.Contracts
//动态上报主题需根据协议的AFN功能码动态获取
var afnList = EnumExtensions.ToNameValueDictionary<AFN>();
//需要排除的AFN功能码
var excludeItems = new List<int>() { 6, 7, 8,15 };

View File

@ -70,22 +70,9 @@ namespace JiShe.CollectBus.Protocol.Contracts
#region
/// <summary>
/// 1分钟采集水表数据下行消息主题
/// 水表数据下行消息主题,由于水表采集频率不高,所以一个主题就够
/// </summary>
public const string WatermeterSubscriberWorkerOneMinuteIssuedEventName = "issued.auto.one.watermeter.event";
/// <summary>
/// 5分钟采集水表数据下行消息主题
/// </summary>
public const string WatermeterSubscriberWorkerFiveMinuteIssuedEventName = "issued.auto.five.watermeter.event";
/// <summary>
/// 15分钟采集水表数据下行消息主题
/// </summary>
public const string WatermeterSubscriberWorkerFifteenMinuteIssuedEventName = "issued.auto.fifteen.watermeter.event";
/// <summary>
/// 其他采集数据下行消息主题,日冻结,月冻结、集中器版本号等
/// </summary>
public const string WatermeterSubscriberWorkerOtherIssuedEventName = "issued.auto.other.watermeter.event";
public const string WatermeterSubscriberWorkerAutoReadingIssuedEventName = "issued.auto.reading.watermeter.event";
/// <summary>
/// 水表自动阀控