diff --git a/src/JiShe.CollectBus.Application/CollectBusAppService.cs b/src/JiShe.CollectBus.Application/CollectBusAppService.cs index f84d82f..e1f913e 100644 --- a/src/JiShe.CollectBus.Application/CollectBusAppService.cs +++ b/src/JiShe.CollectBus.Application/CollectBusAppService.cs @@ -1,15 +1,20 @@ -using FreeRedis; +using Confluent.Kafka; +using FreeRedis; using FreeSql; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Extensions; +using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.FreeRedisProvider; using JiShe.CollectBus.FreeSql; +using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Localization; using JiShe.CollectBus.Serializer; using Microsoft.AspNetCore.Mvc; using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Volo.Abp.Application.Services; @@ -21,6 +26,7 @@ public abstract class CollectBusAppService : ApplicationService public IFreeSqlProvider SqlProvider => LazyServiceProvider.LazyGetRequiredService(); protected IFreeRedisProvider FreeRedisProvider => LazyServiceProvider.LazyGetService()!; + protected CollectBusAppService() { LocalizationResource = typeof(CollectBusResource); @@ -220,5 +226,5 @@ public abstract class CollectBusAppService : ApplicationService } return meterInfos; - } -} + } +} diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 914bfa1..2e5e1fb 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -14,6 +14,7 @@ using JiShe.CollectBus.IoTDBProvider; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; +using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Repository.MeterReadingRecord; using JiShe.CollectBus.Serializer; @@ -36,18 +37,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading private readonly ICapPublisher _producerBus; private readonly IIoTDBProvider _dbProvider; private readonly IMeterReadingRecordRepository _meterReadingRecordRepository; + private readonly IProducerService _producerService; public BasicScheduledMeterReadingService( ILogger logger, ICapPublisher producerBus, IMeterReadingRecordRepository meterReadingRecordRepository, + IProducerService producerService, IIoTDBProvider dbProvider) { _producerBus = producerBus; _logger = logger; _dbProvider = dbProvider; _meterReadingRecordRepository = meterReadingRecordRepository; + _producerService = producerService; } /// @@ -144,38 +148,27 @@ namespace JiShe.CollectBus.ScheduledMeterReading var timer = Stopwatch.StartNew(); - //处理数据 - List>> tempDatas = new List>>(); - await DeviceGroupBalanceControl.ProcessGenericListAsync( + //处理数据 + //await DeviceGroupBalanceControl.ProcessGenericListAsync( + // items: meterInfos, + // deviceIdSelector: data => data.FocusAddress, + // processor: (data, threadId) => + // { + // _ = AmmerterCreatePublishTask(timeDensity, data); + // } + //); + + + + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( items: meterInfos, deviceIdSelector: data => data.FocusAddress, - processor: (data, threadId) => + processor: data => { _ = AmmerterCreatePublishTask(timeDensity, data); - //var keyValuePairs = AmmerterCreatePublishTask(timeDensity, data); - //if (keyValuePairs != null && keyValuePairs.Keys.Count() > 0) - //{ - // //构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型 - // var redisDataCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{keyValuePairs.First().Value.FocusAddress}"; - // tempDatas.Add(Tuple.Create(redisDataCacheKey, keyValuePairs)); - // //tempDatas.Add(keyValuePairs); - //} } ); - //_logger.LogError("数据处理完成。"); - - //using (var pipe = FreeRedisProvider.Instance.StartPipe()) - //{ - // _logger.LogError("开始进入管道处理。"); - // foreach (var dataItem in tempDatas) - // { - // pipe.HSet(dataItem.Item1, dataItem.Item2); - // } - // object[] ret = pipe.EndPipe(); - //} - //_logger.LogError("管道处理完成。"); - timer.Stop(); _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},{oneMinutekeyList.Length}"); @@ -533,12 +526,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime); } - - ////删除任务数据 - //await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList); - - ////缓存下一个时间的任务 - //await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter); + stopwatch.Stop(); @@ -635,20 +623,20 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); + // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); return; } //载波的不处理 if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave) { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102"); + //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102"); return; } if (ammeterInfo.State.Equals(2)) { - _logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理"); + //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理"); return; } @@ -661,22 +649,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode)) { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空"); + // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空"); return; } if (string.IsNullOrWhiteSpace(ammeterInfo.Address)) { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空"); + //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空"); return; } if (Convert.ToInt32(ammeterInfo.Address) > 65535) { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535"); + //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535"); return; } if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33) { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})"); + //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})"); return; } @@ -703,7 +691,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (tempSubCodes == null || tempSubCodes.Count <= 0) { - _logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空"); + //_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空"); return; } else @@ -753,7 +741,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } else { - _logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。"); + //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。"); continue; } } @@ -761,7 +749,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (dataInfos == null || dataInfos.Length <= 0) { - _logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。"); + //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。"); continue; } @@ -802,6 +790,78 @@ namespace JiShe.CollectBus.ScheduledMeterReading pipe.HSet(redisCacheKey, keyValuePairs); object[] ret = pipe.EndPipe(); } + + + await Task.CompletedTask; + } + + /// + /// Kafka 推送消息 + /// + /// 主题名称 + /// 任务记录 + /// + private async Task KafkaProducerIssuedMessage(string topicName, + MeterReadingRecords taskRecord) + { + if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null) + { + throw new Exception($"{nameof(KafkaProducerIssuedMessage)} 推送消息失败,参数异常,-101"); + } + int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress); + TopicPartition topicPartition = new TopicPartition(topicName, partition); + await _producerService.ProduceAsync(topicPartition, null, taskRecord); + } + + private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType) + { + var currentDateTime = DateTime.Now; + + var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, meterType); + + //FreeRedisProvider.Instance.key() + + var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); + if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0) + { + _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101"); + return; + } + + //获取下发任务缓存数据 + Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), meterType); + if (meterTaskInfos == null || meterTaskInfos.Count <= 0) + { + _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); + return; + } + + List meterTaskInfosList = new List(); + + //将取出的缓存任务数据发送到Kafka消息队列中 + foreach (var focusItem in meterTaskInfos) + { + foreach (var ammerterItem in focusItem.Value) + { + var tempMsg = new ScheduledMeterReadingIssuedEventMessage() + { + MessageHexString = ammerterItem.Value.IssuedMessageHexString, + MessageId = ammerterItem.Value.IssuedMessageId, + FocusAddress = ammerterItem.Value.FocusAddress, + TimeDensity = timeDensity.ToString(), + }; + + _ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); + + //_ = _producerBus.Publish(tempMsg); + + meterTaskInfosList.Add(ammerterItem.Value); + } + } + if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) + { + await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime); + } } /// diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index cba4f41..32a3b9e 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; +using Confluent.Kafka; using DotNetCore.CAP; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Common.Consts; @@ -13,6 +14,7 @@ using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; +using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Repository; using JiShe.CollectBus.Repository.MeterReadingRecord; using MassTransit; @@ -34,7 +36,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { string serverTagName = string.Empty; public EnergySystemScheduledMeterReadingService(ILogger logger, - ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration) : base(logger, producerBus, meterReadingRecordRepository, dbProvider) + ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, producerBus, meterReadingRecordRepository, producerService,dbProvider) { serverTagName = configuration.GetValue(CommonConst.ServerTagName)!; } diff --git a/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs index e9ac9bc..2575675 100644 --- a/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs +++ b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs @@ -40,8 +40,18 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl /// /// 初始化或增量更新缓存 /// - public static void InitializeCache(List deviceList, int groupCount = 60) + public static void InitializeCache(List deviceList, int groupCount = 30) { + if (deviceList == null || deviceList.Count <= 0) + { + throw new ArgumentException($"{nameof(InitializeCache)} 设备分组初始化失败,设备数据为空"); + } + + if (groupCount > 60 || groupCount <= 0) + { + groupCount = 60; + } + lock (_syncRoot) { // 首次初始化 @@ -55,7 +65,9 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl else { if (_currentCache.CachedGroups.Length != groupCount) - throw new ArgumentException("Group count cannot change after initial initialization"); + { + throw new ArgumentException($"{nameof(InitializeCache)} 设备分组初始化完成以后,分组数量不能更改"); + } var clonedCache = CloneExistingCache(); UpdateCacheWithDevices(clonedCache, deviceList, groupCount); @@ -165,49 +177,90 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl }); } + /// - /// 并行处理所有分组设备(每个分组一个处理线程) + /// 智能节流处理(CPU友好型) /// - //public static void ProcessAllGroups(Action> processAction) where T : DeviceGroupBasicModel - //{ - // var cache = _currentCache; - // if (cache == null) - // throw new InvalidOperationException("缓存未初始化"); + /// 已经分组的设备信息 + /// 部分或者全部的已经分组的设备集合 + /// 从泛型对象提取deviceId + /// 处理委托(参数:当前对象,线程ID) + /// 可选最佳并发度 + /// + /// + public static async Task ProcessWithThrottleAsync( + List items, + Func deviceIdSelector, + Action processor, + int? maxConcurrency = null) + { + var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化"); - // // 使用并行选项控制并发度 - // var options = new ParallelOptions - // { - // MaxDegreeOfParallelism = cache.CachedGroups.Length // 严格匹配分组数量 - // }; + // 自动计算最佳并发度 + int recommendedThreads = CalculateOptimalThreadCount(); + if ((maxConcurrency.HasValue && maxConcurrency.Value > cache.CachedGroups.Length) || maxConcurrency.HasValue == false) + { + maxConcurrency = cache.CachedGroups.Length; + } - // Parallel.For(0, cache.CachedGroups.Length, options, groupId => - // { - // // 获取当前分组的只读副本 - // var groupDevices = GetGroupSnapshot(cache, groupId); + int actualThreads = maxConcurrency ?? recommendedThreads; - // processAction(groupDevices); + // 创建节流器 + using var throttler = new SemaphoreSlim(initialCount: actualThreads); - // //foreach (var deviceId in groupDevices) - // //{ - // // //执行处理操作 - // // processAction(deviceId); + // 使用LongRunning避免线程池饥饿 + var tasks = items.Select(async item => + { + await throttler.WaitAsync(); + try + { + var deviceId = deviceIdSelector(item); + if (cache.BalancedMapping.TryGetValue(deviceId, out int groupId)) + { + // 分组级处理(保持顺序性) + await ProcessItemAsync(item, processor, groupId); + } + } + finally + { + throttler.Release(); + } + }); - // // // 可添加取消检测 - // // // if (token.IsCancellationRequested) break; - // //} - // }); - //} + await Task.WhenAll(tasks); + } + + /// + /// 自动计算最优线程数 + /// + private static int CalculateOptimalThreadCount() + { + int coreCount = Environment.ProcessorCount; + return Math.Min( + coreCount * 2, // 超线程优化 + _currentCache?.CachedGroups.Length ?? 60 + ); + } + + /// + /// 分组异步处理(带节流) + /// + private static async Task ProcessItemAsync(T item, Action processor, int groupId) + { + // 使用内存缓存降低CPU负载 + await Task.Yield(); // 立即释放当前线程 + + // 分组处理上下文 + var context = ExecutionContext.Capture(); + ThreadPool.QueueUserWorkItem(_ => + { + ExecutionContext.Run(context!, state => + { + processor(item); + }, null); + }); + } - ///// - ///// 获取分组数据快照(线程安全) - ///// - //public static IReadOnlyList GetGroupSnapshot(CacheState cache, int groupId) - //{ - // lock (cache.CachedGroups[groupId]) - // { - // return cache.CachedGroups[groupId].ToList(); // 创建内存快照 - // } - //} /// /// 通过 deviceId 获取所在的分组集合 @@ -321,6 +374,32 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl return (int)hash; } + /// + /// CRC16算法实现 + /// + /// + /// + public static ushort CRC16Hash(byte[] bytes) + { + ushort crc = 0xFFFF; + for (int i = 0; i < bytes.Length; i++) + { + crc ^= bytes[i]; + for (int j = 0; j < 8; j++) + { + if ((crc & 0x0001) == 1) + { + crc = (ushort)((crc >> 1) ^ 0xA001); + } + else + { + crc >>= 1; + } + } + } + return crc; + } + /// /// 打印分组统计数据 /// diff --git a/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBasicModel.cs b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBasicModel.cs deleted file mode 100644 index f12f15e..0000000 --- a/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBasicModel.cs +++ /dev/null @@ -1,19 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace JiShe.CollectBus.Common.DeviceBalanceControl -{ - /// - /// 设备组基本模型 - /// - public class DeviceGroupBasicModel - { - /// - /// 设备Id - /// - public string DeviceId { get; set; } - } -} diff --git a/src/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs b/src/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs index 7734288..4e3fef9 100644 --- a/src/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs +++ b/src/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs @@ -181,5 +181,24 @@ namespace JiShe.CollectBus.Common.Extensions return $"{dateTime:yyyyMMddHH}"; #endif } + + /// + /// 获取当前时间毫秒级时间戳 + /// + /// + public static long GetCurrentTimeMillis() + { + return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + } + + /// + /// 将Unix时间戳转换为日期时间 + /// + /// + /// + public static DateTime FromUnixMillis(long millis) + { + return DateTimeOffset.FromUnixTimeMilliseconds(millis).DateTime; + } } } diff --git a/src/JiShe.CollectBus.Common/Extensions/EnumerableExtensions.cs b/src/JiShe.CollectBus.Common/Extensions/EnumerableExtensions.cs index 94e4ad1..80d2f23 100644 --- a/src/JiShe.CollectBus.Common/Extensions/EnumerableExtensions.cs +++ b/src/JiShe.CollectBus.Common/Extensions/EnumerableExtensions.cs @@ -64,5 +64,30 @@ namespace JiShe.CollectBus.Common.Extensions ? source.Where(predicate) : source; } + + /// + /// 分批 + /// + /// + /// + /// + /// + public static IEnumerable> Batch( + this IEnumerable source, + int batchSize) + { + var buffer = new List(batchSize); + foreach (var item in source) + { + buffer.Add(item); + if (buffer.Count == batchSize) + { + yield return buffer; + buffer = new List(batchSize); + } + } + if (buffer.Count > 0) + yield return buffer; + } } } diff --git a/src/JiShe.CollectBus.FreeRedisProvider/Extensions/BusGlobalPagedResult.cs b/src/JiShe.CollectBus.FreeRedisProvider/Extensions/BusGlobalPagedResult.cs new file mode 100644 index 0000000..18cd089 --- /dev/null +++ b/src/JiShe.CollectBus.FreeRedisProvider/Extensions/BusGlobalPagedResult.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.PagedResult +{ + public class GlobalPagedResult + { + /// + /// 数据集合 + /// + public List Items { get; set; } + + /// + /// 是否有下一页 + /// + public bool HasNext { get; set; } + + /// + /// 下一页的分页索引 + /// + public long? NextScore { get; set; } + + /// + /// 下一页的分页索引 + /// + public string NextMember { get; set; } + } +} diff --git a/src/JiShe.CollectBus.FreeRedisProvider/BusJsonSerializer.cs b/src/JiShe.CollectBus.FreeRedisProvider/Extensions/BusJsonSerializer.cs similarity index 100% rename from src/JiShe.CollectBus.FreeRedisProvider/BusJsonSerializer.cs rename to src/JiShe.CollectBus.FreeRedisProvider/Extensions/BusJsonSerializer.cs diff --git a/src/JiShe.CollectBus.IoTDBProvider/Options/PagedResult.cs b/src/JiShe.CollectBus.FreeRedisProvider/Extensions/BusPagedResult.cs similarity index 55% rename from src/JiShe.CollectBus.IoTDBProvider/Options/PagedResult.cs rename to src/JiShe.CollectBus.FreeRedisProvider/Extensions/BusPagedResult.cs index b707355..3ee9950 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Options/PagedResult.cs +++ b/src/JiShe.CollectBus.FreeRedisProvider/Extensions/BusPagedResult.cs @@ -4,18 +4,28 @@ using System.Linq; using System.Text; using System.Threading.Tasks; -namespace JiShe.CollectBus.IoTDBProvider +namespace JiShe.CollectBus.PagedResult { /// /// 查询结果 /// /// - public class PagedResult + public class BusPagedResult { /// /// 总条数 /// - public int TotalCount { get; set; } + public long TotalCount { get; set; } + + /// + /// 当前页码 + /// + public int PageIndex { get; set; } + + /// + /// 每页条数 + /// + public int PageSize { get; set; } /// /// 数据集合 diff --git a/src/JiShe.CollectBus.FreeRedisProvider/Extensions/DeviceCacheBasicModel.cs b/src/JiShe.CollectBus.FreeRedisProvider/Extensions/DeviceCacheBasicModel.cs new file mode 100644 index 0000000..c561df2 --- /dev/null +++ b/src/JiShe.CollectBus.FreeRedisProvider/Extensions/DeviceCacheBasicModel.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.FreeRedisProvider +{ + /// + /// 设备缓存基础模型 + /// + public class DeviceCacheBasicModel + { + /// + /// 集中器Id + /// + public int FocusId { get; set; } + + /// + /// 表Id + /// + public int MeterId { get; set; } + } +} diff --git a/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs b/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs index 1a3b114..4bdbd6d 100644 --- a/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs +++ b/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs @@ -1,6 +1,7 @@ using FreeRedis; using JetBrains.Annotations; using JiShe.CollectBus.FreeRedisProvider.Options; +using JiShe.CollectBus.PagedResult; using JiShe.CollectBus.Serializer; using Microsoft.Extensions.Options; using System; @@ -31,7 +32,7 @@ namespace JiShe.CollectBus.FreeRedisProvider GetInstance(); } - public RedisClient Instance { get; set; } = new (string.Empty); + public RedisClient Instance { get; set; } = new(string.Empty); /// /// 获取 FreeRedis 客户端 @@ -39,7 +40,7 @@ namespace JiShe.CollectBus.FreeRedisProvider /// public IRedisClient GetInstance() { - + var connectionString = $"{_option.Configuration},defaultdatabase={_option.DefaultDB}"; Instance = new RedisClient(connectionString); Instance.Serialize = obj => BusJsonSerializer.Serialize(obj); @@ -47,5 +48,443 @@ namespace JiShe.CollectBus.FreeRedisProvider Instance.Notice += (s, e) => Trace.WriteLine(e.Log); return Instance; } + + + //public async Task AddMeterZSetCacheData(string redisCacheKey, string redisCacheIndexKey, decimal score, T data) + //{ + // if (score < 0 || data == null || string.IsNullOrWhiteSpace(redisCacheKey) || string.IsNullOrWhiteSpace(redisCacheIndexKey)) + // { + // throw new Exception($"{nameof(AddMeterZSetCacheData)} 参数异常,-101"); + // } + + // // 生成唯一member标识 + // var member = data.Serialize(); + + // // 计算score范围 + // decimal dataScore = (long)score << 32; + + // //// 事务操作 + // //using (var tran = FreeRedisProvider.Instance.Multi()) + // //{ + // // await tran.ZAddAsync(cacheKey, score,member); + // // await tran.SAddAsync($"cat_index:{categoryId}", member); + // // object[] ret = tran.Exec(); + // //} + + // using (var pipe = Instance.StartPipe()) + // { + // pipe.ZAdd(redisCacheKey, dataScore, member); + // pipe.SAdd(redisCacheIndexKey, member); + // object[] ret = pipe.EndPipe(); + // } + + // await Task.CompletedTask; + //} + + //public async Task> GetMeterZSetPagedData( + //string redisCacheKey, + //string redisCacheIndexKey, + //decimal score, + //int pageSize = 10, + //int pageIndex = 1) + //{ + // if (score < 0 || string.IsNullOrWhiteSpace(redisCacheKey) || string.IsNullOrWhiteSpace(redisCacheIndexKey)) + // { + // throw new Exception($"{nameof(GetMeterZSetPagedData)} 参数异常,-101"); + // } + + // // 计算score范围 + // decimal minScore = (long)score << 32; + // decimal maxScore = ((long)score + 1) << 32; + + // // 分页参数 + // int start = (pageIndex - 1) * pageSize; + + // // 查询主数据 + // var members = await Instance.ZRevRangeByScoreAsync( + // redisCacheKey, + // maxScore, + // minScore, + // offset: start, + // count: pageSize + // ); + + // if (members == null) + // { + // throw new Exception($"{nameof(GetMeterZSetPagedData)} 获取缓存的信息失败,第 {pageIndex + 1} 页数据未返回,-102"); + // } + + // // 查询总数 + // var total = await Instance.ZCountAsync(redisCacheKey, minScore, maxScore); + + // return new BusPagedResult + // { + // Items = members.Select(m => + // BusJsonSerializer.Deserialize(m)!).ToList(), + // TotalCount = total, + // PageIndex = pageIndex, + // PageSize = pageSize + // }; + //} + + ///// + ///// 删除数据示例 + ///// + ///// + ///// 分类 + ///// + ///// + ///// + //public async Task RemoveMeterZSetData( + //string redisCacheKey, + //string redisCacheIndexKey, + //T data) + //{ + + // // 查询需要删除的member + // var members = await Instance.SMembersAsync(redisCacheIndexKey); + // var target = members.FirstOrDefault(m => + // BusJsonSerializer.Deserialize(m) == data);//泛型此处该如何处理? + + // if (target != null) + // { + // using (var trans = Instance.Multi()) + // { + // trans.ZRem(redisCacheKey, target); + // trans.SRem(redisCacheIndexKey, target); + // trans.Exec(); + // } + // } + + // await Task.CompletedTask; + //} + + + public async Task AddMeterZSetCacheData( + string redisCacheKey, + string redisCacheIndexKey, + int categoryId, // 新增分类ID参数 + T data, + DateTimeOffset? timestamp = null) + { + // 参数校验增强 + if (data == null || string.IsNullOrWhiteSpace(redisCacheKey) + || string.IsNullOrWhiteSpace(redisCacheIndexKey)) + { + throw new ArgumentException("Invalid parameters"); + } + + // 生成唯一member标识(带数据指纹) + var member = $"{categoryId}:{Guid.NewGuid()}"; + var serializedData = data.Serialize(); + + // 计算组合score(分类ID + 时间戳) + var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow; + + long scoreValue = ((long)categoryId << 32) | (uint)actualTimestamp.Ticks; + + //全局索引写入 + long globalScore = actualTimestamp.ToUnixTimeMilliseconds(); + + // 使用事务保证原子性 + using (var trans = Instance.Multi()) + { + // 主数据存储Hash + trans.HSet(redisCacheKey, member, serializedData); + + // 排序索引使用ZSET + trans.ZAdd($"{redisCacheKey}_scores", scoreValue, member); + + // 分类索引 + trans.SAdd(redisCacheIndexKey, member); + + //全局索引 + trans.ZAdd("global_data_all", globalScore, member); + + var results = trans.Exec(); + + if (results == null || results.Length <= 0) + throw new Exception("Transaction failed"); + } + + await Task.CompletedTask; + } + + public async Task BatchAddMeterData( + string redisCacheKey, + string indexKey, + IEnumerable items) where T : DeviceCacheBasicModel + { + const int BATCH_SIZE = 1000; // 每批1000条 + var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2); + + //foreach (var batch in items.Batch(BATCH_SIZE)) + //{ + // await semaphore.WaitAsync(); + + // _ = Task.Run(async () => + // { + // using (var pipe = FreeRedisProvider.Instance.StartPipe()) + // { + // foreach (var item in batch) + // { + // var member = $"{item.CategoryId}:{Guid.NewGuid()}"; + // long score = ((long)item.CategoryId << 32) | (uint)item.Timestamp.Ticks; + + // // Hash主数据 + // pipe.HSet(redisCacheKey, member, item.Data.Serialize()); + + // // 分类索引 + // pipe.ZAdd($"{redisCacheKey}_scores", score, member); + + // // 全局索引 + // pipe.ZAdd("global_data_all", item.Timestamp.ToUnixTimeMilliseconds(), member); + + // // 分类快速索引 + // pipe.SAdd(indexKey, member); + // } + // pipe.EndPipe(); + // } + // semaphore.Release(); + // }); + //} + + await Task.CompletedTask; + } + + public async Task UpdateMeterData( + string redisCacheKey, + string oldCategoryIndexKey, + string newCategoryIndexKey, + string memberId, // 唯一标识(格式:"分类ID:GUID") + T newData, + int? newCategoryId = null, + DateTimeOffset? newTimestamp = null) + { + // 参数校验 + if (string.IsNullOrWhiteSpace(memberId)) + throw new ArgumentException("Invalid member ID"); + + var luaScript = @" + local mainKey = KEYS[1] + local scoreKey = KEYS[2] + local oldIndex = KEYS[3] + local newIndex = KEYS[4] + local member = ARGV[1] + local newData = ARGV[2] + local newScore = ARGV[3] + + -- 校验旧数据是否存在 + if redis.call('HEXISTS', mainKey, member) == 0 then + return 0 + end + + -- 更新主数据 + redis.call('HSET', mainKey, member, newData) + + -- 处理分类变更 + if newScore ~= '' then + -- 删除旧索引 + redis.call('SREM', oldIndex, member) + -- 更新排序分数 + redis.call('ZADD', scoreKey, newScore, member) + -- 添加新索引 + redis.call('SADD', newIndex, member) + end + + return 1 + "; + + // 计算新score(当分类或时间变化时) + long? newScoreValue = null; + if (newCategoryId.HasValue || newTimestamp.HasValue) + { + var parts = memberId.Split(':'); + var oldCategoryId = int.Parse(parts[0]); + + var actualCategoryId = newCategoryId ?? oldCategoryId; + var actualTimestamp = newTimestamp ?? DateTimeOffset.UtcNow; + + newScoreValue = ((long)actualCategoryId << 32) | (uint)actualTimestamp.Ticks; + } + + var result = await Instance.EvalAsync(luaScript, + new[] + { + redisCacheKey, + $"{redisCacheKey}_scores", + oldCategoryIndexKey, + newCategoryIndexKey + }, + new[] + { + memberId, + newData.Serialize(), + newScoreValue?.ToString() ?? "" + }); + + // 如果时间戳变化则更新全局索引 + if (newTimestamp.HasValue) + { + long newGlobalScore = newTimestamp.Value.ToUnixTimeMilliseconds(); + await Instance.ZAddAsync("global_data_all", newGlobalScore, memberId); + } + + if ((int)result == 0) + throw new KeyNotFoundException("指定数据不存在"); + } + + + public async Task> GetMeterZSetPagedData( + string redisCacheKey, + string redisCacheIndexKey, + int categoryId, + int pageSize = 10, + int pageIndex = 1, + bool descending = true) + { + // 计算score范围 + long minScore = (long)categoryId << 32; + long maxScore = ((long)categoryId + 1) << 32; + + // 分页参数计算 + int start = (pageIndex - 1) * pageSize; + + // 获取排序后的member列表 + var members = descending + ? await Instance.ZRevRangeByScoreAsync( + $"{redisCacheKey}_scores", + maxScore, + minScore, + start, + pageSize) + : await Instance.ZRangeByScoreAsync( + $"{redisCacheKey}_scores", + minScore, + maxScore, + start, + pageSize); + + // 批量获取实际数据 + var dataTasks = members.Select(m => + Instance.HGetAsync(redisCacheKey, m)).ToArray(); + await Task.WhenAll(dataTasks); + + // 总数统计优化 + var total = await Instance.ZCountAsync( + $"{redisCacheKey}_scores", + minScore, + maxScore); + + return new BusPagedResult + { + Items = dataTasks.Select(t => t.Result).ToList(), + TotalCount = total, + PageIndex = pageIndex, + PageSize = pageSize + }; + } + + + public async Task RemoveMeterZSetData( + string redisCacheKey, + string redisCacheIndexKey, + string uniqueId) // 改为基于唯一标识删除 + { + // 原子操作 + var luaScript = @" + local mainKey = KEYS[1] + local scoreKey = KEYS[2] + local indexKey = KEYS[3] + local member = ARGV[1] + + redis.call('HDEL', mainKey, member) + redis.call('ZREM', scoreKey, member) + redis.call('SREM', indexKey, member) + return 1 + "; + + var keys = new[] + { + redisCacheKey, + $"{redisCacheKey}_scores", + redisCacheIndexKey + }; + + var result = await Instance.EvalAsync(luaScript, + keys, + new[] { uniqueId }); + + if ((int)result != 1) + throw new Exception("删除操作失败"); + } + + public async Task> GetGlobalPagedData( + string redisCacheKey, + int pageSize = 10, + long? lastScore = null, + string lastMember = null, + bool descending = true) + { + const string zsetKey = "global_data_all"; + + // 分页参数处理 + var (startScore, excludeMember) = descending + ? (lastScore ?? long.MaxValue, lastMember) + : (lastScore ?? 0, lastMember); + + // 获取成员列表 + string[] members; + if (descending) + { + members = await Instance.ZRevRangeByScoreAsync( + zsetKey, + max: startScore, + min: 0, + offset: 0, + count: pageSize + 1); + } + else + { + members = await Instance.ZRangeByScoreAsync( + zsetKey, + min: startScore, + max: long.MaxValue, + offset: 0, + count: pageSize + 1); + } + + // 处理分页结果 + bool hasNext = members.Length > pageSize; + var actualMembers = members.Take(pageSize).ToArray(); + + // 批量获取数据(优化版本) + var dataTasks = actualMembers + .Select(m => Instance.HGetAsync(redisCacheKey, m)) + .ToArray(); + await Task.WhenAll(dataTasks); + + // 获取下一页游标 + (long? nextScore, string nextMember) = actualMembers.Any() + ? await GetNextCursor(zsetKey, actualMembers.Last(), descending) + : (null, null); + + return new GlobalPagedResult + { + Items = dataTasks.Select(t => t.Result).ToList(), + HasNext = hasNext, + NextScore = nextScore, + NextMember = nextMember + }; + } + + private async Task<(long? score, string member)> GetNextCursor( + string zsetKey, + string lastMember, + bool descending) + { + var score = await Instance.ZScoreAsync(zsetKey, lastMember); + return (score.HasValue ? (long)score.Value : null, lastMember); + } } } \ No newline at end of file diff --git a/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs b/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs index 0aad03e..292549a 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs @@ -1,4 +1,5 @@ -using System; +using JiShe.CollectBus.PagedResult; +using System; using System.Collections.Generic; using System.Linq; using System.Text; @@ -48,6 +49,6 @@ namespace JiShe.CollectBus.IoTDBProvider /// /// /// - Task> QueryAsync(QueryOptions options) where T : IoTEntity, new(); + Task> QueryAsync(QueryOptions options) where T : IoTEntity, new(); } } diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs index 705b2c1..6ae0ef1 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs @@ -1,22 +1,14 @@ using Apache.IoTDB; using Apache.IoTDB.DataStructure; using JiShe.CollectBus.Common.Extensions; -using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.IoTDBProvider.Context; using JiShe.CollectBus.IoTDBProvider.Interface; -using JiShe.CollectBus.IoTDBProvider.Provider; -using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using System; -using System.Collections; using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; using System.Reflection; using System.Text; -using System.Threading.Tasks; -using static Thrift.Protocol.Utilities.TJSONProtocolConstants; +using JiShe.CollectBus.PagedResult; + namespace JiShe.CollectBus.IoTDBProvider { @@ -118,12 +110,12 @@ namespace JiShe.CollectBus.IoTDBProvider /// /// /// - public async Task> QueryAsync(QueryOptions options) where T : IoTEntity, new() + public async Task> QueryAsync(QueryOptions options) where T : IoTEntity, new() { var query = BuildQuerySQL(options); var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); - var result = new PagedResult + var result = new BusPagedResult { TotalCount = await GetTotalCount(options), Items = ParseResults(sessionDataSet, options.PageSize) diff --git a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs index 61cc788..a73eb79 100644 --- a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs +++ b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs @@ -1,5 +1,6 @@ using Confluent.Kafka; using JiShe.CollectBus.Kafka.Consumer; +using JiShe.CollectBus.Kafka.Producer; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Volo.Abp.Modularity; @@ -10,6 +11,10 @@ namespace JiShe.CollectBus.Kafka { public override void ConfigureServices(ServiceConfigurationContext context) { + // 注册Producer + context.Services.AddTransient(typeof(IProducerService<,>), typeof(ProducerService<,>)); + // 注册Consumer + context.Services.AddTransient(typeof(IConsumerService<,>), typeof(ConsumerService<,>)); } } } diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs index 37efe3a..700d52f 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs @@ -14,7 +14,7 @@ namespace JiShe.CollectBus.Kafka.Consumer private readonly ILogger> _logger; private CancellationTokenSource _cancellationTokenSource; - protected ConsumerService(IConfiguration configuration, ILogger> logger) + public ConsumerService(IConfiguration configuration, ILogger> logger) { _logger = logger; GetInstance(configuration); diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs index 2ceaed5..587013d 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs @@ -10,6 +10,7 @@ namespace JiShe.CollectBus.Kafka.Producer public interface IProducerService { Task ProduceAsync(string topic, TKey key, TValue value); + Task ProduceAsync(TopicPartition topicPartition, TKey key, TValue value); Task ProduceAsync(string topic, TValue value); void Dispose(); } diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs index 0cfed2e..af6bc54 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs @@ -16,7 +16,7 @@ namespace JiShe.CollectBus.Kafka.Producer private readonly ILogger> _logger; - protected ProducerService(IConfiguration configuration, ILogger> logger) + public ProducerService(IConfiguration configuration, ILogger> logger) { _logger = logger; GetInstance(configuration); @@ -55,6 +55,17 @@ namespace JiShe.CollectBus.Kafka.Producer await Instance.ProduceAsync(topic, message); } + public async Task ProduceAsync(TopicPartition topicPartition, TKey key, TValue value) + { + var message = new Message + { + Key = key, + Value = value + }; + + await Instance.ProduceAsync(topicPartition, message); + } + public async Task ProduceAsync(string topic, TValue value) { var message = new Message