自定分库分表实现
This commit is contained in:
parent
e836522e3a
commit
e7f94ceae4
@ -26,6 +26,7 @@
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
||||
@ -1,5 +1,7 @@
|
||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading.Tasks;
|
||||
using Volo.Abp.Application.Services;
|
||||
|
||||
@ -12,6 +14,9 @@ namespace JiShe.CollectBus.Subscribers
|
||||
{
|
||||
|
||||
#region 电表消息采集
|
||||
|
||||
Task<List<MeterReadingRecords>> AmmeterScheduledMeterOneMinuteReadingIssuedEventQuery();
|
||||
|
||||
/// <summary>
|
||||
/// 1分钟采集电表数据下行消息消费订阅
|
||||
/// </summary>
|
||||
|
||||
@ -13,6 +13,7 @@ using Volo.Abp.AspNetCore.Mvc.AntiForgery;
|
||||
using JiShe.CollectBus.FreeRedisProvider;
|
||||
using JiShe.CollectBus.Workers;
|
||||
using Volo.Abp.BackgroundWorkers.Hangfire;
|
||||
using JiShe.CollectBus.MongoDB;
|
||||
|
||||
namespace JiShe.CollectBus;
|
||||
|
||||
@ -22,6 +23,7 @@ namespace JiShe.CollectBus;
|
||||
typeof(AbpDddApplicationModule),
|
||||
typeof(AbpAutoMapperModule),
|
||||
typeof(AbpBackgroundWorkersHangfireModule),
|
||||
typeof(CollectBusMongoDbModule),
|
||||
typeof(CollectBusFreeRedisModule),
|
||||
typeof(CollectBusFreeSqlModule)
|
||||
)]
|
||||
|
||||
@ -25,10 +25,7 @@
|
||||
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
|
||||
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.FreeSql\JiShe.CollectBus.FreeSql.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
|
||||
@ -21,6 +21,8 @@ using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||
using JiShe.CollectBus.IotSystems.Watermeter;
|
||||
using JiShe.CollectBus.Protocol.Contracts;
|
||||
using JiShe.CollectBus.Repository;
|
||||
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
||||
using JiShe.CollectBus.Workers;
|
||||
using MassTransit;
|
||||
using MassTransit.Internals.GraphValidation;
|
||||
@ -37,13 +39,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
{
|
||||
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
|
||||
private readonly ICapPublisher _capBus;
|
||||
private readonly IRepository<MeterReadingRecords, Guid> _meterReadingRecordsRepository;
|
||||
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
|
||||
|
||||
|
||||
public BasicScheduledMeterReadingService(
|
||||
ILogger<BasicScheduledMeterReadingService> logger,
|
||||
ICapPublisher capBus,
|
||||
IRepository<MeterReadingRecords, Guid> meterReadingRecordsRepository)
|
||||
IMeterReadingRecordRepository meterReadingRecordsRepository)
|
||||
{
|
||||
_capBus = capBus;
|
||||
_logger = logger;
|
||||
@ -100,7 +102,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
//item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBus,tempArryay[1]=>SystemTypeConst,tempArryay[2]=>TaskInfo,tempArryay[3]=>表计类别,tempArryay[4]=>采集频率
|
||||
var tempArryay = item.Split(":");
|
||||
string meteryType = tempArryay[3];//表计类别
|
||||
string timeDensity = tempArryay[4];//采集频率
|
||||
int timeDensity = Convert.ToInt32(tempArryay[4]);//采集频率
|
||||
|
||||
//获取缓存中的电表信息
|
||||
var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, meteryType, timeDensity)}*";
|
||||
@ -114,7 +116,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
if (meteryType == MeterTypeEnum.Ammeter.ToString())
|
||||
{
|
||||
// 解析结果(结果为嵌套数组)
|
||||
var meterInfos = await GetMeterRedisCacheData<AmmeterInfo>(oneMinutekeyList, timeDensity, meteryType);
|
||||
var meterInfos = await GetMeterRedisCacheData<AmmeterInfo>(oneMinutekeyList, $"{timeDensity}", meteryType);
|
||||
if (meterInfos == null || meterInfos.Count <= 0)
|
||||
{
|
||||
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-104");
|
||||
@ -296,7 +298,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
FocusAddress = ammerterItem.Value.FocusAddress,
|
||||
TimeDensity = timeDensity.ToString(),
|
||||
};
|
||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
|
||||
_ = _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
|
||||
|
||||
meterTaskInfosList.Add(ammerterItem.Value);
|
||||
}
|
||||
@ -361,7 +363,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
FocusAddress = ammerterItem.Value.FocusAddress,
|
||||
TimeDensity = timeDensity.ToString(),
|
||||
};
|
||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
|
||||
_= _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
|
||||
|
||||
meterTaskInfosList.Add(ammerterItem.Value);
|
||||
}
|
||||
@ -398,6 +400,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
|
||||
//获取缓存中的电表信息
|
||||
int timeDensity = 15;
|
||||
var currentDateTime = DateTime.Now;
|
||||
|
||||
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*";
|
||||
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
||||
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
|
||||
@ -428,7 +432,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
FocusAddress = ammerterItem.Value.FocusAddress,
|
||||
TimeDensity = timeDensity.ToString(),
|
||||
};
|
||||
await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
|
||||
_ = _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500) ,ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
|
||||
|
||||
meterTaskInfosList.Add(ammerterItem.Value);
|
||||
}
|
||||
@ -439,7 +443,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
}
|
||||
|
||||
//删除任务数据
|
||||
await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList);
|
||||
//await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList);
|
||||
|
||||
//缓存下一个时间的任务
|
||||
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
||||
@ -462,9 +466,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
/// <param name="timeDensity">采集频率1分钟、5分钟、15分钟</param>
|
||||
/// <param name="focusGroup">集中器数据分组</param>
|
||||
/// <returns></returns>
|
||||
private async Task AmmerterScheduledMeterReadingIssued(string timeDensity, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup)
|
||||
private async Task AmmerterScheduledMeterReadingIssued(int timeDensity, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(timeDensity) || focusGroup == null || focusGroup.Count <= 0)
|
||||
if (timeDensity <= 0)
|
||||
{
|
||||
timeDensity = 1;
|
||||
}
|
||||
|
||||
if (timeDensity > 15)
|
||||
{
|
||||
timeDensity = 15;
|
||||
}
|
||||
|
||||
if ( focusGroup == null || focusGroup.Count <= 0)
|
||||
{
|
||||
_logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 电表数据采集指令生成失败,参数异常,-101");
|
||||
return;
|
||||
@ -522,12 +536,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
/// <param name="timeDensity">采集频率</param>
|
||||
/// <param name="focusGroup">集中器号hash分组的集中器集合数据</param>
|
||||
/// <returns></returns>
|
||||
private async Task AmmerterCreatePublishTask(string timeDensity
|
||||
private async Task AmmerterCreatePublishTask(int timeDensity
|
||||
, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup)
|
||||
{
|
||||
var HandlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
|
||||
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
|
||||
|
||||
var currentTime = DateTime.Now;
|
||||
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
|
||||
foreach (var focusInfo in focusGroup)
|
||||
{
|
||||
//构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型
|
||||
@ -645,7 +660,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
{
|
||||
string methonCode = $"AFN{aFNStr}_Fn_Send";
|
||||
//特殊表暂不处理
|
||||
if (HandlerPacketBuilder != null && HandlerPacketBuilder.TryGetValue(methonCode
|
||||
if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
|
||||
, out var handler))
|
||||
{
|
||||
dataInfos = handler(new TelemetryPacketRequest()
|
||||
@ -673,6 +688,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
|
||||
var meterReadingRecords = new MeterReadingRecords()
|
||||
{
|
||||
PendingCopyReadTime = pendingCopyReadTime,
|
||||
CreationTime = currentTime,
|
||||
MeterAddress = ammeter.AmmerterAddress,
|
||||
MeterId = ammeter.ID,
|
||||
MeterType = MeterTypeEnum.Ammeter,
|
||||
|
||||
@ -9,6 +9,8 @@ using JiShe.CollectBus.GatherItem;
|
||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||
using JiShe.CollectBus.IotSystems.Watermeter;
|
||||
using JiShe.CollectBus.Repository;
|
||||
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
||||
using Microsoft.AspNetCore.Authorization;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Volo.Abp.Domain.Repositories;
|
||||
@ -25,7 +27,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
{
|
||||
|
||||
public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger,
|
||||
ICapPublisher capBus, IRepository<MeterReadingRecords, Guid> _meterReadingRecordsRepository) :base(logger, capBus, _meterReadingRecordsRepository)
|
||||
ICapPublisher capBus, IMeterReadingRecordRepository _meterReadingRecordsRepository) :base(logger, capBus, _meterReadingRecordsRepository)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading.Tasks;
|
||||
using DeviceDetectorNET.Parser.Device;
|
||||
using DotNetCore.CAP;
|
||||
@ -6,8 +7,10 @@ using JiShe.CollectBus.Common.Enums;
|
||||
using JiShe.CollectBus.IotSystems.Devices;
|
||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||
using JiShe.CollectBus.Protocol.Contracts;
|
||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
||||
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging;
|
||||
@ -27,6 +30,7 @@ namespace JiShe.CollectBus.Subscribers
|
||||
private readonly ITcpService _tcpService;
|
||||
private readonly IServiceProvider _serviceProvider;
|
||||
private readonly IRepository<Device, Guid> _deviceRepository;
|
||||
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
|
||||
|
||||
|
||||
/// <summary>
|
||||
@ -39,16 +43,34 @@ namespace JiShe.CollectBus.Subscribers
|
||||
public WorkerSubscriberAppService(ILogger<WorkerSubscriberAppService> logger,
|
||||
ITcpService tcpService,
|
||||
IRepository<Device, Guid> deviceRepository,
|
||||
IMeterReadingRecordRepository meterReadingRecordsRepository,
|
||||
IServiceProvider serviceProvider)
|
||||
{
|
||||
_logger = logger;
|
||||
_tcpService = tcpService;
|
||||
_serviceProvider = serviceProvider;
|
||||
_deviceRepository = deviceRepository;
|
||||
_meterReadingRecordsRepository = meterReadingRecordsRepository;
|
||||
}
|
||||
|
||||
|
||||
#region 电表消息采集
|
||||
|
||||
/// <summary>
|
||||
/// 一分钟定时抄读任务消息消费订阅
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
[HttpGet]
|
||||
[Route("ammeter/oneminute/issued-eventQuery")]
|
||||
public async Task<List<MeterReadingRecords>> AmmeterScheduledMeterOneMinuteReadingIssuedEventQuery()
|
||||
{
|
||||
var currentDateTime = DateTime.Now;
|
||||
|
||||
var list = await _meterReadingRecordsRepository.ParallelQueryAsync(currentDateTime.AddMinutes(-20), currentDateTime.AddMinutes(10));
|
||||
|
||||
return list;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 一分钟定时抄读任务消息消费订阅
|
||||
/// </summary>
|
||||
|
||||
@ -36,6 +36,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
|
||||
MSA.Add(i);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets the msa.
|
||||
/// </summary>
|
||||
|
||||
@ -5,6 +5,7 @@ using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using Volo.Abp.Domain.Entities;
|
||||
using Volo.Abp.Domain.Entities.Auditing;
|
||||
|
||||
namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
|
||||
{
|
||||
@ -19,6 +20,11 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
|
||||
/// </summary>
|
||||
public bool ManualOrNot { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 待抄读时间
|
||||
/// </summary>
|
||||
public DateTime PendingCopyReadTime { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 下发消息内容
|
||||
/// </summary>
|
||||
@ -111,6 +117,11 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
|
||||
/// </summary>
|
||||
public RecordsDataMigrationStatusEnum MigrationStatus { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 数据结果,最终的解析报文结果值
|
||||
/// </summary>
|
||||
public string DataResult { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 数据迁移时间
|
||||
/// </summary>
|
||||
|
||||
@ -25,7 +25,6 @@ namespace JiShe.CollectBus.Host
|
||||
typeof(AbpAspNetCoreSerilogModule),
|
||||
typeof(AbpSwashbuckleModule),
|
||||
typeof(CollectBusApplicationModule),
|
||||
typeof(CollectBusMongoDbModule),
|
||||
typeof(AbpCachingStackExchangeRedisModule),
|
||||
typeof(AbpBackgroundWorkersHangfireModule)
|
||||
)]
|
||||
|
||||
@ -55,7 +55,6 @@
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Application\JiShe.CollectBus.Application.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.HttpApi\JiShe.CollectBus.HttpApi.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
||||
@ -34,13 +34,13 @@
|
||||
"CorsOrigins": "http://localhost:4200,http://localhost:3100"
|
||||
},
|
||||
"ConnectionStrings": {
|
||||
"Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=200&minPoolSize=10&waitQueueTimeoutMS=5000",
|
||||
"Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
|
||||
"Kafka": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092",
|
||||
"PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
|
||||
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
|
||||
},
|
||||
"Redis": {
|
||||
"Configuration": "192.168.111.248:6379,password=123456abcD,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
|
||||
"Configuration": "118.190.144.92:6379,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
|
||||
"DefaultDB": "14",
|
||||
"HangfireDB": "15"
|
||||
},
|
||||
|
||||
@ -1,11 +1,13 @@
|
||||
using JiShe.CollectBus.IotSystems.Devices;
|
||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||
using JiShe.CollectBus.IotSystems.Protocols;
|
||||
using JiShe.CollectBus.ShardingStrategy;
|
||||
using MongoDB.Bson;
|
||||
using MongoDB.Bson.Serialization;
|
||||
using MongoDB.Driver;
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using Volo.Abp.Data;
|
||||
using Volo.Abp.MongoDB;
|
||||
@ -27,34 +29,47 @@ public class CollectBusMongoDbContext : AbpMongoDbContext, ICollectBusMongoDbCon
|
||||
public IMongoCollection<Device> Devices => Collection<Device>();
|
||||
public IMongoCollection<ProtocolInfo> ProtocolInfos => Collection<ProtocolInfo>();
|
||||
|
||||
/// <summary>
|
||||
/// 抄表记录,默认按天分表
|
||||
/// </summary>
|
||||
public IMongoCollection<MeterReadingRecords> MeterReadingRecordInfo => Database.GetCollection<MeterReadingRecords>(DateTime.Now.GetCollectionName<MeterReadingRecords>());
|
||||
|
||||
|
||||
protected override void CreateModel(IMongoModelBuilder modelBuilder)
|
||||
{
|
||||
base.CreateModel(modelBuilder);
|
||||
modelBuilder.ConfigureCollectBus();
|
||||
|
||||
modelBuilder.Entity<MeterReadingRecords>(builder =>
|
||||
{
|
||||
// 创建索引
|
||||
builder.ConfigureIndexes(index =>
|
||||
{
|
||||
List<CreateIndexModel<BsonDocument>> createIndexModels = new List<CreateIndexModel<BsonDocument>>();
|
||||
createIndexModels.Add(new CreateIndexModel<BsonDocument>(
|
||||
//List<CreateIndexModel<BsonDocument>> createIndexModels = new List<CreateIndexModel<BsonDocument>>();
|
||||
//createIndexModels.Add(new CreateIndexModel<BsonDocument>(
|
||||
// Builders<BsonDocument>.IndexKeys.Ascending(nameof(MeterReadingRecords)),
|
||||
// new CreateIndexOptions
|
||||
// {
|
||||
// Unique = true
|
||||
// }
|
||||
// ));
|
||||
|
||||
|
||||
//var indexKeys = Builders<BsonDocument>.IndexKeys
|
||||
//.Ascending("CreationTime")
|
||||
//.Ascending("OrderNumber");
|
||||
|
||||
//var indexOptions = new CreateIndexOptions
|
||||
//{
|
||||
// Background = true,
|
||||
// Name = "IX_CreationTime_OrderNumber"
|
||||
//};
|
||||
//index.CreateOne(
|
||||
//new CreateIndexModel<BsonDocument>(indexKeys, indexOptions));
|
||||
|
||||
index.CreateOne(new CreateIndexModel<BsonDocument>(
|
||||
Builders<BsonDocument>.IndexKeys.Ascending(nameof(MeterReadingRecords)),
|
||||
new CreateIndexOptions
|
||||
{
|
||||
Unique = true
|
||||
}
|
||||
));
|
||||
index.CreateMany(createIndexModels);
|
||||
});
|
||||
|
||||
});
|
||||
|
||||
modelBuilder.ConfigureCollectBus();
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,6 +1,15 @@
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||
using JiShe.CollectBus.Repository;
|
||||
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
||||
using JiShe.CollectBus.ShardingStrategy;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.DependencyInjection.Extensions;
|
||||
using System;
|
||||
using Volo.Abp;
|
||||
using Volo.Abp.AuditLogging.MongoDB;
|
||||
using Volo.Abp.BackgroundJobs.MongoDB;
|
||||
using Volo.Abp.Domain.Repositories;
|
||||
using Volo.Abp.Domain.Repositories.MongoDB;
|
||||
using Volo.Abp.Modularity;
|
||||
using Volo.Abp.MongoDB;
|
||||
using Volo.Abp.Uow;
|
||||
@ -20,6 +29,14 @@ public class CollectBusMongoDbModule : AbpModule
|
||||
context.Services.AddMongoDbContext<CollectBusMongoDbContext>(options =>
|
||||
{
|
||||
options.AddDefaultRepositories();
|
||||
|
||||
// 注册分表策略
|
||||
context.Services.AddTransient(
|
||||
typeof(IShardingStrategy<>),
|
||||
typeof(DayShardingStrategy<>));
|
||||
|
||||
// 分表策略仓储 替换默认仓储
|
||||
options.AddRepository<MeterReadingRecords, MeterReadingRecordRepository>();
|
||||
});
|
||||
|
||||
context.Services.AddAlwaysDisableUnitOfWorkTransaction();
|
||||
|
||||
@ -1,25 +0,0 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace JiShe.CollectBus.MongoDB
|
||||
{
|
||||
/// <summary>
|
||||
/// MongoDB集合帮助类
|
||||
/// </summary>
|
||||
public static class CollectionHelper
|
||||
{
|
||||
/// <summary>
|
||||
/// 获取集合名称
|
||||
/// </summary>
|
||||
/// <typeparam name="T"></typeparam>
|
||||
/// <param name="time"></param>
|
||||
/// <returns></returns>
|
||||
public static string GetCollectionName<T>(this DateTime time)
|
||||
{
|
||||
return $"{typeof(T).Name}{time:yyyyMMddHHmm}";
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -12,24 +12,31 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord
|
||||
/// <summary>
|
||||
/// 抄读仓储接口
|
||||
/// </summary>
|
||||
public interface IMeterReadingRecordRepository: IRepository<MeterReadingRecords, Guid>
|
||||
public interface IMeterReadingRecordRepository : IRepository<MeterReadingRecords, Guid>
|
||||
{
|
||||
/// <summary>
|
||||
/// 批量插入
|
||||
/// </summary>
|
||||
/// <param name="entities"></param>
|
||||
/// <param name="dayTime"></param>
|
||||
/// <param name="cancellationToken"></param>
|
||||
/// <param name="dateTime"></param>
|
||||
/// <returns></returns>
|
||||
Task InsertManyAsync(List<MeterReadingRecords> entities, DateTime dayTime, CancellationToken cancellationToken = default(CancellationToken));
|
||||
Task InsertManyAsync(List<MeterReadingRecords> entities,
|
||||
DateTime? dateTime);
|
||||
|
||||
/// <summary>
|
||||
/// 单个插入
|
||||
/// </summary>
|
||||
/// <param name="entity"></param>
|
||||
/// <param name="dayTime"></param>
|
||||
/// <param name="cancellationToken"></param>
|
||||
/// <param name="dateTime"></param>
|
||||
/// <returns></returns>
|
||||
Task InsertOneAsync(MeterReadingRecords entity, DateTime dayTime, CancellationToken cancellationToken = default(CancellationToken));
|
||||
Task<MeterReadingRecords> InsertAsync(MeterReadingRecords entity, DateTime? dateTime);
|
||||
|
||||
/// <summary>
|
||||
/// 多集合数据查询
|
||||
/// </summary>
|
||||
/// <param name="startTime"></param>
|
||||
/// <param name="endTime"></param>
|
||||
/// <returns></returns>
|
||||
Task<List<MeterReadingRecords>> ParallelQueryAsync(DateTime startTime, DateTime endTime);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,8 +1,10 @@
|
||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||
using JiShe.CollectBus.MongoDB;
|
||||
using JiShe.CollectBus.ShardingStrategy;
|
||||
using MongoDB.Bson;
|
||||
using MongoDB.Driver;
|
||||
using System;
|
||||
using System.Collections;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
@ -19,35 +21,119 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord
|
||||
/// </summary>
|
||||
public class MeterReadingRecordRepository : MongoDbRepository<CollectBusMongoDbContext, MeterReadingRecords, Guid>, IMeterReadingRecordRepository
|
||||
{
|
||||
public MeterReadingRecordRepository(IMongoDbContextProvider<CollectBusMongoDbContext> dbContextProvider)
|
||||
: base(dbContextProvider)
|
||||
|
||||
private readonly IShardingStrategy<MeterReadingRecords> _shardingStrategy;
|
||||
private readonly IMongoDbContextProvider<CollectBusMongoDbContext> _dbContextProvider;
|
||||
|
||||
public MeterReadingRecordRepository(
|
||||
IMongoDbContextProvider<CollectBusMongoDbContext> dbContextProvider,
|
||||
IShardingStrategy<MeterReadingRecords> shardingStrategy
|
||||
)
|
||||
: base(dbContextProvider)
|
||||
{
|
||||
_dbContextProvider = dbContextProvider;
|
||||
_shardingStrategy = shardingStrategy;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 批量插入
|
||||
/// </summary>
|
||||
/// <param name="entities"></param>
|
||||
/// <param name="dayTime"></param>
|
||||
/// <param name="cancellationToken"></param>
|
||||
/// <returns></returns>
|
||||
public async Task InsertManyAsync(List<MeterReadingRecords> entities, DateTime dayTime, CancellationToken cancellationToken = default(CancellationToken))
|
||||
public override async Task<IEnumerable<MeterReadingRecords>> InsertManyAsync(IEnumerable<MeterReadingRecords> entities, bool autoSave = false, CancellationToken cancellationToken = default(CancellationToken))
|
||||
{
|
||||
var collection = await GetCollectionAsync(cancellationToken);
|
||||
var collection = await GetShardedCollection(DateTime.Now);
|
||||
await collection.InsertManyAsync(entities);
|
||||
|
||||
return entities;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 批量插入
|
||||
/// </summary>
|
||||
/// <param name="entities"></param>
|
||||
/// <param name="dateTime"></param>
|
||||
/// <returns></returns>
|
||||
public async Task InsertManyAsync(List<MeterReadingRecords> entities, DateTime? dateTime)
|
||||
{
|
||||
var collection = await GetShardedCollection(dateTime);
|
||||
await collection.InsertManyAsync(entities);
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// 单条插入
|
||||
/// </summary>
|
||||
/// <param name="entity"></param>
|
||||
/// <param name="dayTime"></param>
|
||||
/// <param name="cancellationToken"></param>
|
||||
/// <returns></returns>
|
||||
public async Task InsertOneAsync(MeterReadingRecords entity,DateTime dayTime, CancellationToken cancellationToken = default(CancellationToken))
|
||||
public override async Task<MeterReadingRecords> InsertAsync(MeterReadingRecords entity, bool autoSave = false, CancellationToken cancellationToken = default(CancellationToken))
|
||||
{
|
||||
var collection = await GetCollectionAsync(cancellationToken);
|
||||
var collection = await GetShardedCollection(DateTime.Now);
|
||||
await collection.InsertOneAsync(entity);
|
||||
return entity;
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// 单条插入
|
||||
/// </summary>
|
||||
/// <param name="entity"></param>
|
||||
/// <param name="dateTime"></param>
|
||||
/// <returns></returns>
|
||||
public async Task<MeterReadingRecords> InsertAsync(MeterReadingRecords entity, DateTime? dateTime)
|
||||
{
|
||||
var collection = await GetShardedCollection(dateTime);
|
||||
await collection.InsertOneAsync(entity);
|
||||
return entity;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 多集合数据查询
|
||||
/// </summary>
|
||||
/// <param name="startTime"></param>
|
||||
/// <param name="endTime"></param>
|
||||
/// <returns></returns>
|
||||
public async Task<List<MeterReadingRecords>> ParallelQueryAsync(DateTime startTime, DateTime endTime)
|
||||
{
|
||||
var collectionNames = _shardingStrategy.GetQueryCollectionNames(startTime, endTime);
|
||||
var database = await GetDatabaseAsync();
|
||||
|
||||
|
||||
var tasks = collectionNames.Select(async name =>
|
||||
{
|
||||
var collection = database.GetCollection<MeterReadingRecords>(name);
|
||||
var filter = Builders<MeterReadingRecords>.Filter.And(
|
||||
Builders<MeterReadingRecords>.Filter.Gte(x => x.CreationTime, startTime),
|
||||
Builders<MeterReadingRecords>.Filter.Lte(x => x.CreationTime, endTime)
|
||||
);
|
||||
return await collection.Find(filter).ToListAsync();
|
||||
});
|
||||
|
||||
var results = await Task.WhenAll(tasks);
|
||||
return results.SelectMany(r => r).ToList();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 获得分片集合
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
private async Task<IMongoCollection<MeterReadingRecords>> GetShardedCollection(DateTime? dateTime)
|
||||
{
|
||||
var database = await GetDatabaseAsync();
|
||||
string collectionName = string.Empty;
|
||||
|
||||
if (dateTime != null)
|
||||
{
|
||||
collectionName = _shardingStrategy.GetCollectionName(dateTime.Value);
|
||||
}
|
||||
else
|
||||
{
|
||||
collectionName = _shardingStrategy.GetCurrentCollectionName();
|
||||
}
|
||||
|
||||
return database.GetCollection<MeterReadingRecords>(collectionName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,59 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using Volo.Abp.DependencyInjection;
|
||||
|
||||
namespace JiShe.CollectBus.ShardingStrategy
|
||||
{
|
||||
/// <summary>
|
||||
/// 按天分表策略
|
||||
/// </summary>
|
||||
/// <typeparam name="TEntity"></typeparam>
|
||||
public class DayShardingStrategy<TEntity> : IShardingStrategy<TEntity>
|
||||
{
|
||||
/// <summary>
|
||||
/// 获取指定时间对应的集合名
|
||||
/// </summary>
|
||||
/// <param name="dateTime"></param>
|
||||
/// <returns></returns>
|
||||
public string GetCollectionName(DateTime dateTime)
|
||||
{
|
||||
var baseName = typeof(TEntity).Name;
|
||||
return $"{baseName}_{dateTime:yyyyMMddHHmm}";
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 获取当前时间对应的集合名
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public string GetCurrentCollectionName()
|
||||
{
|
||||
var baseName = typeof(TEntity).Name;
|
||||
return $"{baseName}_{DateTime.Now:yyyyMMddHHmm}";
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 用于查询时确定目标集合
|
||||
/// </summary>
|
||||
/// <param name="startTime"></param>
|
||||
/// <param name="endTime"></param>
|
||||
/// <returns></returns>
|
||||
public IEnumerable<string> GetQueryCollectionNames(DateTime? startTime, DateTime? endTime)
|
||||
{
|
||||
var months = new List<string>();
|
||||
var current = startTime ?? DateTime.MinValue;
|
||||
var end = endTime ?? DateTime.MaxValue;
|
||||
var baseName = typeof(TEntity).Name;
|
||||
|
||||
while (current <= end)
|
||||
{
|
||||
months.Add($"{baseName}_{current:yyyyMMddHHmm}");
|
||||
current = current.AddMonths(1);
|
||||
}
|
||||
|
||||
return months.Distinct();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,36 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace JiShe.CollectBus.ShardingStrategy
|
||||
{
|
||||
/// <summary>
|
||||
/// 数据存储分片策略
|
||||
/// </summary>
|
||||
/// <typeparam name="TEntity"></typeparam>
|
||||
public interface IShardingStrategy<TEntity>
|
||||
{
|
||||
/// <summary>
|
||||
/// 获取指定时间对应的集合名
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
string GetCollectionName(DateTime dateTime);
|
||||
|
||||
/// <summary>
|
||||
/// 获取当前时间对应的集合名
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
string GetCurrentCollectionName();
|
||||
|
||||
/// <summary>
|
||||
/// 用于查询时确定目标集合
|
||||
/// </summary>
|
||||
/// <param name="startTime"></param>
|
||||
/// <param name="endTime"></param>
|
||||
/// <returns></returns>
|
||||
IEnumerable<string> GetQueryCollectionNames(DateTime? startTime = null,
|
||||
DateTime? endTime = null);
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user