From e7f94ceae4e9ffca4e081f0b22deb96da9278f89 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Thu, 20 Mar 2025 16:40:27 +0800 Subject: [PATCH] =?UTF-8?q?=E8=87=AA=E5=AE=9A=E5=88=86=E5=BA=93=E5=88=86?= =?UTF-8?q?=E8=A1=A8=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...he.CollectBus.Application.Contracts.csproj | 1 + .../IWorkerSubscriberAppService.cs | 5 + .../CollectBusApplicationModule.cs | 2 + .../JiShe.CollectBus.Application.csproj | 3 - .../BasicScheduledMeterReadingService.cs | 43 +++++--- ...nergySystemScheduledMeterReadingService.cs | 4 +- .../Subscribers/WorkerSubscriberAppService.cs | 22 ++++ .../BuildSendDatas/Build3761SendData.cs | 1 + .../MeterReadingRecords.cs | 11 ++ .../CollectBusHostModule.cs | 1 - .../JiShe.CollectBus.Host.csproj | 1 - src/JiShe.CollectBus.Host/appsettings.json | 4 +- .../MongoDB/CollectBusMongoDbContext.cs | 45 +++++--- .../MongoDB/CollectBusMongoDbModule.cs | 19 +++- .../MongoDB/CollectionHelper.cs | 25 ----- .../IMeterReadingRecordRepository.cs | 21 ++-- .../MeterReadingRecordRepository.cs | 102 ++++++++++++++++-- .../ShardingStrategy/DayShardingStrategy.cs | 59 ++++++++++ .../ShardingStrategy/IShardingStrategy.cs | 36 +++++++ 19 files changed, 328 insertions(+), 77 deletions(-) delete mode 100644 src/JiShe.CollectBus.MongoDB/MongoDB/CollectionHelper.cs create mode 100644 src/JiShe.CollectBus.MongoDB/ShardingStrategy/DayShardingStrategy.cs create mode 100644 src/JiShe.CollectBus.MongoDB/ShardingStrategy/IShardingStrategy.cs diff --git a/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj b/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj index de972d1..b937879 100644 --- a/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj +++ b/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj @@ -26,6 +26,7 @@ + diff --git a/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs index a7c4032..6d13b13 100644 --- a/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs @@ -1,5 +1,7 @@ using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageReceiveds; +using JiShe.CollectBus.IotSystems.MeterReadingRecords; +using System.Collections.Generic; using System.Threading.Tasks; using Volo.Abp.Application.Services; @@ -12,6 +14,9 @@ namespace JiShe.CollectBus.Subscribers { #region 电表消息采集 + + Task> AmmeterScheduledMeterOneMinuteReadingIssuedEventQuery(); + /// /// 1分钟采集电表数据下行消息消费订阅 /// diff --git a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index 4efe690..0f4a3d0 100644 --- a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -13,6 +13,7 @@ using Volo.Abp.AspNetCore.Mvc.AntiForgery; using JiShe.CollectBus.FreeRedisProvider; using JiShe.CollectBus.Workers; using Volo.Abp.BackgroundWorkers.Hangfire; +using JiShe.CollectBus.MongoDB; namespace JiShe.CollectBus; @@ -22,6 +23,7 @@ namespace JiShe.CollectBus; typeof(AbpDddApplicationModule), typeof(AbpAutoMapperModule), typeof(AbpBackgroundWorkersHangfireModule), + typeof(CollectBusMongoDbModule), typeof(CollectBusFreeRedisModule), typeof(CollectBusFreeSqlModule) )] diff --git a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj index 0102b4f..ad1ca1d 100644 --- a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj +++ b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj @@ -25,10 +25,7 @@ - - - diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 9c569d1..9358b3e 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -21,6 +21,8 @@ using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.Protocol.Contracts; +using JiShe.CollectBus.Repository; +using JiShe.CollectBus.Repository.MeterReadingRecord; using JiShe.CollectBus.Workers; using MassTransit; using MassTransit.Internals.GraphValidation; @@ -37,13 +39,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading { private readonly ILogger _logger; private readonly ICapPublisher _capBus; - private readonly IRepository _meterReadingRecordsRepository; + private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository; public BasicScheduledMeterReadingService( ILogger logger, ICapPublisher capBus, - IRepository meterReadingRecordsRepository) + IMeterReadingRecordRepository meterReadingRecordsRepository) { _capBus = capBus; _logger = logger; @@ -100,7 +102,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading //item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBus,tempArryay[1]=>SystemTypeConst,tempArryay[2]=>TaskInfo,tempArryay[3]=>表计类别,tempArryay[4]=>采集频率 var tempArryay = item.Split(":"); string meteryType = tempArryay[3];//表计类别 - string timeDensity = tempArryay[4];//采集频率 + int timeDensity = Convert.ToInt32(tempArryay[4]);//采集频率 //获取缓存中的电表信息 var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, meteryType, timeDensity)}*"; @@ -114,7 +116,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (meteryType == MeterTypeEnum.Ammeter.ToString()) { // 解析结果(结果为嵌套数组) - var meterInfos = await GetMeterRedisCacheData(oneMinutekeyList, timeDensity, meteryType); + var meterInfos = await GetMeterRedisCacheData(oneMinutekeyList, $"{timeDensity}", meteryType); if (meterInfos == null || meterInfos.Count <= 0) { _logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-104"); @@ -296,7 +298,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); + _ = _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); meterTaskInfosList.Add(ammerterItem.Value); } @@ -361,7 +363,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); + _= _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg); meterTaskInfosList.Add(ammerterItem.Value); } @@ -398,6 +400,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading //获取缓存中的电表信息 int timeDensity = 15; + var currentDateTime = DateTime.Now; + var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*"; var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0) @@ -428,7 +432,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); + _ = _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500) ,ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); meterTaskInfosList.Add(ammerterItem.Value); } @@ -439,7 +443,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //删除任务数据 - await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList); + //await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList); //缓存下一个时间的任务 TasksToBeIssueModel nextTask = new TasksToBeIssueModel() @@ -462,9 +466,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 采集频率1分钟、5分钟、15分钟 /// 集中器数据分组 /// - private async Task AmmerterScheduledMeterReadingIssued(string timeDensity, Dictionary> focusGroup) + private async Task AmmerterScheduledMeterReadingIssued(int timeDensity, Dictionary> focusGroup) { - if (string.IsNullOrWhiteSpace(timeDensity) || focusGroup == null || focusGroup.Count <= 0) + if (timeDensity <= 0) + { + timeDensity = 1; + } + + if (timeDensity > 15) + { + timeDensity = 15; + } + + if ( focusGroup == null || focusGroup.Count <= 0) { _logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 电表数据采集指令生成失败,参数异常,-101"); return; @@ -522,12 +536,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 采集频率 /// 集中器号hash分组的集中器集合数据 /// - private async Task AmmerterCreatePublishTask(string timeDensity + private async Task AmmerterCreatePublishTask(int timeDensity , Dictionary> focusGroup) { - var HandlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; + var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; var currentTime = DateTime.Now; + var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); foreach (var focusInfo in focusGroup) { //构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型 @@ -645,7 +660,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { string methonCode = $"AFN{aFNStr}_Fn_Send"; //特殊表暂不处理 - if (HandlerPacketBuilder != null && HandlerPacketBuilder.TryGetValue(methonCode + if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode , out var handler)) { dataInfos = handler(new TelemetryPacketRequest() @@ -673,6 +688,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading var meterReadingRecords = new MeterReadingRecords() { + PendingCopyReadTime = pendingCopyReadTime, + CreationTime = currentTime, MeterAddress = ammeter.AmmerterAddress, MeterId = ammeter.ID, MeterType = MeterTypeEnum.Ammeter, diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 045c9c2..d15e796 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -9,6 +9,8 @@ using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; +using JiShe.CollectBus.Repository; +using JiShe.CollectBus.Repository.MeterReadingRecord; using Microsoft.AspNetCore.Authorization; using Microsoft.Extensions.Logging; using Volo.Abp.Domain.Repositories; @@ -25,7 +27,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { public EnergySystemScheduledMeterReadingService(ILogger logger, - ICapPublisher capBus, IRepository _meterReadingRecordsRepository) :base(logger, capBus, _meterReadingRecordsRepository) + ICapPublisher capBus, IMeterReadingRecordRepository _meterReadingRecordsRepository) :base(logger, capBus, _meterReadingRecordsRepository) { } diff --git a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs index 62748be..a0ea73a 100644 --- a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; using DeviceDetectorNET.Parser.Device; using DotNetCore.CAP; @@ -6,8 +7,10 @@ using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageReceiveds; +using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using JiShe.CollectBus.Repository.MeterReadingRecord; using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -27,6 +30,7 @@ namespace JiShe.CollectBus.Subscribers private readonly ITcpService _tcpService; private readonly IServiceProvider _serviceProvider; private readonly IRepository _deviceRepository; + private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository; /// @@ -39,16 +43,34 @@ namespace JiShe.CollectBus.Subscribers public WorkerSubscriberAppService(ILogger logger, ITcpService tcpService, IRepository deviceRepository, + IMeterReadingRecordRepository meterReadingRecordsRepository, IServiceProvider serviceProvider) { _logger = logger; _tcpService = tcpService; _serviceProvider = serviceProvider; _deviceRepository = deviceRepository; + _meterReadingRecordsRepository = meterReadingRecordsRepository; } #region 电表消息采集 + + /// + /// 一分钟定时抄读任务消息消费订阅 + /// + /// + [HttpGet] + [Route("ammeter/oneminute/issued-eventQuery")] + public async Task> AmmeterScheduledMeterOneMinuteReadingIssuedEventQuery() + { + var currentDateTime = DateTime.Now; + + var list = await _meterReadingRecordsRepository.ParallelQueryAsync(currentDateTime.AddMinutes(-20), currentDateTime.AddMinutes(10)); + + return list; + } + /// /// 一分钟定时抄读任务消息消费订阅 /// diff --git a/src/JiShe.CollectBus.Common/BuildSendDatas/Build3761SendData.cs b/src/JiShe.CollectBus.Common/BuildSendDatas/Build3761SendData.cs index d7a9ece..f4dd11d 100644 --- a/src/JiShe.CollectBus.Common/BuildSendDatas/Build3761SendData.cs +++ b/src/JiShe.CollectBus.Common/BuildSendDatas/Build3761SendData.cs @@ -36,6 +36,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas MSA.Add(i); } } + /// /// Gets the msa. /// diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingRecords.cs b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingRecords.cs index 48e4cd8..9f854df 100644 --- a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingRecords.cs +++ b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingRecords.cs @@ -5,6 +5,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; using Volo.Abp.Domain.Entities; +using Volo.Abp.Domain.Entities.Auditing; namespace JiShe.CollectBus.IotSystems.MeterReadingRecords { @@ -19,6 +20,11 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords /// public bool ManualOrNot { get; set; } + /// + /// 待抄读时间 + /// + public DateTime PendingCopyReadTime { get; set; } + /// /// 下发消息内容 /// @@ -111,6 +117,11 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords /// public RecordsDataMigrationStatusEnum MigrationStatus { get; set; } + /// + /// 数据结果,最终的解析报文结果值 + /// + public string DataResult { get; set; } + /// /// 数据迁移时间 /// diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs index 933641b..1283dfc 100644 --- a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs +++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs @@ -25,7 +25,6 @@ namespace JiShe.CollectBus.Host typeof(AbpAspNetCoreSerilogModule), typeof(AbpSwashbuckleModule), typeof(CollectBusApplicationModule), - typeof(CollectBusMongoDbModule), typeof(AbpCachingStackExchangeRedisModule), typeof(AbpBackgroundWorkersHangfireModule) )] diff --git a/src/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj b/src/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj index 5a549e5..01507cc 100644 --- a/src/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj +++ b/src/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj @@ -55,7 +55,6 @@ - diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index 1c50ce4..50c459b 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -34,13 +34,13 @@ "CorsOrigins": "http://localhost:4200,http://localhost:3100" }, "ConnectionStrings": { - "Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=200&minPoolSize=10&waitQueueTimeoutMS=5000", + "Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000", "Kafka": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092", "PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False", "EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False" }, "Redis": { - "Configuration": "192.168.111.248:6379,password=123456abcD,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", + "Configuration": "118.190.144.92:6379,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", "DefaultDB": "14", "HangfireDB": "15" }, diff --git a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs b/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs index c334325..d17a0d2 100644 --- a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs +++ b/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs @@ -1,11 +1,13 @@ using JiShe.CollectBus.IotSystems.Devices; -using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Protocols; +using JiShe.CollectBus.ShardingStrategy; using MongoDB.Bson; +using MongoDB.Bson.Serialization; using MongoDB.Driver; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using Volo.Abp.Data; using Volo.Abp.MongoDB; @@ -26,35 +28,48 @@ public class CollectBusMongoDbContext : AbpMongoDbContext, ICollectBusMongoDbCon public IMongoCollection MessageReceivedHeartbeats => Collection(); public IMongoCollection Devices => Collection(); public IMongoCollection ProtocolInfos => Collection(); - - /// - /// 抄表记录,默认按天分表 - /// - public IMongoCollection MeterReadingRecordInfo => Database.GetCollection(DateTime.Now.GetCollectionName()); - - + protected override void CreateModel(IMongoModelBuilder modelBuilder) { - base.CreateModel(modelBuilder); + base.CreateModel(modelBuilder); + modelBuilder.ConfigureCollectBus(); modelBuilder.Entity(builder => { // 创建索引 builder.ConfigureIndexes(index => { - List> createIndexModels = new List>(); - createIndexModels.Add(new CreateIndexModel( + //List> createIndexModels = new List>(); + //createIndexModels.Add(new CreateIndexModel( + // Builders.IndexKeys.Ascending(nameof(MeterReadingRecords)), + // new CreateIndexOptions + // { + // Unique = true + // } + // )); + + + //var indexKeys = Builders.IndexKeys + //.Ascending("CreationTime") + //.Ascending("OrderNumber"); + + //var indexOptions = new CreateIndexOptions + //{ + // Background = true, + // Name = "IX_CreationTime_OrderNumber" + //}; + //index.CreateOne( + //new CreateIndexModel(indexKeys, indexOptions)); + + index.CreateOne(new CreateIndexModel( Builders.IndexKeys.Ascending(nameof(MeterReadingRecords)), new CreateIndexOptions { Unique = true } )); - index.CreateMany(createIndexModels); }); }); - - modelBuilder.ConfigureCollectBus(); - } + } } diff --git a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbModule.cs b/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbModule.cs index 37f9377..d3d9559 100644 --- a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbModule.cs +++ b/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbModule.cs @@ -1,6 +1,15 @@ -using Microsoft.Extensions.DependencyInjection; +using JiShe.CollectBus.IotSystems.MeterReadingRecords; +using JiShe.CollectBus.Repository; +using JiShe.CollectBus.Repository.MeterReadingRecord; +using JiShe.CollectBus.ShardingStrategy; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using System; +using Volo.Abp; using Volo.Abp.AuditLogging.MongoDB; using Volo.Abp.BackgroundJobs.MongoDB; +using Volo.Abp.Domain.Repositories; +using Volo.Abp.Domain.Repositories.MongoDB; using Volo.Abp.Modularity; using Volo.Abp.MongoDB; using Volo.Abp.Uow; @@ -20,6 +29,14 @@ public class CollectBusMongoDbModule : AbpModule context.Services.AddMongoDbContext(options => { options.AddDefaultRepositories(); + + // 注册分表策略 + context.Services.AddTransient( + typeof(IShardingStrategy<>), + typeof(DayShardingStrategy<>)); + + // 分表策略仓储 替换默认仓储 + options.AddRepository(); }); context.Services.AddAlwaysDisableUnitOfWorkTransaction(); diff --git a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectionHelper.cs b/src/JiShe.CollectBus.MongoDB/MongoDB/CollectionHelper.cs deleted file mode 100644 index 8bc80ce..0000000 --- a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectionHelper.cs +++ /dev/null @@ -1,25 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace JiShe.CollectBus.MongoDB -{ - /// - /// MongoDB集合帮助类 - /// - public static class CollectionHelper - { - /// - /// 获取集合名称 - /// - /// - /// - /// - public static string GetCollectionName(this DateTime time) - { - return $"{typeof(T).Name}{time:yyyyMMddHHmm}"; - } - } -} diff --git a/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/IMeterReadingRecordRepository.cs b/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/IMeterReadingRecordRepository.cs index 7ab95e9..788dbb3 100644 --- a/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/IMeterReadingRecordRepository.cs +++ b/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/IMeterReadingRecordRepository.cs @@ -12,24 +12,31 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord /// /// 抄读仓储接口 /// - public interface IMeterReadingRecordRepository: IRepository + public interface IMeterReadingRecordRepository : IRepository { /// /// 批量插入 /// /// - /// - /// + /// /// - Task InsertManyAsync(List entities, DateTime dayTime, CancellationToken cancellationToken = default(CancellationToken)); + Task InsertManyAsync(List entities, + DateTime? dateTime); /// /// 单个插入 /// /// - /// - /// + /// /// - Task InsertOneAsync(MeterReadingRecords entity, DateTime dayTime, CancellationToken cancellationToken = default(CancellationToken)); + Task InsertAsync(MeterReadingRecords entity, DateTime? dateTime); + + /// + /// 多集合数据查询 + /// + /// + /// + /// + Task> ParallelQueryAsync(DateTime startTime, DateTime endTime); } } diff --git a/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/MeterReadingRecordRepository.cs b/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/MeterReadingRecordRepository.cs index 87337ea..14e67d7 100644 --- a/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/MeterReadingRecordRepository.cs +++ b/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/MeterReadingRecordRepository.cs @@ -1,8 +1,10 @@ using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.MongoDB; +using JiShe.CollectBus.ShardingStrategy; using MongoDB.Bson; using MongoDB.Driver; using System; +using System.Collections; using System.Collections.Generic; using System.Linq; using System.Text; @@ -19,35 +21,119 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord /// public class MeterReadingRecordRepository : MongoDbRepository, IMeterReadingRecordRepository { - public MeterReadingRecordRepository(IMongoDbContextProvider dbContextProvider) - : base(dbContextProvider) + + private readonly IShardingStrategy _shardingStrategy; + private readonly IMongoDbContextProvider _dbContextProvider; + + public MeterReadingRecordRepository( + IMongoDbContextProvider dbContextProvider, + IShardingStrategy shardingStrategy + ) + : base(dbContextProvider) { + _dbContextProvider = dbContextProvider; + _shardingStrategy = shardingStrategy; } /// /// 批量插入 /// /// - /// /// /// - public async Task InsertManyAsync(List entities, DateTime dayTime, CancellationToken cancellationToken = default(CancellationToken)) + public override async Task> InsertManyAsync(IEnumerable entities, bool autoSave = false, CancellationToken cancellationToken = default(CancellationToken)) { - var collection = await GetCollectionAsync(cancellationToken); + var collection = await GetShardedCollection(DateTime.Now); + await collection.InsertManyAsync(entities); + + return entities; + } + + /// + /// 批量插入 + /// + /// + /// + /// + public async Task InsertManyAsync(List entities, DateTime? dateTime) + { + var collection = await GetShardedCollection(dateTime); await collection.InsertManyAsync(entities); } + /// /// 单条插入 /// /// - /// /// /// - public async Task InsertOneAsync(MeterReadingRecords entity,DateTime dayTime, CancellationToken cancellationToken = default(CancellationToken)) + public override async Task InsertAsync(MeterReadingRecords entity, bool autoSave = false, CancellationToken cancellationToken = default(CancellationToken)) { - var collection = await GetCollectionAsync(cancellationToken); + var collection = await GetShardedCollection(DateTime.Now); await collection.InsertOneAsync(entity); + return entity; + } + + + /// + /// 单条插入 + /// + /// + /// + /// + public async Task InsertAsync(MeterReadingRecords entity, DateTime? dateTime) + { + var collection = await GetShardedCollection(dateTime); + await collection.InsertOneAsync(entity); + return entity; + } + + /// + /// 多集合数据查询 + /// + /// + /// + /// + public async Task> ParallelQueryAsync(DateTime startTime, DateTime endTime) + { + var collectionNames = _shardingStrategy.GetQueryCollectionNames(startTime, endTime); + var database = await GetDatabaseAsync(); + + + var tasks = collectionNames.Select(async name => + { + var collection = database.GetCollection(name); + var filter = Builders.Filter.And( + Builders.Filter.Gte(x => x.CreationTime, startTime), + Builders.Filter.Lte(x => x.CreationTime, endTime) + ); + return await collection.Find(filter).ToListAsync(); + }); + + var results = await Task.WhenAll(tasks); + return results.SelectMany(r => r).ToList(); + } + + /// + /// 获得分片集合 + /// + /// + private async Task> GetShardedCollection(DateTime? dateTime) + { + var database = await GetDatabaseAsync(); + string collectionName = string.Empty; + + if (dateTime != null) + { + collectionName = _shardingStrategy.GetCollectionName(dateTime.Value); + } + else + { + collectionName = _shardingStrategy.GetCurrentCollectionName(); + } + + return database.GetCollection(collectionName); } } } diff --git a/src/JiShe.CollectBus.MongoDB/ShardingStrategy/DayShardingStrategy.cs b/src/JiShe.CollectBus.MongoDB/ShardingStrategy/DayShardingStrategy.cs new file mode 100644 index 0000000..75157e5 --- /dev/null +++ b/src/JiShe.CollectBus.MongoDB/ShardingStrategy/DayShardingStrategy.cs @@ -0,0 +1,59 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Volo.Abp.DependencyInjection; + +namespace JiShe.CollectBus.ShardingStrategy +{ + /// + /// 按天分表策略 + /// + /// + public class DayShardingStrategy : IShardingStrategy + { + /// + /// 获取指定时间对应的集合名 + /// + /// + /// + public string GetCollectionName(DateTime dateTime) + { + var baseName = typeof(TEntity).Name; + return $"{baseName}_{dateTime:yyyyMMddHHmm}"; + } + + /// + /// 获取当前时间对应的集合名 + /// + /// + public string GetCurrentCollectionName() + { + var baseName = typeof(TEntity).Name; + return $"{baseName}_{DateTime.Now:yyyyMMddHHmm}"; + } + + /// + /// 用于查询时确定目标集合 + /// + /// + /// + /// + public IEnumerable GetQueryCollectionNames(DateTime? startTime, DateTime? endTime) + { + var months = new List(); + var current = startTime ?? DateTime.MinValue; + var end = endTime ?? DateTime.MaxValue; + var baseName = typeof(TEntity).Name; + + while (current <= end) + { + months.Add($"{baseName}_{current:yyyyMMddHHmm}"); + current = current.AddMonths(1); + } + + return months.Distinct(); + } + } +} diff --git a/src/JiShe.CollectBus.MongoDB/ShardingStrategy/IShardingStrategy.cs b/src/JiShe.CollectBus.MongoDB/ShardingStrategy/IShardingStrategy.cs new file mode 100644 index 0000000..151d5df --- /dev/null +++ b/src/JiShe.CollectBus.MongoDB/ShardingStrategy/IShardingStrategy.cs @@ -0,0 +1,36 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.ShardingStrategy +{ + /// + /// 数据存储分片策略 + /// + /// + public interface IShardingStrategy + { + /// + /// 获取指定时间对应的集合名 + /// + /// + string GetCollectionName(DateTime dateTime); + + /// + /// 获取当前时间对应的集合名 + /// + /// + string GetCurrentCollectionName(); + + /// + /// 用于查询时确定目标集合 + /// + /// + /// + /// + IEnumerable GetQueryCollectionNames(DateTime? startTime = null, + DateTime? endTime = null); + } +}