diff --git a/src/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs index 15d60d9..0f04005 100644 --- a/src/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs @@ -77,22 +77,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading Task InitWatermeterCacheData(string gatherCode = ""); /// - /// 1分钟采集水表数据 + /// 水表数据采集 /// /// - Task WatermeterScheduledMeterOneMinuteReading(); - - /// - /// 5分钟采集水表数据,只获取任务数据下发,不构建任务 - /// - /// - Task WatermeterScheduledMeterFiveMinuteReading(); - - /// - /// 15分钟采集水表数据,只获取任务数据下发,不构建任务 - /// - /// - Task WatermeterScheduledMeterFifteenMinuteReading(); + Task WatermeterScheduledMeterAutoReading(); + #endregion diff --git a/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs index 64dddf1..abba774 100644 --- a/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs @@ -39,19 +39,8 @@ namespace JiShe.CollectBus.Subscribers /// 1分钟采集水表数据下行消息消费订阅 /// /// - Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); - - /// - /// 5分钟采集水表数据下行消息消费订阅 - /// - /// - Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); - - /// - /// 15分钟采集水表数据下行消息消费订阅 - /// - /// - Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); + Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); + #endregion } } diff --git a/src/JiShe.CollectBus.Application/CollectBusAppService.cs b/src/JiShe.CollectBus.Application/CollectBusAppService.cs index c634859..e1f913e 100644 --- a/src/JiShe.CollectBus.Application/CollectBusAppService.cs +++ b/src/JiShe.CollectBus.Application/CollectBusAppService.cs @@ -1,10 +1,20 @@ -using FreeRedis; +using Confluent.Kafka; +using FreeRedis; using FreeSql; +using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Extensions; +using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.FreeRedisProvider; using JiShe.CollectBus.FreeSql; +using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Localization; +using JiShe.CollectBus.Serializer; using Microsoft.AspNetCore.Mvc; +using System; using System.Collections.Generic; +using System.Linq; +using System.Threading; using System.Threading.Tasks; using Volo.Abp.Application.Services; @@ -16,9 +26,205 @@ public abstract class CollectBusAppService : ApplicationService public IFreeSqlProvider SqlProvider => LazyServiceProvider.LazyGetRequiredService(); protected IFreeRedisProvider FreeRedisProvider => LazyServiceProvider.LazyGetService()!; + protected CollectBusAppService() { LocalizationResource = typeof(CollectBusResource); ObjectMapperContext = typeof(CollectBusApplicationModule); + } + + /// + /// Lua脚本批量获取缓存的表计信息 + /// + /// 表信息数据对象 + /// 采集频率对应的缓存Key集合 + /// 系统类型 + /// 服务器标识 + /// 采集频率,1分钟、5分钟、15分钟 + /// 表计类型 + /// + protected async Task>> GetMeterRedisCacheDictionaryData(string[] redisKeys, string systemType, string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class + { + if (redisKeys == null || redisKeys.Length <= 0 || string.IsNullOrWhiteSpace(systemType) || string.IsNullOrWhiteSpace(serverTagName) || string.IsNullOrWhiteSpace(timeDensity)) + { + throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,参数异常,-101"); + } + + var meterInfos = new Dictionary>(); + var luaScript = @" + local results = {} + for i, key in ipairs(KEYS) do + local data = redis.call('HGETALL', key) + results[i] = {key, data} + end + return results"; + + // 分页参数:每页处理10000个键 + int pageSize = 10000; + int totalPages = (int)Math.Ceiling(redisKeys.Length / (double)pageSize); + + for (int page = 0; page < totalPages; page++) + { + // 分页获取当前批次的键 + var batchKeys = redisKeys + .Skip(page * pageSize) + .Take(pageSize) + .ToArray(); + + // 执行Lua脚本获取当前批次数据 + var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, batchKeys); + if (merterResult == null) + { + throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,第 {page + 1} 页数据未返回,-102"); + } + + // 解析当前批次的结果 + if (merterResult is object[] arr) + { + foreach (object[] item in arr) + { + string key = (string)item[0]; + object[] fieldsAndValues = (object[])item[1]; + var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, systemType, serverTagName, meterType, timeDensity)}"; + string focusAddress = key.Replace(redisCacheKey, ""); + + var meterHashs = new Dictionary(); + for (int i = 0; i < fieldsAndValues.Length; i += 2) + { + string meterId = (string)fieldsAndValues[i]; + string meterStr = (string)fieldsAndValues[i + 1]; + + T meterInfo = default!; + if (!string.IsNullOrWhiteSpace(meterStr)) + { + meterInfo = meterStr.Deserialize()!; + } + if (meterInfo != null) + { + meterHashs[meterId] = meterInfo; + } + else + { + throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 缓存表计数据异常,集中器 {key} 的表计 {meterId} 解析失败,-103"); + } + } + + // 合并到总结果,若存在重复key则覆盖 + if (meterInfos.ContainsKey(focusAddress)) + { + foreach (var kvp in meterHashs) + { + meterInfos[focusAddress][kvp.Key] = kvp.Value; + } + } + else + { + meterInfos[focusAddress] = meterHashs; + } + } + } + else + { + throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 第 {page + 1} 页数据解析失败,返回类型不符,-104"); + } + } + + return meterInfos; + } + + /// + /// Lua脚本批量获取缓存的表计信息 + /// + /// 表信息数据对象 + /// 采集频率对应的缓存Key集合 + /// 系统类型 + /// 服务器标识 + /// 采集频率,1分钟、5分钟、15分钟 + /// 表计类型 + /// + protected async Task> GetMeterRedisCacheListData(string[] redisKeys, string systemType, string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class + { + if (redisKeys == null || redisKeys.Length <= 0 || + string.IsNullOrWhiteSpace(systemType) || + string.IsNullOrWhiteSpace(serverTagName) || + string.IsNullOrWhiteSpace(timeDensity)) + { + throw new Exception($"{nameof(GetMeterRedisCacheListData)} 参数异常,-101"); + } + + var meterInfos = new List(); + var luaScript = @" + local results = {} + for i, key in ipairs(KEYS) do + local data = redis.call('HGETALL', key) + results[i] = {key, data} + end + return results"; + + // 分页参数:每页10000个键 + int pageSize = 10000; + int totalPages = (int)Math.Ceiling(redisKeys.Length / (double)pageSize); + + for (int page = 0; page < totalPages; page++) + { + // 分页获取当前批次键 + var batchKeys = redisKeys + .Skip(page * pageSize) + .Take(pageSize) + .ToArray(); + + // 执行Lua脚本获取当前页数据 + var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, batchKeys); + if (merterResult == null) + { + throw new Exception($"{nameof(GetMeterRedisCacheListData)} 第 {page + 1} 页数据未返回,-102"); + } + + // 解析当前页结果 + if (merterResult is object[] arr) + { + foreach (object[] item in arr) + { + string key = (string)item[0]; + object[] fieldsAndValues = (object[])item[1]; + var redisCacheKey = string.Format( + RedisConst.CacheMeterInfoKey, + systemType, + serverTagName, + meterType, + timeDensity + ); + string focusAddress = key.Replace(redisCacheKey, ""); + + for (int i = 0; i < fieldsAndValues.Length; i += 2) + { + string meterId = (string)fieldsAndValues[i]; + string meterStr = (string)fieldsAndValues[i + 1]; + + T meterInfo = default!; + if (!string.IsNullOrWhiteSpace(meterStr)) + { + meterInfo = meterStr.Deserialize()!; + } + if (meterInfo != null) + { + meterInfos.Add(meterInfo); + } + else + { + throw new Exception( + $"{nameof(GetMeterRedisCacheListData)} 表计 {meterId} 解析失败(页 {page + 1}),-103" + ); + } + } + } + } + else + { + throw new Exception($"{nameof(GetMeterRedisCacheListData)} 第 {page + 1} 页数据格式错误,-104"); + } + } + + return meterInfos; } -} +} diff --git a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index 1a8b326..008e736 100644 --- a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -59,7 +59,7 @@ public class CollectBusApplicationModule : AbpModule //默认初始化表计信息 var dbContext = context.ServiceProvider.GetRequiredService(); dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); - dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); + //dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); //初始化主题信息 var kafkaAdminClient = context.ServiceProvider.GetRequiredService(); diff --git a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs index c348a3d..ff4de0c 100644 --- a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs +++ b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using System.Runtime.CompilerServices; using System.Threading.Tasks; using DeviceDetectorNET.Parser.Device; @@ -141,7 +142,15 @@ namespace JiShe.CollectBus.Plugins await client.ResetIdAsync(deviceNo); - var entity = await _deviceRepository.FindAsync(a => a.Number == deviceNo); + var deviceInfoList= await _deviceRepository.GetListAsync(a => a.Number == deviceNo); + if (deviceInfoList != null && deviceInfoList.Count > 1) + { + //todo 推送集中器编号重复预警 + _logger.LogError($"集中器编号:{deviceNo},存在多个集中器,请检查集中器编号是否重复"); + return; + } + + var entity = deviceInfoList?.FirstOrDefault(a => a.Number == deviceNo); if (entity == null) { await _deviceRepository.InsertAsync(new Device(deviceNo, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online)); @@ -171,7 +180,15 @@ namespace JiShe.CollectBus.Plugins string clientId = deviceNo; string oldClientId = $"{client.Id}"; - var entity = await _deviceRepository.FindAsync(a => a.Number == deviceNo); + var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == deviceNo); + if (deviceInfoList != null && deviceInfoList.Count > 1) + { + //todo 推送集中器编号重复预警 + _logger.LogError($"集中器编号:{deviceNo},存在多个集中器,请检查集中器编号是否重复"); + return; + } + + var entity = deviceInfoList?.FirstOrDefault(a => a.Number == deviceNo); if (entity == null) //没有登录帧的设备,只有心跳帧 { await client.ResetIdAsync(clientId); diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs index e8b483d..e4c078d 100644 --- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -13,10 +13,13 @@ using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Options; using JiShe.CollectBus.IoTDBProvider.Context; using Microsoft.Extensions.Logging; -using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.IotSystems.AFNEntity; using JiShe.CollectBus.Protocol.Contracts.Interfaces; using Microsoft.Extensions.DependencyInjection; +using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Common.Enums; +using System.Diagnostics.Metrics; +using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Kafka.Attributes; using System.Text.Json; using JiShe.CollectBus.Kafka; @@ -111,21 +114,49 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS [HttpGet] public async Task TestDeviceGroupBalanceControl(int deviceCount = 200000) { - var deviceList = new List(); - for (int i = 0; i < deviceCount; i++) + //var deviceList = new List(); + //for (int i = 0; i < deviceCount; i++) + //{ + // deviceList.Add($"Device_{Guid.NewGuid()}"); + //} + + //// 初始化缓存 + //DeviceGroupBalanceControl.InitializeCache(deviceList); + + var timeDensity = "15"; + //获取缓存中的电表信息 + var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, "Energy", "JiSheCollectBus", MeterTypeEnum.Ammeter.ToString(), timeDensity)}*"; + + var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); + var meterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, "Energy", "JiSheCollectBus", timeDensity, MeterTypeEnum.Ammeter); + List focusAddressDataLista = new List(); + foreach (var item in meterInfos) { - deviceList.Add($"Device_{Guid.NewGuid()}"); + focusAddressDataLista.Add(item.FocusAddress); } - // 初始化缓存 - DeviceGroupBalanceControl.InitializeCache(deviceList); - + DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista); + // 打印分布统计 DeviceGroupBalanceControl.PrintDistributionStats(); await Task.CompletedTask; } + /// + /// 测试设备分组均衡控制算法获取分组Id + /// + /// + /// + [HttpGet] + public async Task TestGetDeviceGroupBalanceControl(string deviceAddress) + { + var groupId = DeviceGroupBalanceControl.GetDeviceGroupId(deviceAddress); + Console.WriteLine(groupId); + + await Task.CompletedTask; + } + /// /// 测试单个测点数据项 diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 8b19e49..2e5e1fb 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -1,15 +1,20 @@ -using DotNetCore.CAP; +using Confluent.Kafka; +using DeviceDetectorNET.Class.Client; +using DotNetCore.CAP; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; +using JiShe.CollectBus.EnergySystems.Entities; using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.IoTDBProvider; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; +using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Repository.MeterReadingRecord; using JiShe.CollectBus.Serializer; @@ -18,6 +23,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; +using System.Threading; using System.Threading.Tasks; namespace JiShe.CollectBus.ScheduledMeterReading @@ -31,17 +37,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading private readonly ICapPublisher _producerBus; private readonly IIoTDBProvider _dbProvider; private readonly IMeterReadingRecordRepository _meterReadingRecordRepository; + private readonly IProducerService _producerService; + public BasicScheduledMeterReadingService( ILogger logger, ICapPublisher producerBus, IMeterReadingRecordRepository meterReadingRecordRepository, + IProducerService producerService, IIoTDBProvider dbProvider) { _producerBus = producerBus; _logger = logger; _dbProvider = dbProvider; _meterReadingRecordRepository = meterReadingRecordRepository; + _producerService = producerService; } /// @@ -96,20 +106,26 @@ namespace JiShe.CollectBus.ScheduledMeterReading continue; } - //检查任务时间节点,由于定时任务10秒钟运行一次,需要判定当前时间是否在任务时间节点内,不在则跳过 - if (!IsGennerateCmd(tasksToBeIssueModel.NextTaskTime)) + //item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBus,tempArryay[1]=>SystemTypeConst,tempArryay[2]=>TaskInfo,tempArryay[3]=>表计类别,tempArryay[4]=>采集频率 + var tempArryay = item.Split(":"); + string meteryType = tempArryay[4];//表计类别 + int timeDensity = Convert.ToInt32(tempArryay[5]);//采集频率 + if (timeDensity > 15) { - _logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103"); + timeDensity = 15; + } + + //检查任务时间节点,由于定时任务10秒钟运行一次,需要判定当前时间是否在任务时间节点内,不在则跳过 + if (!IsTaskTime(tasksToBeIssueModel.NextTaskTime, timeDensity)) + { + _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103"); continue; } - //item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBus,tempArryay[1]=>SystemTypeConst,tempArryay[2]=>TaskInfo,tempArryay[3]=>表计类别,tempArryay[4]=>采集频率 - var tempArryay = item.Split(":"); - string meteryType = tempArryay[3];//表计类别 - int timeDensity = Convert.ToInt32(tempArryay[4]);//采集频率 - //获取缓存中的电表信息 - var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, meteryType, timeDensity)}*"; + + //获取缓存中的表信息 + var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, meteryType, timeDensity)}*"; var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) { @@ -117,16 +133,45 @@ namespace JiShe.CollectBus.ScheduledMeterReading return; } + var meterTypes = EnumExtensions.ToEnumDictionary(); + if (meteryType == MeterTypeEnum.Ammeter.ToString()) { // 解析结果(结果为嵌套数组) - var meterInfos = await GetMeterRedisCacheData(oneMinutekeyList, $"{timeDensity}", meteryType); + var meterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, $"{timeDensity}", meterTypes[meteryType]); if (meterInfos == null || meterInfos.Count <= 0) { _logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); return; } - await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos); + //await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos); + + var timer = Stopwatch.StartNew(); + + //处理数据 + //await DeviceGroupBalanceControl.ProcessGenericListAsync( + // items: meterInfos, + // deviceIdSelector: data => data.FocusAddress, + // processor: (data, threadId) => + // { + // _ = AmmerterCreatePublishTask(timeDensity, data); + // } + //); + + + + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + items: meterInfos, + deviceIdSelector: data => data.FocusAddress, + processor: data => + { + _ = AmmerterCreatePublishTask(timeDensity, data); + } + ); + + timer.Stop(); + _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},{oneMinutekeyList.Length}"); + } else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) { @@ -140,6 +185,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成"); + + //根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。 tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.AddMinutes(timeDensity); await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel); @@ -165,7 +212,26 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task InitAmmeterCacheData(string gatherCode = "") { +#if DEBUG + var timeDensity = "15"; + //获取缓存中的电表信息 + var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}*"; + + var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); + var meterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter); + List focusAddressDataLista = new List(); + foreach (var item in meterInfos) + { + focusAddressDataLista.Add(item.FocusAddress); + } + + DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista); + return; +#else var meterInfos = await GetAmmeterInfoList(gatherCode); +#endif + + if (meterInfos == null || meterInfos.Count <= 0) { throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空"); @@ -299,7 +365,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(oneMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString()); + Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(oneMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); @@ -363,7 +429,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.Ammeter).ToString()); + Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(fiveMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); @@ -427,7 +493,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString()); + Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); @@ -460,12 +526,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime); } - - ////删除任务数据 - //await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList); - - ////缓存下一个时间的任务 - //await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter); + stopwatch.Stop(); @@ -532,7 +593,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading //根据分组创建线程批处理集中器 foreach (var group in focusHashGroups) { - await AmmerterCreatePublishTask(timeDensity, group.Value); + await AmmerterCreatePublishTask2(timeDensity, group.Value); } } catch (Exception) @@ -542,13 +603,274 @@ namespace JiShe.CollectBus.ScheduledMeterReading } } + + /// + /// 电表创建发布任务 + /// + /// 采集频率 + /// 集中器号hash分组的集中器集合数据 + /// + private async Task AmmerterCreatePublishTask(int timeDensity + , AmmeterInfo ammeterInfo) + { + var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; + //todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿? + + var currentTime = DateTime.Now; + var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); + //构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型 + var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{ammeterInfo.FocusAddress}"; + + if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) + { + // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); + return; + } + + //载波的不处理 + if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave) + { + //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102"); + return; + } + + if (ammeterInfo.State.Equals(2)) + { + //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理"); + return; + } + + ////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器 + //if (!IsGennerateCmd(ammeter.LastTime, -1)) + //{ + // _logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name},采集时间:{ammeter.LastTime},已超过1天未在线,不生成指令"); + // continue; + //} + + if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode)) + { + // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空"); + return; + } + if (string.IsNullOrWhiteSpace(ammeterInfo.Address)) + { + //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空"); + return; + } + if (Convert.ToInt32(ammeterInfo.Address) > 65535) + { + //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535"); + return; + } + if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33) + { + //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})"); + return; + } + + List tempCodes = ammeterInfo.ItemCodes.Deserialize>()!; + + //TODO:自动上报数据只主动采集1类数据。 + if (ammeterInfo.AutomaticReport.Equals(1)) + { + var tempSubCodes = new List(); + if (tempCodes.Contains("0C_49")) + { + tempSubCodes.Add("0C_49"); + } + + if (tempSubCodes.Contains("0C_149")) + { + tempSubCodes.Add("0C_149"); + } + + if (ammeterInfo.ItemCodes.Contains("10_97")) + { + tempSubCodes.Add("10_97"); + } + + if (tempSubCodes == null || tempSubCodes.Count <= 0) + { + //_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空"); + return; + } + else + { + tempCodes = tempSubCodes; + } + } + + Dictionary keyValuePairs = new Dictionary(); + + foreach (var tempItem in tempCodes) + { + //排除已发送日冻结和月冻结采集项配置 + if (DayFreezeCodes.Contains(tempItem)) + { + continue; + } + + if (MonthFreezeCodes.Contains(tempItem)) + { + continue; + } + + var itemCodeArr = tempItem.Split('_'); + var aFNStr = itemCodeArr[0]; + var aFN = (AFN)aFNStr.HexToDec(); + var fn = int.Parse(itemCodeArr[1]); + byte[] dataInfos = null; + if (ammeterInfo.AutomaticReport.Equals(1) && aFN == AFN.请求实时数据) + { + //实时数据 + dataInfos = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(ammeterInfo.FocusAddress, ammeterInfo.MeteringCode, (ATypeOfDataItems)fn); + } + else + { + string methonCode = $"AFN{aFNStr}_Fn_Send"; + //特殊表暂不处理 + if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode + , out var handler)) + { + dataInfos = handler(new TelemetryPacketRequest() + { + FocusAddress = ammeterInfo.FocusAddress, + Fn = fn, + Pn = ammeterInfo.MeteringCode + }); + } + else + { + //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。"); + continue; + } + } + //TODO:特殊表 + + if (dataInfos == null || dataInfos.Length <= 0) + { + //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。"); + continue; + } + + + + var meterReadingRecords = new MeterReadingRecords() + { + ProjectID = ammeterInfo.ProjectID, + DatabaseBusiID = ammeterInfo.DatabaseBusiID, + PendingCopyReadTime = pendingCopyReadTime, + CreationTime = currentTime, + MeterAddress = ammeterInfo.AmmerterAddress, + MeterId = ammeterInfo.ID, + MeterType = MeterTypeEnum.Ammeter, + FocusAddress = ammeterInfo.FocusAddress, + FocusID = ammeterInfo.FocusID, + AFN = aFN, + Fn = fn, + ItemCode = tempItem, + TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode), + ManualOrNot = false, + Pn = ammeterInfo.MeteringCode, + IssuedMessageId = GuidGenerator.Create().ToString(), + IssuedMessageHexString = Convert.ToHexString(dataInfos), + }; + meterReadingRecords.CreateDataId(GuidGenerator.Create()); + + keyValuePairs.TryAdd($"{ammeterInfo.ID}_{tempItem}", meterReadingRecords); + } + //TimeSpan timeSpan = TimeSpan.FromMicroseconds(5); + //await Task.Delay(timeSpan); + + //return keyValuePairs; + // await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); + + using (var pipe = FreeRedisProvider.Instance.StartPipe()) + { + pipe.HSet(redisCacheKey, keyValuePairs); + object[] ret = pipe.EndPipe(); + } + + + await Task.CompletedTask; + } + + /// + /// Kafka 推送消息 + /// + /// 主题名称 + /// 任务记录 + /// + private async Task KafkaProducerIssuedMessage(string topicName, + MeterReadingRecords taskRecord) + { + if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null) + { + throw new Exception($"{nameof(KafkaProducerIssuedMessage)} 推送消息失败,参数异常,-101"); + } + int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress); + TopicPartition topicPartition = new TopicPartition(topicName, partition); + await _producerService.ProduceAsync(topicPartition, null, taskRecord); + } + + private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType) + { + var currentDateTime = DateTime.Now; + + var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, meterType); + + //FreeRedisProvider.Instance.key() + + var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); + if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0) + { + _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101"); + return; + } + + //获取下发任务缓存数据 + Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), meterType); + if (meterTaskInfos == null || meterTaskInfos.Count <= 0) + { + _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); + return; + } + + List meterTaskInfosList = new List(); + + //将取出的缓存任务数据发送到Kafka消息队列中 + foreach (var focusItem in meterTaskInfos) + { + foreach (var ammerterItem in focusItem.Value) + { + var tempMsg = new ScheduledMeterReadingIssuedEventMessage() + { + MessageHexString = ammerterItem.Value.IssuedMessageHexString, + MessageId = ammerterItem.Value.IssuedMessageId, + FocusAddress = ammerterItem.Value.FocusAddress, + TimeDensity = timeDensity.ToString(), + }; + + _ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); + + //_ = _producerBus.Publish(tempMsg); + + meterTaskInfosList.Add(ammerterItem.Value); + } + } + if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) + { + await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime); + } + } + /// /// 电表创建发布任务 /// /// 采集频率 /// 集中器号hash分组的集中器集合数据 /// - private async Task AmmerterCreatePublishTask(int timeDensity + private async Task AmmerterCreatePublishTask2(int timeDensity , Dictionary> focusGroup) { var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; @@ -713,7 +1035,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading AFN = aFN, Fn = fn, ItemCode = tempItem, - TaskMark = CommonHelper.GetTaskMark((int)aFN, fn,ammeter.MeteringCode), + TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeter.MeteringCode), ManualOrNot = false, Pn = ammeter.MeteringCode, IssuedMessageId = GuidGenerator.Create().ToString(), @@ -799,10 +1121,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading } /// - /// 1分钟采集水表数据 + /// 水表数据采集 /// /// - public virtual async Task WatermeterScheduledMeterOneMinuteReading() + public virtual async Task WatermeterScheduledMeterAutoReading() { //获取缓存中的水表信息 int timeDensity = 1; @@ -810,15 +1132,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) { - _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101"); + _logger.LogError($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101"); return; } //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString()); + Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(oneMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.WaterMeter); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { - _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102"); + _logger.LogError($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102"); return; } @@ -836,7 +1158,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; - await _producerBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); + await _producerBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempMsg); //_ = _producerBus.Publish(tempMsg); @@ -856,211 +1178,44 @@ namespace JiShe.CollectBus.ScheduledMeterReading //await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter); - _logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理完成"); + _logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成"); } - /// - /// 5分钟采集水表数据 - /// - /// - public virtual async Task WatermeterScheduledMeterFiveMinuteReading() - { - - //获取缓存中的电表信息 - int timeDensity = 5; - var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.WaterMeter); - var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0) - { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101"); - return; - } - - //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString()); - if (meterTaskInfos == null || meterTaskInfos.Count <= 0) - { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102"); - return; - } - - List meterTaskInfosList = new List(); - - //将取出的缓存任务数据发送到Kafka消息队列中 - foreach (var focusItem in meterTaskInfos) - { - foreach (var ammerterItem in focusItem.Value) - { - var tempMsg = new ScheduledMeterReadingIssuedEventMessage() - { - MessageHexString = ammerterItem.Value.IssuedMessageHexString, - MessageId = ammerterItem.Value.IssuedMessageId, - FocusAddress = ammerterItem.Value.FocusAddress, - TimeDensity = timeDensity.ToString(), - }; - await _producerBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); - - //_ = _producerBus.Publish(tempMsg); - - - meterTaskInfosList.Add(ammerterItem.Value); - } - } - if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) - { - await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList); - } - - ////删除任务数据 - //await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList); - - ////缓存下一个时间的任务 - //await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter); - - - _logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集水表数据处理完成"); - } - - /// - /// 15分钟采集水表数据 - /// - /// - public virtual async Task WatermeterScheduledMeterFifteenMinuteReading() - { - //获取缓存中的电表信息 - int timeDensity = 15; - var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.WaterMeter); - var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0) - { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101"); - return; - } - - //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.WaterMeter.ToString()); - if (meterTaskInfos == null || meterTaskInfos.Count <= 0) - { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102"); - return; - } - - List meterTaskInfosList = new List(); - - //将取出的缓存任务数据发送到Kafka消息队列中 - foreach (var focusItem in meterTaskInfos) - { - foreach (var ammerterItem in focusItem.Value) - { - var tempMsg = new ScheduledMeterReadingIssuedEventMessage() - { - MessageHexString = ammerterItem.Value.IssuedMessageHexString, - MessageId = ammerterItem.Value.IssuedMessageId, - FocusAddress = ammerterItem.Value.FocusAddress, - TimeDensity = timeDensity.ToString(), - }; - //await _producerBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); - - //_ = _producerBus.Publish(tempMsg); - - - meterTaskInfosList.Add(ammerterItem.Value); - } - } - if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) - { - await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList); - } - - ////删除任务数据 - //await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList); - - ////缓存下一个时间的任务 - //await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter); - - - _logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集水表数据处理完成"); - } #endregion #region 公共处理方法 - /// - /// Lua脚本批量获取缓存的表计信息 - /// - /// 表信息数据对象 - /// 采集频率对应的缓存Key集合 - /// 采集频率,1分钟、5分钟、15分钟 - /// 表计类型 - /// - private async Task>> GetMeterRedisCacheData(string[] redisKeys, string timeDensity, string meterType) where T : class - { - //通过lua脚本一次性获取所有缓存内容 - var luaScript = @" - local results = {} - for i, key in ipairs(KEYS) do - local data = redis.call('HGETALL', key) - results[i] = {key, data} - end - return results"; - var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, redisKeys); //传递 KEYS - if (merterResult == null) - { - _logger.LogError($"{nameof(GetMeterRedisCacheData)} 定时任务采集表数据处理时没有获取到缓存信息,-102"); - return null; - } - // 解析结果(结果为嵌套数组) - var meterInfos = new Dictionary>(); ; - if (merterResult is object[] arr) - { - foreach (object[] item in arr) - { - string key = (string)item[0];//集中器地址对应的Redis缓存Key - object[] fieldsAndValues = (object[])item[1];//缓存Key对应的Hash表数据集合 - var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, meterType, timeDensity)}"; - string focusAddress = key.Replace(redisCacheKey, "");//集中器地址 - - var meterHashs = new Dictionary(); - for (int i = 0; i < fieldsAndValues.Length; i += 2) - { - string meterld = (string)fieldsAndValues[i];//表ID - string meterStr = (string)fieldsAndValues[i + 1];//表详情数据 - - T meterInfo = default!; - if (!string.IsNullOrWhiteSpace(meterStr)) - { - meterInfo = meterStr.Deserialize()!; - } - if (meterInfo != null) - { - meterHashs[meterld] = meterInfo; - } - else - { - _logger.LogInformation($"{nameof(GetMeterRedisCacheData)} 定时任务采集表数据处理时集中器缓存{key}数据的{meterld}处理异常"); - } - } - meterInfos[focusAddress] = meterHashs; - } - } - - return meterInfos; - } /// - /// 指定时间对比当前时间 + /// 判断是否需要生成采集指令 /// - /// - /// + /// + /// /// - private bool IsGennerateCmd(DateTime lastTime, int subtrahend = 0) + private bool IsTaskTime(DateTime nextTaskTime, int timeDensity = 0) { - if (DateTime.Now.AddDays(subtrahend) >= lastTime)//当前时间减去一天,大于等于最后在线时间,不再生成该集中器下表生成采集指令 - return false; - return true; + if (DateTime.Now.AddMinutes(timeDensity) >= nextTaskTime) + { + return true; + } + + return false; } + ///// + ///// 指定时间对比当前时间 + ///// + ///// + ///// + ///// + //private bool IsGennerateCmd(DateTime lastTime, int subtrahend = 0) + //{ + // if (DateTime.Now.AddDays(subtrahend) >= lastTime)//当前时间减去一天,大于等于最后在线时间,不再生成该集中器下表生成采集指令 + // return false; + // return true; + //} + ///// ///// 缓存下一个时间的任务 ///// @@ -1091,6 +1246,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { return $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, meterType, timeDensity)}*"; } + #endregion } diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 12453bc..32a3b9e 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -1,9 +1,11 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; +using Confluent.Kafka; using DotNetCore.CAP; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.GatherItem; @@ -12,6 +14,7 @@ using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; +using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Repository; using JiShe.CollectBus.Repository.MeterReadingRecord; using MassTransit; @@ -33,7 +36,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { string serverTagName = string.Empty; public EnergySystemScheduledMeterReadingService(ILogger logger, - ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration) : base(logger, producerBus, meterReadingRecordRepository, dbProvider) + ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, producerBus, meterReadingRecordRepository, producerService,dbProvider) { serverTagName = configuration.GetValue(CommonConst.ServerTagName)!; } @@ -71,37 +74,37 @@ namespace JiShe.CollectBus.ScheduledMeterReading public override async Task> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890") { - List ammeterInfos = new List(); - ammeterInfos.Add(new AmmeterInfo() - { - Baudrate = 2400, - FocusAddress = "402440506", - Name = "张家祠工务(三相电表)", - FocusID = 95780, - DatabaseBusiID = 1, - MeteringCode = 1, - AmmerterAddress = "402410040506", - ID = 127035, - TypeName = 3, - DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679", - TimeDensity = 15, - }); - ammeterInfos.Add(new AmmeterInfo() - { - Baudrate = 2400, - FocusAddress = "542400504", - Name = "五号配(长芦二所四排)(单相电表)", - FocusID = 69280, - DatabaseBusiID = 1, - MeteringCode = 2, - AmmerterAddress = "542410000504", - ID = 95594, - TypeName = 1, - DataTypes = "581,589,592,597,601", - TimeDensity = 15, - }); + //List ammeterInfos = new List(); + //ammeterInfos.Add(new AmmeterInfo() + //{ + // Baudrate = 2400, + // FocusAddress = "402440506", + // Name = "张家祠工务(三相电表)", + // FocusID = 95780, + // DatabaseBusiID = 1, + // MeteringCode = 1, + // AmmerterAddress = "402410040506", + // ID = 127035, + // TypeName = 3, + // DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679", + // TimeDensity = 15, + //}); + //ammeterInfos.Add(new AmmeterInfo() + //{ + // Baudrate = 2400, + // FocusAddress = "542400504", + // Name = "五号配(长芦二所四排)(单相电表)", + // FocusID = 69280, + // DatabaseBusiID = 1, + // MeteringCode = 2, + // AmmerterAddress = "542410000504", + // ID = 95594, + // TypeName = 1, + // DataTypes = "581,589,592,597,601", + // TimeDensity = 15, + //}); - return ammeterInfos; + //return ammeterInfos; string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID FROM TB_GatherInfo(NOLOCK) AS A @@ -111,10 +114,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading WHERE 1=1 and C.Special = 0 "; //TODO 记得移除特殊表过滤 - if (!string.IsNullOrWhiteSpace(gatherCode)) - { - sql = $@"{sql} AND A.GatherCode = '{gatherCode}'"; - } + //if (!string.IsNullOrWhiteSpace(gatherCode)) + //{ + // sql = $@"{sql} AND A.GatherCode = '{gatherCode}'"; + //} return await SqlProvider.Instance.Change(DbEnum.EnergyDB) .Ado .QueryAsync(sql); @@ -187,6 +190,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading deviceList.Add($"Device_{Guid.NewGuid()}"); } + // 初始化缓存 + DeviceGroupBalanceControl.InitializeCache(deviceList); // 打印分布统计 DeviceGroupBalanceControl.PrintDistributionStats(); diff --git a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs index 4428fe4..3caba20 100644 --- a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs @@ -147,69 +147,16 @@ namespace JiShe.CollectBus.Subscribers #endregion #region 水表消息采集 + /// - /// 1分钟采集水表数据下行消息消费订阅 - /// - /// - /// - [HttpPost] - [Route("watermeter/oneminute/issued-event")] - [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName)] - public async Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) - { - _logger.LogInformation("1分钟采集水表数据下行消息消费队列开始处理"); - var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - if (protocolPlugin == null) - { - _logger.LogError("【1分钟采集水表数据下行消息消费队列开始处理】协议不存在!"); - } - else - { - var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress); - if (device != null) - { - await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); - - } - } - } - - /// - /// 5分钟采集水表数据下行消息消费订阅 - /// - /// - /// - [HttpPost] - [Route("watermeter/fiveminute/issued-event")] - [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerFiveMinuteIssuedEventName)] - public async Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) - { - _logger.LogInformation("5分钟采集水表数据下行消息消费队列开始处理"); - var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - if (protocolPlugin == null) - { - _logger.LogError("【5分钟采集水表数据下行消息消费队列开始处理】协议不存在!"); - } - else - { - var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress); - if (device != null) - { - await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); - - } - } - } - - /// - /// 15分钟采集水表数据下行消息消费订阅 + /// 水表数据下行消息消费订阅 /// /// /// [HttpPost] [Route("watermeter/fifteenminute/issued-event")] - [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerFifteenMinuteIssuedEventName)] - public async Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) + [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)] + public async Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) { _logger.LogInformation("15分钟采集水表数据下行消息消费队列开始处理"); var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); diff --git a/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs b/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs index 05fd90d..2857422 100644 --- a/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs +++ b/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs @@ -27,14 +27,14 @@ namespace JiShe.CollectBus.Workers { _logger = logger; RecurringJobId = nameof(CreateToBeIssueTaskWorker); - CronExpression = $"{10}/* * * * *"; + CronExpression = $"*/{1} * * * *"; this._scheduledMeterReadingService = scheduledMeterReadingService; } public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken()) { - await _scheduledMeterReadingService.CreateToBeIssueTasks(); + // await _scheduledMeterReadingService.CreateToBeIssueTasks(); } } } diff --git a/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs new file mode 100644 index 0000000..2575675 --- /dev/null +++ b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs @@ -0,0 +1,427 @@ +using JiShe.CollectBus.FreeRedisProvider; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Volo.Abp.DependencyInjection; + +namespace JiShe.CollectBus.Common.DeviceBalanceControl +{ + /// + /// 设备组负载控制 + /// + public class DeviceGroupBalanceControl + { + private static readonly object _syncRoot = new object(); + + private static volatile CacheState _currentCache; + + /// + /// 使用ConcurrentDictionary保证线程安全的设备分组映射 + /// + private sealed class CacheState + { + public readonly ConcurrentDictionary BalancedMapping; + public readonly List[] CachedGroups; + + public CacheState(int groupCount) + { + BalancedMapping = new ConcurrentDictionary(); + CachedGroups = new List[groupCount]; + for (int i = 0; i < groupCount; i++) + { + CachedGroups[i] = new List(); + } + } + } + + /// + /// 初始化或增量更新缓存 + /// + public static void InitializeCache(List deviceList, int groupCount = 30) + { + if (deviceList == null || deviceList.Count <= 0) + { + throw new ArgumentException($"{nameof(InitializeCache)} 设备分组初始化失败,设备数据为空"); + } + + if (groupCount > 60 || groupCount <= 0) + { + groupCount = 60; + } + + lock (_syncRoot) + { + // 首次初始化 + if (_currentCache == null) + { + var newCache = new CacheState(groupCount); + UpdateCacheWithDevices(newCache, deviceList, groupCount); + _currentCache = newCache; + } + // 后续增量更新 + else + { + if (_currentCache.CachedGroups.Length != groupCount) + { + throw new ArgumentException($"{nameof(InitializeCache)} 设备分组初始化完成以后,分组数量不能更改"); + } + + var clonedCache = CloneExistingCache(); + UpdateCacheWithDevices(clonedCache, deviceList, groupCount); + _currentCache = clonedCache; + } + } + } + + /// + /// 带锁的缓存克隆(写入时复制) + /// + private static CacheState CloneExistingCache() + { + var oldCache = _currentCache; + var newCache = new CacheState(oldCache.CachedGroups.Length); + + // 复制已有映射 + foreach (var kvp in oldCache.BalancedMapping) + { + newCache.BalancedMapping.TryAdd(kvp.Key, kvp.Value); + } + + // 复制分组数据 + for (int i = 0; i < oldCache.CachedGroups.Length; i++) + { + newCache.CachedGroups[i].AddRange(oldCache.CachedGroups[i]); + } + + return newCache; + } + + /// + /// 更新设备到缓存 + /// + private static void UpdateCacheWithDevices(CacheState cache, List deviceList, int groupCount) + { + foreach (var deviceId in deviceList) + { + // 原子操作:如果设备不存在则计算分组 + cache.BalancedMapping.GetOrAdd(deviceId, id => + { + int groupId = GetGroupId(id, groupCount); + lock (cache.CachedGroups[groupId]) + { + cache.CachedGroups[groupId].Add(id); + } + return groupId; + }); + } + } + + /// + /// 并行处理泛型数据集(支持动态线程分配) + /// + /// 已经分组的设备信息 + /// 部分或者全部的已经分组的设备集合 + /// 从泛型对象提取deviceId + /// 处理委托(参数:当前对象,线程ID) + /// 可选线程限制 + /// + /// + public static async Task ProcessGenericListAsync( + List items, Func deviceIdSelector, Action processor, int? maxThreads = null) + { + var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化"); + + // 创建分组任务队列 + var groupQueues = new ConcurrentQueue[cache.CachedGroups.Length]; + for (int i = 0; i < groupQueues.Length; i++) + { + groupQueues[i] = new ConcurrentQueue(); + } + + // 阶段1:分发数据到分组队列 + Parallel.ForEach(items, item => + { + var deviceId = deviceIdSelector(item); + if (cache.BalancedMapping.TryGetValue(deviceId, out int groupId)) + { + groupQueues[groupId].Enqueue(item); + } + }); + + if ((maxThreads.HasValue && maxThreads.Value > cache.CachedGroups.Length) || maxThreads.HasValue == false) + { + maxThreads = cache.CachedGroups.Length; + } + + // 阶段2:并行处理队列 + var options = new ParallelOptions + { + MaxDegreeOfParallelism = maxThreads.Value, + }; + + TimeSpan timeSpan = TimeSpan.FromMicroseconds(5); + await Task.Run(() => + { + Parallel.For(0, cache.CachedGroups.Length, options, async groupId => + { + var queue = groupQueues[groupId]; + while (queue.TryDequeue(out T item)) + { + await Task.Delay(timeSpan); + processor(item, Thread.CurrentThread.ManagedThreadId); + } + }); + }); + } + + + /// + /// 智能节流处理(CPU友好型) + /// + /// 已经分组的设备信息 + /// 部分或者全部的已经分组的设备集合 + /// 从泛型对象提取deviceId + /// 处理委托(参数:当前对象,线程ID) + /// 可选最佳并发度 + /// + /// + public static async Task ProcessWithThrottleAsync( + List items, + Func deviceIdSelector, + Action processor, + int? maxConcurrency = null) + { + var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化"); + + // 自动计算最佳并发度 + int recommendedThreads = CalculateOptimalThreadCount(); + if ((maxConcurrency.HasValue && maxConcurrency.Value > cache.CachedGroups.Length) || maxConcurrency.HasValue == false) + { + maxConcurrency = cache.CachedGroups.Length; + } + + int actualThreads = maxConcurrency ?? recommendedThreads; + + // 创建节流器 + using var throttler = new SemaphoreSlim(initialCount: actualThreads); + + // 使用LongRunning避免线程池饥饿 + var tasks = items.Select(async item => + { + await throttler.WaitAsync(); + try + { + var deviceId = deviceIdSelector(item); + if (cache.BalancedMapping.TryGetValue(deviceId, out int groupId)) + { + // 分组级处理(保持顺序性) + await ProcessItemAsync(item, processor, groupId); + } + } + finally + { + throttler.Release(); + } + }); + + await Task.WhenAll(tasks); + } + + /// + /// 自动计算最优线程数 + /// + private static int CalculateOptimalThreadCount() + { + int coreCount = Environment.ProcessorCount; + return Math.Min( + coreCount * 2, // 超线程优化 + _currentCache?.CachedGroups.Length ?? 60 + ); + } + + /// + /// 分组异步处理(带节流) + /// + private static async Task ProcessItemAsync(T item, Action processor, int groupId) + { + // 使用内存缓存降低CPU负载 + await Task.Yield(); // 立即释放当前线程 + + // 分组处理上下文 + var context = ExecutionContext.Capture(); + ThreadPool.QueueUserWorkItem(_ => + { + ExecutionContext.Run(context!, state => + { + processor(item); + }, null); + }); + } + + + /// + /// 通过 deviceId 获取所在的分组集合 + /// + public static List GetGroup(string deviceId) + { + var cache = _currentCache; + if (cache == null) + throw new InvalidOperationException("缓存未初始化"); + + return cache.CachedGroups[cache.BalancedMapping[deviceId]]; + } + + /// + /// 通过 deviceId 获取分组Id + /// + public static int GetDeviceGroupId(string deviceId) + { + var cache = _currentCache; + if (cache == null) + throw new InvalidOperationException("缓存未初始化"); + + return cache.BalancedMapping[deviceId]; + } + + + /// + /// 创建均衡映射表 + /// + /// 数据集合 + /// 分组数量 + /// 允许的最大偏差百分比 + /// + public static Dictionary CreateBalancedMapping(List deviceList, int groupCount, int maxDeviation = 5) + { + var mapping = new Dictionary(); + int targetPerGroup = deviceList.Count / groupCount; + int maxAllowed = (int)(targetPerGroup * (1 + maxDeviation / 100.0)); + + // 初始化分组计数器 + int[] groupCounters = new int[groupCount]; + + foreach (var deviceId in deviceList) + { + int preferredGroup = GetGroupId(deviceId, groupCount); + + // 如果首选分组未满,直接分配 + if (groupCounters[preferredGroup] < maxAllowed) + { + mapping[deviceId] = preferredGroup; + groupCounters[preferredGroup]++; + } + else + { + // 寻找当前最空闲的分组 + int fallbackGroup = Array.IndexOf(groupCounters, groupCounters.Min()); + mapping[deviceId] = fallbackGroup; + groupCounters[fallbackGroup]++; + } + } + + return mapping; + } + + /// + /// 分析分组分布 + /// + /// + /// + /// + public static Dictionary AnalyzeDistribution(List deviceList, int groupCount) + { + Dictionary distribution = new Dictionary(); + foreach (var deviceId in deviceList) + { + int groupId = GetGroupId(deviceId, groupCount); + distribution[groupId] = distribution.TryGetValue(groupId, out var count) ? count + 1 : 1; + } + return distribution; + } + + /// + /// 获取设备ID对应的分组ID + /// + /// + /// + /// + public static int GetGroupId(string deviceId, int groupCount) + { + int hash = Fnv1aHash(deviceId); + // 双重取模确保分布均匀 + return (hash % groupCount + groupCount) % groupCount; + } + + /// + /// FNV-1a哈希算法 + /// + /// + /// + public static int Fnv1aHash(string input) + { + const uint fnvPrime = 16777619; + const uint fnvOffsetBasis = 2166136261; + + uint hash = fnvOffsetBasis; + foreach (char c in input) + { + hash ^= (byte)c; + hash *= fnvPrime; + } + return (int)hash; + } + + /// + /// CRC16算法实现 + /// + /// + /// + public static ushort CRC16Hash(byte[] bytes) + { + ushort crc = 0xFFFF; + for (int i = 0; i < bytes.Length; i++) + { + crc ^= bytes[i]; + for (int j = 0; j < 8; j++) + { + if ((crc & 0x0001) == 1) + { + crc = (ushort)((crc >> 1) ^ 0xA001); + } + else + { + crc >>= 1; + } + } + } + return crc; + } + + /// + /// 打印分组统计数据 + /// + public static void PrintDistributionStats() + { + var cache = _currentCache; + if (cache == null) + { + Console.WriteLine("缓存未初始化"); + return; + } + + var stats = cache.CachedGroups + .Select((group, idx) => new { GroupId = idx, Count = group.Count }) + .OrderBy(x => x.GroupId); + + Console.WriteLine("分组数据量统计:"); + foreach (var stat in stats) + { + Console.WriteLine($"Group {stat.GroupId}: {stat.Count} 条数据"); + } + } + + } +} diff --git a/src/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs b/src/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs index 7734288..4e3fef9 100644 --- a/src/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs +++ b/src/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs @@ -181,5 +181,24 @@ namespace JiShe.CollectBus.Common.Extensions return $"{dateTime:yyyyMMddHH}"; #endif } + + /// + /// 获取当前时间毫秒级时间戳 + /// + /// + public static long GetCurrentTimeMillis() + { + return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + } + + /// + /// 将Unix时间戳转换为日期时间 + /// + /// + /// + public static DateTime FromUnixMillis(long millis) + { + return DateTimeOffset.FromUnixTimeMilliseconds(millis).DateTime; + } } } diff --git a/src/JiShe.CollectBus.Common/Extensions/EnumerableExtensions.cs b/src/JiShe.CollectBus.Common/Extensions/EnumerableExtensions.cs index 94e4ad1..80d2f23 100644 --- a/src/JiShe.CollectBus.Common/Extensions/EnumerableExtensions.cs +++ b/src/JiShe.CollectBus.Common/Extensions/EnumerableExtensions.cs @@ -64,5 +64,30 @@ namespace JiShe.CollectBus.Common.Extensions ? source.Where(predicate) : source; } + + /// + /// 分批 + /// + /// + /// + /// + /// + public static IEnumerable> Batch( + this IEnumerable source, + int batchSize) + { + var buffer = new List(batchSize); + foreach (var item in source) + { + buffer.Add(item); + if (buffer.Count == batchSize) + { + yield return buffer; + buffer = new List(batchSize); + } + } + if (buffer.Count > 0) + yield return buffer; + } } } diff --git a/src/JiShe.CollectBus.Common/Helpers/DeviceGroupBalanceControl.cs b/src/JiShe.CollectBus.Common/Helpers/DeviceGroupBalanceControl.cs deleted file mode 100644 index c48bad3..0000000 --- a/src/JiShe.CollectBus.Common/Helpers/DeviceGroupBalanceControl.cs +++ /dev/null @@ -1,177 +0,0 @@ -using JiShe.CollectBus.FreeRedisProvider; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Volo.Abp.DependencyInjection; - -namespace JiShe.CollectBus.Common.Helpers -{ - /// - /// 设备组负载控制 - /// - public class DeviceGroupBalanceControl - { - /// - /// 分组集合 - /// - private static List[] _cachedGroups; - - /// - /// 设备分组关系映射 - /// - private static Dictionary _balancedMapping; - - - /// - /// 初始化缓存并强制均衡 - /// - public static void InitializeCache(List deviceList,int groupCount = 50) - { - // 步骤1: 生成均衡映射表 - _balancedMapping = CreateBalancedMapping(deviceList, groupCount); - - // 步骤2: 根据映射表填充分组 - _cachedGroups = new List[groupCount]; - for (int i = 0; i < groupCount; i++) - { - _cachedGroups[i] = new List(capacity: deviceList.Count / groupCount + 1); - } - - foreach (var deviceId in deviceList) - { - int groupId = _balancedMapping[deviceId]; - _cachedGroups[groupId].Add(deviceId); - } - } - - /// - /// 通过 deviceId 获取所在的分组集合 - /// - public static List GetGroup(string deviceId) - { - if (_balancedMapping == null || _cachedGroups == null) - throw new InvalidOperationException("缓存未初始化"); - - int groupId = _balancedMapping[deviceId]; - return _cachedGroups[groupId]; - } - - /// - /// 通过 deviceId 获取分组Id - /// - public static int GetDeviceGroupId(string deviceId) - { - if (_balancedMapping == null || _cachedGroups == null) - throw new InvalidOperationException("缓存未初始化"); - - return _balancedMapping[deviceId]; - } - - - /// - /// 创建均衡映射表 - /// - /// 数据集合 - /// 分组数量 - /// 允许的最大偏差百分比 - /// - public static Dictionary CreateBalancedMapping(List deviceList, int groupCount, int maxDeviation = 5) - { - var mapping = new Dictionary(); - int targetPerGroup = deviceList.Count / groupCount; - int maxAllowed = (int)(targetPerGroup * (1 + maxDeviation / 100.0)); - - // 初始化分组计数器 - int[] groupCounters = new int[groupCount]; - - foreach (var deviceId in deviceList) - { - int preferredGroup = GetGroupId(deviceId, groupCount); - - // 如果首选分组未满,直接分配 - if (groupCounters[preferredGroup] < maxAllowed) - { - mapping[deviceId] = preferredGroup; - groupCounters[preferredGroup]++; - } - else - { - // 寻找当前最空闲的分组 - int fallbackGroup = Array.IndexOf(groupCounters, groupCounters.Min()); - mapping[deviceId] = fallbackGroup; - groupCounters[fallbackGroup]++; - } - } - - return mapping; - } - - /// - /// 分析分组分布 - /// - /// - /// - /// - public static Dictionary AnalyzeDistribution(List deviceList, int groupCount) - { - Dictionary distribution = new Dictionary(); - foreach (var deviceId in deviceList) - { - int groupId = GetGroupId(deviceId, groupCount); - distribution[groupId] = distribution.TryGetValue(groupId, out var count) ? count + 1 : 1; - } - return distribution; - } - - /// - /// 获取设备ID对应的分组ID - /// - /// - /// - /// - public static int GetGroupId(string deviceId, int groupCount) - { - int hash = Fnv1aHash(deviceId); - // 双重取模确保分布均匀 - return (hash % groupCount + groupCount) % groupCount; - } - - /// - /// FNV-1a哈希算法 - /// - /// - /// - public static int Fnv1aHash(string input) - { - const uint fnvPrime = 16777619; - const uint fnvOffsetBasis = 2166136261; - - uint hash = fnvOffsetBasis; - foreach (char c in input) - { - hash ^= (byte)c; - hash *= fnvPrime; - } - return (int)hash; - } - - /// - /// 打印分组统计数据 - /// - public static void PrintDistributionStats() - { - var stats = _cachedGroups - .Select((group, idx) => new { GroupId = idx, Count = group.Count }) - .OrderBy(x => x.GroupId); - - Console.WriteLine("分组数据量统计:"); - foreach (var stat in stats) - { - Console.WriteLine($"Group {stat.GroupId}: {stat.Count} 条数据"); - } - } - - } -} diff --git a/src/JiShe.CollectBus.FreeRedisProvider/Extensions/BusGlobalPagedResult.cs b/src/JiShe.CollectBus.FreeRedisProvider/Extensions/BusGlobalPagedResult.cs new file mode 100644 index 0000000..18cd089 --- /dev/null +++ b/src/JiShe.CollectBus.FreeRedisProvider/Extensions/BusGlobalPagedResult.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.PagedResult +{ + public class GlobalPagedResult + { + /// + /// 数据集合 + /// + public List Items { get; set; } + + /// + /// 是否有下一页 + /// + public bool HasNext { get; set; } + + /// + /// 下一页的分页索引 + /// + public long? NextScore { get; set; } + + /// + /// 下一页的分页索引 + /// + public string NextMember { get; set; } + } +} diff --git a/src/JiShe.CollectBus.FreeRedisProvider/BusJsonSerializer.cs b/src/JiShe.CollectBus.FreeRedisProvider/Extensions/BusJsonSerializer.cs similarity index 100% rename from src/JiShe.CollectBus.FreeRedisProvider/BusJsonSerializer.cs rename to src/JiShe.CollectBus.FreeRedisProvider/Extensions/BusJsonSerializer.cs diff --git a/src/JiShe.CollectBus.IoTDBProvider/Options/PagedResult.cs b/src/JiShe.CollectBus.FreeRedisProvider/Extensions/BusPagedResult.cs similarity index 55% rename from src/JiShe.CollectBus.IoTDBProvider/Options/PagedResult.cs rename to src/JiShe.CollectBus.FreeRedisProvider/Extensions/BusPagedResult.cs index b707355..3ee9950 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Options/PagedResult.cs +++ b/src/JiShe.CollectBus.FreeRedisProvider/Extensions/BusPagedResult.cs @@ -4,18 +4,28 @@ using System.Linq; using System.Text; using System.Threading.Tasks; -namespace JiShe.CollectBus.IoTDBProvider +namespace JiShe.CollectBus.PagedResult { /// /// 查询结果 /// /// - public class PagedResult + public class BusPagedResult { /// /// 总条数 /// - public int TotalCount { get; set; } + public long TotalCount { get; set; } + + /// + /// 当前页码 + /// + public int PageIndex { get; set; } + + /// + /// 每页条数 + /// + public int PageSize { get; set; } /// /// 数据集合 diff --git a/src/JiShe.CollectBus.FreeRedisProvider/Extensions/DeviceCacheBasicModel.cs b/src/JiShe.CollectBus.FreeRedisProvider/Extensions/DeviceCacheBasicModel.cs new file mode 100644 index 0000000..c561df2 --- /dev/null +++ b/src/JiShe.CollectBus.FreeRedisProvider/Extensions/DeviceCacheBasicModel.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.FreeRedisProvider +{ + /// + /// 设备缓存基础模型 + /// + public class DeviceCacheBasicModel + { + /// + /// 集中器Id + /// + public int FocusId { get; set; } + + /// + /// 表Id + /// + public int MeterId { get; set; } + } +} diff --git a/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs b/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs index 1a3b114..4bdbd6d 100644 --- a/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs +++ b/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs @@ -1,6 +1,7 @@ using FreeRedis; using JetBrains.Annotations; using JiShe.CollectBus.FreeRedisProvider.Options; +using JiShe.CollectBus.PagedResult; using JiShe.CollectBus.Serializer; using Microsoft.Extensions.Options; using System; @@ -31,7 +32,7 @@ namespace JiShe.CollectBus.FreeRedisProvider GetInstance(); } - public RedisClient Instance { get; set; } = new (string.Empty); + public RedisClient Instance { get; set; } = new(string.Empty); /// /// 获取 FreeRedis 客户端 @@ -39,7 +40,7 @@ namespace JiShe.CollectBus.FreeRedisProvider /// public IRedisClient GetInstance() { - + var connectionString = $"{_option.Configuration},defaultdatabase={_option.DefaultDB}"; Instance = new RedisClient(connectionString); Instance.Serialize = obj => BusJsonSerializer.Serialize(obj); @@ -47,5 +48,443 @@ namespace JiShe.CollectBus.FreeRedisProvider Instance.Notice += (s, e) => Trace.WriteLine(e.Log); return Instance; } + + + //public async Task AddMeterZSetCacheData(string redisCacheKey, string redisCacheIndexKey, decimal score, T data) + //{ + // if (score < 0 || data == null || string.IsNullOrWhiteSpace(redisCacheKey) || string.IsNullOrWhiteSpace(redisCacheIndexKey)) + // { + // throw new Exception($"{nameof(AddMeterZSetCacheData)} 参数异常,-101"); + // } + + // // 生成唯一member标识 + // var member = data.Serialize(); + + // // 计算score范围 + // decimal dataScore = (long)score << 32; + + // //// 事务操作 + // //using (var tran = FreeRedisProvider.Instance.Multi()) + // //{ + // // await tran.ZAddAsync(cacheKey, score,member); + // // await tran.SAddAsync($"cat_index:{categoryId}", member); + // // object[] ret = tran.Exec(); + // //} + + // using (var pipe = Instance.StartPipe()) + // { + // pipe.ZAdd(redisCacheKey, dataScore, member); + // pipe.SAdd(redisCacheIndexKey, member); + // object[] ret = pipe.EndPipe(); + // } + + // await Task.CompletedTask; + //} + + //public async Task> GetMeterZSetPagedData( + //string redisCacheKey, + //string redisCacheIndexKey, + //decimal score, + //int pageSize = 10, + //int pageIndex = 1) + //{ + // if (score < 0 || string.IsNullOrWhiteSpace(redisCacheKey) || string.IsNullOrWhiteSpace(redisCacheIndexKey)) + // { + // throw new Exception($"{nameof(GetMeterZSetPagedData)} 参数异常,-101"); + // } + + // // 计算score范围 + // decimal minScore = (long)score << 32; + // decimal maxScore = ((long)score + 1) << 32; + + // // 分页参数 + // int start = (pageIndex - 1) * pageSize; + + // // 查询主数据 + // var members = await Instance.ZRevRangeByScoreAsync( + // redisCacheKey, + // maxScore, + // minScore, + // offset: start, + // count: pageSize + // ); + + // if (members == null) + // { + // throw new Exception($"{nameof(GetMeterZSetPagedData)} 获取缓存的信息失败,第 {pageIndex + 1} 页数据未返回,-102"); + // } + + // // 查询总数 + // var total = await Instance.ZCountAsync(redisCacheKey, minScore, maxScore); + + // return new BusPagedResult + // { + // Items = members.Select(m => + // BusJsonSerializer.Deserialize(m)!).ToList(), + // TotalCount = total, + // PageIndex = pageIndex, + // PageSize = pageSize + // }; + //} + + ///// + ///// 删除数据示例 + ///// + ///// + ///// 分类 + ///// + ///// + ///// + //public async Task RemoveMeterZSetData( + //string redisCacheKey, + //string redisCacheIndexKey, + //T data) + //{ + + // // 查询需要删除的member + // var members = await Instance.SMembersAsync(redisCacheIndexKey); + // var target = members.FirstOrDefault(m => + // BusJsonSerializer.Deserialize(m) == data);//泛型此处该如何处理? + + // if (target != null) + // { + // using (var trans = Instance.Multi()) + // { + // trans.ZRem(redisCacheKey, target); + // trans.SRem(redisCacheIndexKey, target); + // trans.Exec(); + // } + // } + + // await Task.CompletedTask; + //} + + + public async Task AddMeterZSetCacheData( + string redisCacheKey, + string redisCacheIndexKey, + int categoryId, // 新增分类ID参数 + T data, + DateTimeOffset? timestamp = null) + { + // 参数校验增强 + if (data == null || string.IsNullOrWhiteSpace(redisCacheKey) + || string.IsNullOrWhiteSpace(redisCacheIndexKey)) + { + throw new ArgumentException("Invalid parameters"); + } + + // 生成唯一member标识(带数据指纹) + var member = $"{categoryId}:{Guid.NewGuid()}"; + var serializedData = data.Serialize(); + + // 计算组合score(分类ID + 时间戳) + var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow; + + long scoreValue = ((long)categoryId << 32) | (uint)actualTimestamp.Ticks; + + //全局索引写入 + long globalScore = actualTimestamp.ToUnixTimeMilliseconds(); + + // 使用事务保证原子性 + using (var trans = Instance.Multi()) + { + // 主数据存储Hash + trans.HSet(redisCacheKey, member, serializedData); + + // 排序索引使用ZSET + trans.ZAdd($"{redisCacheKey}_scores", scoreValue, member); + + // 分类索引 + trans.SAdd(redisCacheIndexKey, member); + + //全局索引 + trans.ZAdd("global_data_all", globalScore, member); + + var results = trans.Exec(); + + if (results == null || results.Length <= 0) + throw new Exception("Transaction failed"); + } + + await Task.CompletedTask; + } + + public async Task BatchAddMeterData( + string redisCacheKey, + string indexKey, + IEnumerable items) where T : DeviceCacheBasicModel + { + const int BATCH_SIZE = 1000; // 每批1000条 + var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2); + + //foreach (var batch in items.Batch(BATCH_SIZE)) + //{ + // await semaphore.WaitAsync(); + + // _ = Task.Run(async () => + // { + // using (var pipe = FreeRedisProvider.Instance.StartPipe()) + // { + // foreach (var item in batch) + // { + // var member = $"{item.CategoryId}:{Guid.NewGuid()}"; + // long score = ((long)item.CategoryId << 32) | (uint)item.Timestamp.Ticks; + + // // Hash主数据 + // pipe.HSet(redisCacheKey, member, item.Data.Serialize()); + + // // 分类索引 + // pipe.ZAdd($"{redisCacheKey}_scores", score, member); + + // // 全局索引 + // pipe.ZAdd("global_data_all", item.Timestamp.ToUnixTimeMilliseconds(), member); + + // // 分类快速索引 + // pipe.SAdd(indexKey, member); + // } + // pipe.EndPipe(); + // } + // semaphore.Release(); + // }); + //} + + await Task.CompletedTask; + } + + public async Task UpdateMeterData( + string redisCacheKey, + string oldCategoryIndexKey, + string newCategoryIndexKey, + string memberId, // 唯一标识(格式:"分类ID:GUID") + T newData, + int? newCategoryId = null, + DateTimeOffset? newTimestamp = null) + { + // 参数校验 + if (string.IsNullOrWhiteSpace(memberId)) + throw new ArgumentException("Invalid member ID"); + + var luaScript = @" + local mainKey = KEYS[1] + local scoreKey = KEYS[2] + local oldIndex = KEYS[3] + local newIndex = KEYS[4] + local member = ARGV[1] + local newData = ARGV[2] + local newScore = ARGV[3] + + -- 校验旧数据是否存在 + if redis.call('HEXISTS', mainKey, member) == 0 then + return 0 + end + + -- 更新主数据 + redis.call('HSET', mainKey, member, newData) + + -- 处理分类变更 + if newScore ~= '' then + -- 删除旧索引 + redis.call('SREM', oldIndex, member) + -- 更新排序分数 + redis.call('ZADD', scoreKey, newScore, member) + -- 添加新索引 + redis.call('SADD', newIndex, member) + end + + return 1 + "; + + // 计算新score(当分类或时间变化时) + long? newScoreValue = null; + if (newCategoryId.HasValue || newTimestamp.HasValue) + { + var parts = memberId.Split(':'); + var oldCategoryId = int.Parse(parts[0]); + + var actualCategoryId = newCategoryId ?? oldCategoryId; + var actualTimestamp = newTimestamp ?? DateTimeOffset.UtcNow; + + newScoreValue = ((long)actualCategoryId << 32) | (uint)actualTimestamp.Ticks; + } + + var result = await Instance.EvalAsync(luaScript, + new[] + { + redisCacheKey, + $"{redisCacheKey}_scores", + oldCategoryIndexKey, + newCategoryIndexKey + }, + new[] + { + memberId, + newData.Serialize(), + newScoreValue?.ToString() ?? "" + }); + + // 如果时间戳变化则更新全局索引 + if (newTimestamp.HasValue) + { + long newGlobalScore = newTimestamp.Value.ToUnixTimeMilliseconds(); + await Instance.ZAddAsync("global_data_all", newGlobalScore, memberId); + } + + if ((int)result == 0) + throw new KeyNotFoundException("指定数据不存在"); + } + + + public async Task> GetMeterZSetPagedData( + string redisCacheKey, + string redisCacheIndexKey, + int categoryId, + int pageSize = 10, + int pageIndex = 1, + bool descending = true) + { + // 计算score范围 + long minScore = (long)categoryId << 32; + long maxScore = ((long)categoryId + 1) << 32; + + // 分页参数计算 + int start = (pageIndex - 1) * pageSize; + + // 获取排序后的member列表 + var members = descending + ? await Instance.ZRevRangeByScoreAsync( + $"{redisCacheKey}_scores", + maxScore, + minScore, + start, + pageSize) + : await Instance.ZRangeByScoreAsync( + $"{redisCacheKey}_scores", + minScore, + maxScore, + start, + pageSize); + + // 批量获取实际数据 + var dataTasks = members.Select(m => + Instance.HGetAsync(redisCacheKey, m)).ToArray(); + await Task.WhenAll(dataTasks); + + // 总数统计优化 + var total = await Instance.ZCountAsync( + $"{redisCacheKey}_scores", + minScore, + maxScore); + + return new BusPagedResult + { + Items = dataTasks.Select(t => t.Result).ToList(), + TotalCount = total, + PageIndex = pageIndex, + PageSize = pageSize + }; + } + + + public async Task RemoveMeterZSetData( + string redisCacheKey, + string redisCacheIndexKey, + string uniqueId) // 改为基于唯一标识删除 + { + // 原子操作 + var luaScript = @" + local mainKey = KEYS[1] + local scoreKey = KEYS[2] + local indexKey = KEYS[3] + local member = ARGV[1] + + redis.call('HDEL', mainKey, member) + redis.call('ZREM', scoreKey, member) + redis.call('SREM', indexKey, member) + return 1 + "; + + var keys = new[] + { + redisCacheKey, + $"{redisCacheKey}_scores", + redisCacheIndexKey + }; + + var result = await Instance.EvalAsync(luaScript, + keys, + new[] { uniqueId }); + + if ((int)result != 1) + throw new Exception("删除操作失败"); + } + + public async Task> GetGlobalPagedData( + string redisCacheKey, + int pageSize = 10, + long? lastScore = null, + string lastMember = null, + bool descending = true) + { + const string zsetKey = "global_data_all"; + + // 分页参数处理 + var (startScore, excludeMember) = descending + ? (lastScore ?? long.MaxValue, lastMember) + : (lastScore ?? 0, lastMember); + + // 获取成员列表 + string[] members; + if (descending) + { + members = await Instance.ZRevRangeByScoreAsync( + zsetKey, + max: startScore, + min: 0, + offset: 0, + count: pageSize + 1); + } + else + { + members = await Instance.ZRangeByScoreAsync( + zsetKey, + min: startScore, + max: long.MaxValue, + offset: 0, + count: pageSize + 1); + } + + // 处理分页结果 + bool hasNext = members.Length > pageSize; + var actualMembers = members.Take(pageSize).ToArray(); + + // 批量获取数据(优化版本) + var dataTasks = actualMembers + .Select(m => Instance.HGetAsync(redisCacheKey, m)) + .ToArray(); + await Task.WhenAll(dataTasks); + + // 获取下一页游标 + (long? nextScore, string nextMember) = actualMembers.Any() + ? await GetNextCursor(zsetKey, actualMembers.Last(), descending) + : (null, null); + + return new GlobalPagedResult + { + Items = dataTasks.Select(t => t.Result).ToList(), + HasNext = hasNext, + NextScore = nextScore, + NextMember = nextMember + }; + } + + private async Task<(long? score, string member)> GetNextCursor( + string zsetKey, + string lastMember, + bool descending) + { + var score = await Instance.ZScoreAsync(zsetKey, lastMember); + return (score.HasValue ? (long)score.Value : null, lastMember); + } } } \ No newline at end of file diff --git a/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs b/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs index 0aad03e..292549a 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs @@ -1,4 +1,5 @@ -using System; +using JiShe.CollectBus.PagedResult; +using System; using System.Collections.Generic; using System.Linq; using System.Text; @@ -48,6 +49,6 @@ namespace JiShe.CollectBus.IoTDBProvider /// /// /// - Task> QueryAsync(QueryOptions options) where T : IoTEntity, new(); + Task> QueryAsync(QueryOptions options) where T : IoTEntity, new(); } } diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs index 705b2c1..6ae0ef1 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs @@ -1,22 +1,14 @@ using Apache.IoTDB; using Apache.IoTDB.DataStructure; using JiShe.CollectBus.Common.Extensions; -using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.IoTDBProvider.Context; using JiShe.CollectBus.IoTDBProvider.Interface; -using JiShe.CollectBus.IoTDBProvider.Provider; -using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using System; -using System.Collections; using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; using System.Reflection; using System.Text; -using System.Threading.Tasks; -using static Thrift.Protocol.Utilities.TJSONProtocolConstants; +using JiShe.CollectBus.PagedResult; + namespace JiShe.CollectBus.IoTDBProvider { @@ -118,12 +110,12 @@ namespace JiShe.CollectBus.IoTDBProvider /// /// /// - public async Task> QueryAsync(QueryOptions options) where T : IoTEntity, new() + public async Task> QueryAsync(QueryOptions options) where T : IoTEntity, new() { var query = BuildQuerySQL(options); var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); - var result = new PagedResult + var result = new BusPagedResult { TotalCount = await GetTotalCount(options), Items = ParseResults(sessionDataSet, options.PageSize) diff --git a/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs b/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs index 74bef5f..26d028e 100644 --- a/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs @@ -70,6 +70,30 @@ namespace JiShe.CollectBus.Kafka.AdminClient return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic)); } + /// + /// 判断Kafka主题是否存在 + /// + /// 主题名称 + /// 副本数量,不能高于Brokers数量 + /// + public async Task CheckTopicAsync(string topic,int numPartitions) + { + var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(5)); + if(numPartitions > metadata.Brokers.Count) + { + throw new Exception($"{nameof(CheckTopicAsync)} 主题检查时,副本数量大于了节点数量。") ; + } + + return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic)); + } + + //// + /// 创建Kafka主题 + /// + /// 主题名称 + /// 主题分区数量 + /// 副本数量,不能高于Brokers数量 + /// public async Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor) { @@ -97,17 +121,31 @@ namespace JiShe.CollectBus.Kafka.AdminClient } } + /// + /// 删除Kafka主题 + /// + /// + /// public async Task DeleteTopicAsync(string topic) { await Instance.DeleteTopicsAsync(new[] { topic }); } + /// + /// 获取Kafka主题列表 + /// + /// public async Task> ListTopicsAsync() { var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10)); return new List(metadata.Topics.Select(t => t.Topic)); } + /// + /// 判断Kafka主题是否存在 + /// + /// + /// public async Task TopicExistsAsync(string topic) { var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10)); diff --git a/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs b/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs index 00e51b3..238a130 100644 --- a/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs @@ -8,9 +8,33 @@ namespace JiShe.CollectBus.Kafka.AdminClient { public interface IAdminClientService { + /// + /// 创建Kafka主题 + /// + /// 主题名称 + /// 主题分区数量 + /// 副本数量,不能高于Brokers数量 + /// Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor); + + /// + /// 删除Kafka主题 + /// + /// + /// Task DeleteTopicAsync(string topic); + + /// + /// 获取Kafka主题列表 + /// + /// Task> ListTopicsAsync(); + + /// + /// 判断Kafka主题是否存在 + /// + /// + /// Task TopicExistsAsync(string topic); /// diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs b/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs index 0812aae..fc52f31 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs @@ -49,7 +49,6 @@ namespace JiShe.CollectBus.Protocol.Contracts //动态上报主题,需根据协议的AFN功能码动态获取 var afnList = EnumExtensions.ToNameValueDictionary(); - //需要排除的AFN功能码 var excludeItems = new List() { 6, 7, 8,15 }; diff --git a/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs b/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs index e7caa1c..d1e5739 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs @@ -70,22 +70,9 @@ namespace JiShe.CollectBus.Protocol.Contracts #region 水表消息主题 /// - /// 1分钟采集水表数据下行消息主题 + /// 水表数据下行消息主题,由于水表采集频率不高,所以一个主题就够 /// - public const string WatermeterSubscriberWorkerOneMinuteIssuedEventName = "issued.auto.one.watermeter.event"; - /// - /// 5分钟采集水表数据下行消息主题 - /// - public const string WatermeterSubscriberWorkerFiveMinuteIssuedEventName = "issued.auto.five.watermeter.event"; - /// - /// 15分钟采集水表数据下行消息主题 - /// - public const string WatermeterSubscriberWorkerFifteenMinuteIssuedEventName = "issued.auto.fifteen.watermeter.event"; - - /// - /// 其他采集数据下行消息主题,日冻结,月冻结、集中器版本号等 - /// - public const string WatermeterSubscriberWorkerOtherIssuedEventName = "issued.auto.other.watermeter.event"; + public const string WatermeterSubscriberWorkerAutoReadingIssuedEventName = "issued.auto.reading.watermeter.event"; /// /// 水表自动阀控