优化代码
This commit is contained in:
parent
e7f94ceae4
commit
c88ce18f3a
@ -26,7 +26,6 @@
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
||||
@ -14,6 +14,7 @@ using JiShe.CollectBus.FreeRedisProvider;
|
||||
using JiShe.CollectBus.Workers;
|
||||
using Volo.Abp.BackgroundWorkers.Hangfire;
|
||||
using JiShe.CollectBus.MongoDB;
|
||||
using JiShe.CollectBus.ScheduledMeterReading;
|
||||
|
||||
namespace JiShe.CollectBus;
|
||||
|
||||
@ -23,7 +24,6 @@ namespace JiShe.CollectBus;
|
||||
typeof(AbpDddApplicationModule),
|
||||
typeof(AbpAutoMapperModule),
|
||||
typeof(AbpBackgroundWorkersHangfireModule),
|
||||
typeof(CollectBusMongoDbModule),
|
||||
typeof(CollectBusFreeRedisModule),
|
||||
typeof(CollectBusFreeSqlModule)
|
||||
)]
|
||||
@ -54,6 +54,11 @@ public class CollectBusApplicationModule : AbpModule
|
||||
{
|
||||
context.AddBackgroundWorkerAsync(type);
|
||||
}
|
||||
|
||||
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
|
||||
|
||||
//默认初始化表计信息
|
||||
dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -26,6 +26,7 @@
|
||||
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
|
||||
@ -189,10 +189,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
|
||||
|
||||
#if DEBUG
|
||||
//每次缓存时,删除缓存,避免缓存数据错误
|
||||
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
|
||||
await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
|
||||
#else
|
||||
//每次缓存时,删除缓存,避免缓存数据错误
|
||||
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
|
||||
await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
|
||||
#endif
|
||||
|
||||
@ -688,6 +688,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
|
||||
var meterReadingRecords = new MeterReadingRecords()
|
||||
{
|
||||
ProjectID = ammeter.ProjectID,
|
||||
DatabaseBusiID = ammeter.DatabaseBusiID,
|
||||
PendingCopyReadTime = pendingCopyReadTime,
|
||||
CreationTime = currentTime,
|
||||
MeterAddress = ammeter.AmmerterAddress,
|
||||
|
||||
@ -6,6 +6,7 @@ using JiShe.CollectBus.Ammeters;
|
||||
using JiShe.CollectBus.Common.Consts;
|
||||
using JiShe.CollectBus.FreeSql;
|
||||
using JiShe.CollectBus.GatherItem;
|
||||
using JiShe.CollectBus.IotSystems.Devices;
|
||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||
using JiShe.CollectBus.IotSystems.Watermeter;
|
||||
@ -62,7 +63,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
//[Route($"ammeter/list")]
|
||||
public override async Task<List<AmmeterInfo>> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890")
|
||||
{
|
||||
string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress
|
||||
string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
|
||||
FROM TB_GatherInfo(NOLOCK) AS A
|
||||
INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
|
||||
INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
|
||||
@ -114,7 +115,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
C.GatherCode,
|
||||
A.[ProjectID],
|
||||
B.AbnormalState,
|
||||
B.LastTime
|
||||
B.LastTime,
|
||||
CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,
|
||||
(select top 1 DatabaseBusiID from TB_Project where ID = b.ProjectID) AS DatabaseBusiID
|
||||
FROM [dbo].[TB_WatermeterInfo](NOLOCK) AS A
|
||||
INNER JOIN [dbo].[TB_FocusInfo](NOLOCK) AS B ON A.FocusID=B.ID AND B.RemoveState >= 0 AND B.State>=0
|
||||
INNER JOIN [dbo].[TB_GatherInfo](NOLOCK) AS C ON B.GatherInfoID=C.ID
|
||||
|
||||
@ -66,9 +66,11 @@ namespace JiShe.CollectBus.Subscribers
|
||||
{
|
||||
var currentDateTime = DateTime.Now;
|
||||
|
||||
var list = await _meterReadingRecordsRepository.ParallelQueryAsync(currentDateTime.AddMinutes(-20), currentDateTime.AddMinutes(10));
|
||||
var list = await _meterReadingRecordsRepository.ParallelQueryAsync(currentDateTime.AddMinutes(-20), currentDateTime.AddMinutes(10));
|
||||
|
||||
return list;
|
||||
|
||||
//return null;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -136,19 +138,29 @@ namespace JiShe.CollectBus.Subscribers
|
||||
public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
||||
{
|
||||
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
|
||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||
if (protocolPlugin == null)
|
||||
try
|
||||
{
|
||||
_logger.LogError("【15分钟采集电表数据下行消息消费队列开始处理】协议不存在!");
|
||||
}
|
||||
else
|
||||
{
|
||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
|
||||
if (device != null)
|
||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||
if (protocolPlugin == null)
|
||||
{
|
||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
||||
|
||||
_logger.LogError("【15分钟采集电表数据下行消息消费队列开始处理】协议不存在!");
|
||||
}
|
||||
else
|
||||
{
|
||||
// var dd = await _meterReadingRecordsRepository.FirstOrDefaultAsync(d=>d.ManualOrNot== true);
|
||||
|
||||
var device = await _deviceRepository.FirstOrDefaultAsync(a => a.Number == receivedMessage.FocusAddress);
|
||||
if (device != null)
|
||||
{
|
||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
#endregion
|
||||
|
||||
@ -136,6 +136,11 @@ namespace JiShe.CollectBus.Ammeters
|
||||
/// </summary>
|
||||
public int ProjectID { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 数据库业务ID
|
||||
/// </summary>
|
||||
public int DatabaseBusiID { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 是否异常集中器 0:正常,1异常
|
||||
/// </summary>
|
||||
|
||||
@ -55,7 +55,6 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
|
||||
/// </summary>
|
||||
public string MeterAddress { get; set; }
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// 表类型
|
||||
/// </summary>
|
||||
@ -122,6 +121,11 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
|
||||
/// </summary>
|
||||
public string DataResult { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 数据时间,如冻结时间、事件发生事件等
|
||||
/// </summary>
|
||||
public DateTime? DataGenerationTimestamp { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 数据迁移时间
|
||||
/// </summary>
|
||||
|
||||
@ -25,6 +25,7 @@ namespace JiShe.CollectBus.Host
|
||||
typeof(AbpAspNetCoreSerilogModule),
|
||||
typeof(AbpSwashbuckleModule),
|
||||
typeof(CollectBusApplicationModule),
|
||||
typeof(CollectBusMongoDbModule),
|
||||
typeof(AbpCachingStackExchangeRedisModule),
|
||||
typeof(AbpBackgroundWorkersHangfireModule)
|
||||
)]
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk.Web">
|
||||
<Project Sdk="Microsoft.NET.Sdk.Web">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
@ -55,6 +55,7 @@
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Application\JiShe.CollectBus.Application.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.HttpApi\JiShe.CollectBus.HttpApi.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
||||
@ -34,42 +34,42 @@ public class CollectBusMongoDbContext : AbpMongoDbContext, ICollectBusMongoDbCon
|
||||
base.CreateModel(modelBuilder);
|
||||
modelBuilder.ConfigureCollectBus();
|
||||
|
||||
modelBuilder.Entity<MeterReadingRecords>(builder =>
|
||||
{
|
||||
// 创建索引
|
||||
builder.ConfigureIndexes(index =>
|
||||
{
|
||||
//List<CreateIndexModel<BsonDocument>> createIndexModels = new List<CreateIndexModel<BsonDocument>>();
|
||||
//createIndexModels.Add(new CreateIndexModel<BsonDocument>(
|
||||
// Builders<BsonDocument>.IndexKeys.Ascending(nameof(MeterReadingRecords)),
|
||||
// new CreateIndexOptions
|
||||
// {
|
||||
// Unique = true
|
||||
// }
|
||||
// ));
|
||||
//modelBuilder.Entity<MeterReadingRecords>(builder =>
|
||||
//{
|
||||
// // 创建索引
|
||||
// builder.ConfigureIndexes(index =>
|
||||
// {
|
||||
// //List<CreateIndexModel<BsonDocument>> createIndexModels = new List<CreateIndexModel<BsonDocument>>();
|
||||
// //createIndexModels.Add(new CreateIndexModel<BsonDocument>(
|
||||
// // Builders<BsonDocument>.IndexKeys.Ascending(nameof(MeterReadingRecords)),
|
||||
// // new CreateIndexOptions
|
||||
// // {
|
||||
// // Unique = true
|
||||
// // }
|
||||
// // ));
|
||||
|
||||
|
||||
//var indexKeys = Builders<BsonDocument>.IndexKeys
|
||||
//.Ascending("CreationTime")
|
||||
//.Ascending("OrderNumber");
|
||||
// //var indexKeys = Builders<BsonDocument>.IndexKeys
|
||||
// //.Ascending("CreationTime")
|
||||
// //.Ascending("OrderNumber");
|
||||
|
||||
//var indexOptions = new CreateIndexOptions
|
||||
//{
|
||||
// Background = true,
|
||||
// Name = "IX_CreationTime_OrderNumber"
|
||||
//};
|
||||
//index.CreateOne(
|
||||
//new CreateIndexModel<BsonDocument>(indexKeys, indexOptions));
|
||||
// //var indexOptions = new CreateIndexOptions
|
||||
// //{
|
||||
// // Background = true,
|
||||
// // Name = "IX_CreationTime_OrderNumber"
|
||||
// //};
|
||||
// //index.CreateOne(
|
||||
// //new CreateIndexModel<BsonDocument>(indexKeys, indexOptions));
|
||||
|
||||
index.CreateOne(new CreateIndexModel<BsonDocument>(
|
||||
Builders<BsonDocument>.IndexKeys.Ascending(nameof(MeterReadingRecords)),
|
||||
new CreateIndexOptions
|
||||
{
|
||||
Unique = true
|
||||
}
|
||||
));
|
||||
});
|
||||
// index.CreateOne(new CreateIndexModel<BsonDocument>(
|
||||
// Builders<BsonDocument>.IndexKeys.Ascending(nameof(MeterReadingRecords)),
|
||||
// new CreateIndexOptions
|
||||
// {
|
||||
// Unique = true
|
||||
// }
|
||||
// ));
|
||||
// });
|
||||
|
||||
});
|
||||
//});
|
||||
}
|
||||
}
|
||||
|
||||
@ -35,8 +35,8 @@ public class CollectBusMongoDbModule : AbpModule
|
||||
typeof(IShardingStrategy<>),
|
||||
typeof(DayShardingStrategy<>));
|
||||
|
||||
// 分表策略仓储 替换默认仓储
|
||||
options.AddRepository<MeterReadingRecords, MeterReadingRecordRepository>();
|
||||
//// 分表策略仓储 替换默认仓储
|
||||
//options.AddRepository<MeterReadingRecords, MeterReadingRecordRepository>();
|
||||
});
|
||||
|
||||
context.Services.AddAlwaysDisableUnitOfWorkTransaction();
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||
using MongoDB.Driver;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
@ -31,6 +32,23 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord
|
||||
/// <returns></returns>
|
||||
Task<MeterReadingRecords> InsertAsync(MeterReadingRecords entity, DateTime? dateTime);
|
||||
|
||||
/// <summary>
|
||||
/// 单条更新
|
||||
/// </summary>
|
||||
/// <param name="filter">过滤条件,示例:Builders<MeterReadingRecords>.Filter.Eq(x => x.Id, filter.Id)</param>
|
||||
/// <param name="update">包含待更新的内容,示例:Builders<MeterReadingRecords>.Update.Set(x => x.Processed, true).Set(x => x.ProcessedTime, Clock.Now)</param>
|
||||
/// <param name="entity">数据实体,用于获取对应的分片库</param>
|
||||
/// <returns></returns>
|
||||
Task<MeterReadingRecords> UpdateOneAsync(FilterDefinition<MeterReadingRecords> filter, UpdateDefinition<MeterReadingRecords> update, MeterReadingRecords entity);
|
||||
|
||||
/// <summary>
|
||||
/// 单个获取
|
||||
/// </summary>
|
||||
/// <param name="entity"></param>
|
||||
/// <param name="dateTime"></param>
|
||||
/// <returns></returns>
|
||||
Task<MeterReadingRecords> FirOrDefaultAsync(MeterReadingRecords entity, DateTime? dateTime);
|
||||
|
||||
/// <summary>
|
||||
/// 多集合数据查询
|
||||
/// </summary>
|
||||
|
||||
@ -10,8 +10,11 @@ using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Volo.Abp.Domain.Entities;
|
||||
using Volo.Abp.Domain.Repositories.MongoDB;
|
||||
using Volo.Abp.MongoDB;
|
||||
using Volo.Abp.MongoDB.DistributedEvents;
|
||||
using Volo.Abp.Timing;
|
||||
using static System.Net.Mime.MediaTypeNames;
|
||||
|
||||
namespace JiShe.CollectBus.Repository.MeterReadingRecord
|
||||
@ -89,6 +92,23 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord
|
||||
return entity;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 单条更新
|
||||
/// </summary>
|
||||
/// <param name="filter">过滤条件,示例:Builders<MeterReadingRecords>.Filter.Eq(x => x.Id, filter.Id)</param>
|
||||
/// <param name="update">包含待更新的内容,示例:Builders<MeterReadingRecords>.Update.Set(x => x.Processed, true).Set(x => x.ProcessedTime, Clock.Now)</param>
|
||||
/// <param name="entity">数据实体,用于获取对应的分片库</param>
|
||||
/// <returns></returns>
|
||||
public async Task<MeterReadingRecords> UpdateOneAsync(FilterDefinition<MeterReadingRecords> filter, UpdateDefinition<MeterReadingRecords> update, MeterReadingRecords entity)
|
||||
{
|
||||
var collection = await GetShardedCollection(entity.CreationTime);
|
||||
|
||||
var dbContext = await DbContextProvider.GetDbContextAsync();
|
||||
|
||||
await collection.UpdateOneAsync(filter, update);
|
||||
return entity;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 多集合数据查询
|
||||
/// </summary>
|
||||
@ -135,5 +155,10 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord
|
||||
|
||||
return database.GetCollection<MeterReadingRecords>(collectionName);
|
||||
}
|
||||
|
||||
public Task<MeterReadingRecords> FirOrDefaultAsync(MeterReadingRecords entity, DateTime? dateTime)
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user