diff --git a/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj b/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj index b937879..de972d1 100644 --- a/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj +++ b/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj @@ -26,7 +26,6 @@ - diff --git a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index 394bd87..b13548f 100644 --- a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -15,6 +15,7 @@ using JiShe.CollectBus.FreeRedisProvider; using JiShe.CollectBus.Workers; using Volo.Abp.BackgroundWorkers.Hangfire; using JiShe.CollectBus.MongoDB; +using JiShe.CollectBus.ScheduledMeterReading; using AutoMapper.Configuration.Annotations; using JiShe.CollectBus.Common.Attributes; @@ -26,7 +27,6 @@ namespace JiShe.CollectBus; typeof(AbpDddApplicationModule), typeof(AbpAutoMapperModule), typeof(AbpBackgroundWorkersHangfireModule), - typeof(CollectBusMongoDbModule), typeof(CollectBusFreeRedisModule), typeof(CollectBusFreeSqlModule) )] @@ -54,12 +54,13 @@ public class CollectBusApplicationModule : AbpModule var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface).ToList(); foreach (var type in types) { - var ignoreJob = type.GetCustomAttribute(); - if (ignoreJob == null) - { - context.AddBackgroundWorkerAsync(type); - } + context.AddBackgroundWorkerAsync(type); } + + var dbContext = context.ServiceProvider.GetRequiredService(); + + //默认初始化表计信息 + dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); } } diff --git a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj index ad1ca1d..94c4faf 100644 --- a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj +++ b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj @@ -26,6 +26,7 @@ + diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 9358b3e..bb3c743 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -189,10 +189,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}"; #if DEBUG - //每次缓存时,删除缓存,避免缓存数据错误 + //每次缓存时,删除缓存,避免缓存数据有不准确的问题 await FreeRedisProvider.Instance.DelAsync(redisCacheKey); #else - //每次缓存时,删除缓存,避免缓存数据错误 + //每次缓存时,删除缓存,避免缓存数据有不准确的问题 await FreeRedisProvider.Instance.DelAsync(redisCacheKey); #endif @@ -688,6 +688,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading var meterReadingRecords = new MeterReadingRecords() { + ProjectID = ammeter.ProjectID, + DatabaseBusiID = ammeter.DatabaseBusiID, PendingCopyReadTime = pendingCopyReadTime, CreationTime = currentTime, MeterAddress = ammeter.AmmerterAddress, diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index d15e796..24507fe 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -6,6 +6,7 @@ using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.GatherItem; +using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; @@ -62,7 +63,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading //[Route($"ammeter/list")] public override async Task> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890") { - string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress + string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID FROM TB_GatherInfo(NOLOCK) AS A INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0 INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100 @@ -114,7 +115,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading C.GatherCode, A.[ProjectID], B.AbnormalState, - B.LastTime + B.LastTime, + CONCAT(B.AreaCode, B.[Address]) AS FocusAddress, + (select top 1 DatabaseBusiID from TB_Project where ID = b.ProjectID) AS DatabaseBusiID FROM [dbo].[TB_WatermeterInfo](NOLOCK) AS A INNER JOIN [dbo].[TB_FocusInfo](NOLOCK) AS B ON A.FocusID=B.ID AND B.RemoveState >= 0 AND B.State>=0 INNER JOIN [dbo].[TB_GatherInfo](NOLOCK) AS C ON B.GatherInfoID=C.ID diff --git a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs index a0ea73a..cde387c 100644 --- a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs @@ -66,9 +66,11 @@ namespace JiShe.CollectBus.Subscribers { var currentDateTime = DateTime.Now; - var list = await _meterReadingRecordsRepository.ParallelQueryAsync(currentDateTime.AddMinutes(-20), currentDateTime.AddMinutes(10)); + var list = await _meterReadingRecordsRepository.ParallelQueryAsync(currentDateTime.AddMinutes(-20), currentDateTime.AddMinutes(10)); return list; + + //return null; } /// @@ -136,19 +138,29 @@ namespace JiShe.CollectBus.Subscribers public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) { _logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理"); - var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - if (protocolPlugin == null) + try { - _logger.LogError("【15分钟采集电表数据下行消息消费队列开始处理】协议不存在!"); - } - else - { - var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress); - if (device != null) + var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); + if (protocolPlugin == null) { - await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); - + _logger.LogError("【15分钟采集电表数据下行消息消费队列开始处理】协议不存在!"); } + else + { + // var dd = await _meterReadingRecordsRepository.FirstOrDefaultAsync(d=>d.ManualOrNot== true); + + var device = await _deviceRepository.FirstOrDefaultAsync(a => a.Number == receivedMessage.FocusAddress); + if (device != null) + { + await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); + + } + } + } + catch (Exception ex) + { + + throw ex; } } #endregion diff --git a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs index eb191ca..fa21071 100644 --- a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs +++ b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs @@ -136,6 +136,11 @@ namespace JiShe.CollectBus.Ammeters /// public int ProjectID { get; set; } + /// + /// 数据库业务ID + /// + public int DatabaseBusiID { get; set; } + /// /// 是否异常集中器 0:正常,1异常 /// diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingRecords.cs b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingRecords.cs index 9f854df..a75c032 100644 --- a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingRecords.cs +++ b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingRecords.cs @@ -55,7 +55,6 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords /// public string MeterAddress { get; set; } - /// /// 表类型 /// @@ -122,6 +121,11 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords /// public string DataResult { get; set; } + /// + /// 数据时间,如冻结时间、事件发生事件等 + /// + public DateTime? DataGenerationTimestamp { get; set; } + /// /// 数据迁移时间 /// diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs index 1283dfc..933641b 100644 --- a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs +++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs @@ -25,6 +25,7 @@ namespace JiShe.CollectBus.Host typeof(AbpAspNetCoreSerilogModule), typeof(AbpSwashbuckleModule), typeof(CollectBusApplicationModule), + typeof(CollectBusMongoDbModule), typeof(AbpCachingStackExchangeRedisModule), typeof(AbpBackgroundWorkersHangfireModule) )] diff --git a/src/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj b/src/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj index 01507cc..0b178ee 100644 --- a/src/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj +++ b/src/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj @@ -1,4 +1,4 @@ - + net8.0 @@ -55,6 +55,7 @@ + diff --git a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs b/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs index d17a0d2..7940a79 100644 --- a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs +++ b/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs @@ -31,45 +31,59 @@ public class CollectBusMongoDbContext : AbpMongoDbContext, ICollectBusMongoDbCon protected override void CreateModel(IMongoModelBuilder modelBuilder) { - base.CreateModel(modelBuilder); - modelBuilder.ConfigureCollectBus(); - modelBuilder.Entity(builder => { - // 创建索引 - builder.ConfigureIndexes(index => + builder.CreateCollectionOptions.Collation = new Collation(locale: "en_US", strength: CollationStrength.Secondary); + builder.ConfigureIndexes(indexes => { - //List> createIndexModels = new List>(); - //createIndexModels.Add(new CreateIndexModel( - // Builders.IndexKeys.Ascending(nameof(MeterReadingRecords)), - // new CreateIndexOptions - // { - // Unique = true - // } - // )); + indexes.CreateOne( + new CreateIndexModel( + Builders.IndexKeys.Ascending("MyProperty"), + new CreateIndexOptions { Unique = true } + ) + ); + } + ); + + //// 创建索引 + //builder.ConfigureIndexes(index => + //{ - //var indexKeys = Builders.IndexKeys - //.Ascending("CreationTime") - //.Ascending("OrderNumber"); + // //List> createIndexModels = new List>(); + // //createIndexModels.Add(new CreateIndexModel( + // // Builders.IndexKeys.Ascending(nameof(MeterReadingRecords)), + // // new CreateIndexOptions + // // { + // // Unique = true + // // } + // // )); - //var indexOptions = new CreateIndexOptions - //{ - // Background = true, - // Name = "IX_CreationTime_OrderNumber" - //}; - //index.CreateOne( - //new CreateIndexModel(indexKeys, indexOptions)); - index.CreateOne(new CreateIndexModel( - Builders.IndexKeys.Ascending(nameof(MeterReadingRecords)), - new CreateIndexOptions - { - Unique = true - } - )); - }); + // //var indexKeys = Builders.IndexKeys + // //.Ascending("CreationTime") + // //.Ascending("OrderNumber"); + + // //var indexOptions = new CreateIndexOptions + // //{ + // // Background = true, + // // Name = "IX_CreationTime_OrderNumber" + // //}; + // //index.CreateOne( + // //new CreateIndexModel(indexKeys, indexOptions)); + + // //index.CreateOne(new CreateIndexModel( + // // Builders.IndexKeys.Ascending(nameof(MeterReadingRecords)), + // // new CreateIndexOptions + // // { + // // Unique = true + // // } + // // )); + //}); }); + + base.CreateModel(modelBuilder); + modelBuilder.ConfigureCollectBus(); } } diff --git a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbModule.cs b/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbModule.cs index d3d9559..9a5a0f0 100644 --- a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbModule.cs +++ b/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbModule.cs @@ -35,8 +35,8 @@ public class CollectBusMongoDbModule : AbpModule typeof(IShardingStrategy<>), typeof(DayShardingStrategy<>)); - // 分表策略仓储 替换默认仓储 - options.AddRepository(); + //// 分表策略仓储 替换默认仓储 + //options.AddRepository(); }); context.Services.AddAlwaysDisableUnitOfWorkTransaction(); diff --git a/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/IMeterReadingRecordRepository.cs b/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/IMeterReadingRecordRepository.cs index 788dbb3..20b7809 100644 --- a/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/IMeterReadingRecordRepository.cs +++ b/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/IMeterReadingRecordRepository.cs @@ -1,4 +1,5 @@ using JiShe.CollectBus.IotSystems.MeterReadingRecords; +using MongoDB.Driver; using System; using System.Collections.Generic; using System.Linq; @@ -31,6 +32,23 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord /// Task InsertAsync(MeterReadingRecords entity, DateTime? dateTime); + /// + /// 单条更新 + /// + /// 过滤条件,示例:Builders.Filter.Eq(x => x.Id, filter.Id) + /// 包含待更新的内容,示例:Builders.Update.Set(x => x.Processed, true).Set(x => x.ProcessedTime, Clock.Now) + /// 数据实体,用于获取对应的分片库 + /// + Task UpdateOneAsync(FilterDefinition filter, UpdateDefinition update, MeterReadingRecords entity); + + /// + /// 单个获取 + /// + /// + /// + /// + Task FirOrDefaultAsync(MeterReadingRecords entity, DateTime? dateTime); + /// /// 多集合数据查询 /// diff --git a/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/MeterReadingRecordRepository.cs b/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/MeterReadingRecordRepository.cs index 14e67d7..5a73b8d 100644 --- a/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/MeterReadingRecordRepository.cs +++ b/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/MeterReadingRecordRepository.cs @@ -10,8 +10,11 @@ using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; +using Volo.Abp.Domain.Entities; using Volo.Abp.Domain.Repositories.MongoDB; using Volo.Abp.MongoDB; +using Volo.Abp.MongoDB.DistributedEvents; +using Volo.Abp.Timing; using static System.Net.Mime.MediaTypeNames; namespace JiShe.CollectBus.Repository.MeterReadingRecord @@ -85,7 +88,24 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord public async Task InsertAsync(MeterReadingRecords entity, DateTime? dateTime) { var collection = await GetShardedCollection(dateTime); - await collection.InsertOneAsync(entity); + await collection.InsertOneAsync(entity); + return entity; + } + + /// + /// 单条更新 + /// + /// 过滤条件,示例:Builders.Filter.Eq(x => x.Id, filter.Id) + /// 包含待更新的内容,示例:Builders.Update.Set(x => x.Processed, true).Set(x => x.ProcessedTime, Clock.Now) + /// 数据实体,用于获取对应的分片库 + /// + public async Task UpdateOneAsync(FilterDefinition filter, UpdateDefinition update, MeterReadingRecords entity) + { + var collection = await GetShardedCollection(entity.CreationTime); + + var dbContext = await DbContextProvider.GetDbContextAsync(); + + await collection.UpdateOneAsync(filter, update); return entity; } @@ -135,5 +155,10 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord return database.GetCollection(collectionName); } + + public Task FirOrDefaultAsync(MeterReadingRecords entity, DateTime? dateTime) + { + throw new NotImplementedException(); + } } }