This commit is contained in:
cli 2025-03-21 14:33:07 +08:00
commit d0e2a35f22
14 changed files with 144 additions and 58 deletions

View File

@ -26,7 +26,6 @@
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -15,6 +15,7 @@ using JiShe.CollectBus.FreeRedisProvider;
using JiShe.CollectBus.Workers; using JiShe.CollectBus.Workers;
using Volo.Abp.BackgroundWorkers.Hangfire; using Volo.Abp.BackgroundWorkers.Hangfire;
using JiShe.CollectBus.MongoDB; using JiShe.CollectBus.MongoDB;
using JiShe.CollectBus.ScheduledMeterReading;
using AutoMapper.Configuration.Annotations; using AutoMapper.Configuration.Annotations;
using JiShe.CollectBus.Common.Attributes; using JiShe.CollectBus.Common.Attributes;
@ -26,7 +27,6 @@ namespace JiShe.CollectBus;
typeof(AbpDddApplicationModule), typeof(AbpDddApplicationModule),
typeof(AbpAutoMapperModule), typeof(AbpAutoMapperModule),
typeof(AbpBackgroundWorkersHangfireModule), typeof(AbpBackgroundWorkersHangfireModule),
typeof(CollectBusMongoDbModule),
typeof(CollectBusFreeRedisModule), typeof(CollectBusFreeRedisModule),
typeof(CollectBusFreeSqlModule) typeof(CollectBusFreeSqlModule)
)] )]
@ -53,13 +53,14 @@ public class CollectBusApplicationModule : AbpModule
var assembly = Assembly.GetExecutingAssembly(); var assembly = Assembly.GetExecutingAssembly();
var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface).ToList(); var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface).ToList();
foreach (var type in types) foreach (var type in types)
{
var ignoreJob = type.GetCustomAttribute<IgnoreJobAttribute>();
if (ignoreJob == null)
{ {
context.AddBackgroundWorkerAsync(type); context.AddBackgroundWorkerAsync(type);
} }
}
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
//默认初始化表计信息
dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
} }
} }

View File

@ -26,6 +26,7 @@
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
</ItemGroup> </ItemGroup>

View File

@ -189,10 +189,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}"; var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
#if DEBUG #if DEBUG
//每次缓存时,删除缓存,避免缓存数据错误 //每次缓存时,删除缓存,避免缓存数据有不准确的问题
await FreeRedisProvider.Instance.DelAsync(redisCacheKey); await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
#else #else
//每次缓存时,删除缓存,避免缓存数据错误 //每次缓存时,删除缓存,避免缓存数据有不准确的问题
await FreeRedisProvider.Instance.DelAsync(redisCacheKey); await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
#endif #endif
@ -688,6 +688,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var meterReadingRecords = new MeterReadingRecords() var meterReadingRecords = new MeterReadingRecords()
{ {
ProjectID = ammeter.ProjectID,
DatabaseBusiID = ammeter.DatabaseBusiID,
PendingCopyReadTime = pendingCopyReadTime, PendingCopyReadTime = pendingCopyReadTime,
CreationTime = currentTime, CreationTime = currentTime,
MeterAddress = ammeter.AmmerterAddress, MeterAddress = ammeter.AmmerterAddress,

View File

@ -6,6 +6,7 @@ using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.IotSystems.Watermeter;
@ -62,7 +63,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//[Route($"ammeter/list")] //[Route($"ammeter/list")]
public override async Task<List<AmmeterInfo>> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890") public override async Task<List<AmmeterInfo>> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890")
{ {
string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
FROM TB_GatherInfo(NOLOCK) AS A FROM TB_GatherInfo(NOLOCK) AS A
INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0 INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100 INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
@ -114,7 +115,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
C.GatherCode, C.GatherCode,
A.[ProjectID], A.[ProjectID],
B.AbnormalState, B.AbnormalState,
B.LastTime B.LastTime,
CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,
(select top 1 DatabaseBusiID from TB_Project where ID = b.ProjectID) AS DatabaseBusiID
FROM [dbo].[TB_WatermeterInfo](NOLOCK) AS A FROM [dbo].[TB_WatermeterInfo](NOLOCK) AS A
INNER JOIN [dbo].[TB_FocusInfo](NOLOCK) AS B ON A.FocusID=B.ID AND B.RemoveState >= 0 AND B.State>=0 INNER JOIN [dbo].[TB_FocusInfo](NOLOCK) AS B ON A.FocusID=B.ID AND B.RemoveState >= 0 AND B.State>=0
INNER JOIN [dbo].[TB_GatherInfo](NOLOCK) AS C ON B.GatherInfoID=C.ID INNER JOIN [dbo].[TB_GatherInfo](NOLOCK) AS C ON B.GatherInfoID=C.ID

View File

@ -69,6 +69,8 @@ namespace JiShe.CollectBus.Subscribers
var list = await _meterReadingRecordsRepository.ParallelQueryAsync(currentDateTime.AddMinutes(-20), currentDateTime.AddMinutes(10)); var list = await _meterReadingRecordsRepository.ParallelQueryAsync(currentDateTime.AddMinutes(-20), currentDateTime.AddMinutes(10));
return list; return list;
//return null;
} }
/// <summary> /// <summary>
@ -136,6 +138,8 @@ namespace JiShe.CollectBus.Subscribers
public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{ {
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理"); _logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
try
{
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin"); var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null) if (protocolPlugin == null)
{ {
@ -143,7 +147,9 @@ namespace JiShe.CollectBus.Subscribers
} }
else else
{ {
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress); // var dd = await _meterReadingRecordsRepository.FirstOrDefaultAsync(d=>d.ManualOrNot== true);
var device = await _deviceRepository.FirstOrDefaultAsync(a => a.Number == receivedMessage.FocusAddress);
if (device != null) if (device != null)
{ {
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
@ -151,6 +157,12 @@ namespace JiShe.CollectBus.Subscribers
} }
} }
} }
catch (Exception ex)
{
throw ex;
}
}
#endregion #endregion
#region #region

View File

@ -136,6 +136,11 @@ namespace JiShe.CollectBus.Ammeters
/// </summary> /// </summary>
public int ProjectID { get; set; } public int ProjectID { get; set; }
/// <summary>
/// 数据库业务ID
/// </summary>
public int DatabaseBusiID { get; set; }
/// <summary> /// <summary>
/// 是否异常集中器 0:正常1异常 /// 是否异常集中器 0:正常1异常
/// </summary> /// </summary>

View File

@ -55,7 +55,6 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// </summary> /// </summary>
public string MeterAddress { get; set; } public string MeterAddress { get; set; }
/// <summary> /// <summary>
/// 表类型 /// 表类型
/// </summary> /// </summary>
@ -122,6 +121,11 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// </summary> /// </summary>
public string DataResult { get; set; } public string DataResult { get; set; }
/// <summary>
/// 数据时间,如冻结时间、事件发生事件等
/// </summary>
public DateTime? DataGenerationTimestamp { get; set; }
/// <summary> /// <summary>
/// 数据迁移时间 /// 数据迁移时间
/// </summary> /// </summary>

View File

@ -25,6 +25,7 @@ namespace JiShe.CollectBus.Host
typeof(AbpAspNetCoreSerilogModule), typeof(AbpAspNetCoreSerilogModule),
typeof(AbpSwashbuckleModule), typeof(AbpSwashbuckleModule),
typeof(CollectBusApplicationModule), typeof(CollectBusApplicationModule),
typeof(CollectBusMongoDbModule),
typeof(AbpCachingStackExchangeRedisModule), typeof(AbpCachingStackExchangeRedisModule),
typeof(AbpBackgroundWorkersHangfireModule) typeof(AbpBackgroundWorkersHangfireModule)
)] )]

View File

@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk.Web"> <Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup> <PropertyGroup>
<TargetFramework>net8.0</TargetFramework> <TargetFramework>net8.0</TargetFramework>
@ -55,6 +55,7 @@
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Application\JiShe.CollectBus.Application.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Application\JiShe.CollectBus.Application.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.HttpApi\JiShe.CollectBus.HttpApi.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.HttpApi\JiShe.CollectBus.HttpApi.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

View File

@ -31,45 +31,59 @@ public class CollectBusMongoDbContext : AbpMongoDbContext, ICollectBusMongoDbCon
protected override void CreateModel(IMongoModelBuilder modelBuilder) protected override void CreateModel(IMongoModelBuilder modelBuilder)
{ {
base.CreateModel(modelBuilder);
modelBuilder.ConfigureCollectBus();
modelBuilder.Entity<MeterReadingRecords>(builder => modelBuilder.Entity<MeterReadingRecords>(builder =>
{ {
// 创建索引 builder.CreateCollectionOptions.Collation = new Collation(locale: "en_US", strength: CollationStrength.Secondary);
builder.ConfigureIndexes(index => builder.ConfigureIndexes(indexes =>
{ {
//List<CreateIndexModel<BsonDocument>> createIndexModels = new List<CreateIndexModel<BsonDocument>>(); indexes.CreateOne(
//createIndexModels.Add(new CreateIndexModel<BsonDocument>( new CreateIndexModel<BsonDocument>(
// Builders<BsonDocument>.IndexKeys.Ascending(nameof(MeterReadingRecords)), Builders<BsonDocument>.IndexKeys.Ascending("MyProperty"),
// new CreateIndexOptions new CreateIndexOptions { Unique = true }
// { )
// Unique = true );
// }
// ));
//var indexKeys = Builders<BsonDocument>.IndexKeys
//.Ascending("CreationTime")
//.Ascending("OrderNumber");
//var indexOptions = new CreateIndexOptions
//{
// Background = true,
// Name = "IX_CreationTime_OrderNumber"
//};
//index.CreateOne(
//new CreateIndexModel<BsonDocument>(indexKeys, indexOptions));
index.CreateOne(new CreateIndexModel<BsonDocument>(
Builders<BsonDocument>.IndexKeys.Ascending(nameof(MeterReadingRecords)),
new CreateIndexOptions
{
Unique = true
} }
)); );
//// 创建索引
//builder.ConfigureIndexes(index =>
//{
// //List<CreateIndexModel<BsonDocument>> createIndexModels = new List<CreateIndexModel<BsonDocument>>();
// //createIndexModels.Add(new CreateIndexModel<BsonDocument>(
// // Builders<BsonDocument>.IndexKeys.Ascending(nameof(MeterReadingRecords)),
// // new CreateIndexOptions
// // {
// // Unique = true
// // }
// // ));
// //var indexKeys = Builders<BsonDocument>.IndexKeys
// //.Ascending("CreationTime")
// //.Ascending("OrderNumber");
// //var indexOptions = new CreateIndexOptions
// //{
// // Background = true,
// // Name = "IX_CreationTime_OrderNumber"
// //};
// //index.CreateOne(
// //new CreateIndexModel<BsonDocument>(indexKeys, indexOptions));
// //index.CreateOne(new CreateIndexModel<BsonDocument>(
// // Builders<BsonDocument>.IndexKeys.Ascending(nameof(MeterReadingRecords)),
// // new CreateIndexOptions
// // {
// // Unique = true
// // }
// // ));
//});
}); });
}); base.CreateModel(modelBuilder);
modelBuilder.ConfigureCollectBus();
} }
} }

View File

@ -35,8 +35,8 @@ public class CollectBusMongoDbModule : AbpModule
typeof(IShardingStrategy<>), typeof(IShardingStrategy<>),
typeof(DayShardingStrategy<>)); typeof(DayShardingStrategy<>));
// 分表策略仓储 替换默认仓储 //// 分表策略仓储 替换默认仓储
options.AddRepository<MeterReadingRecords, MeterReadingRecordRepository>(); //options.AddRepository<MeterReadingRecords, MeterReadingRecordRepository>();
}); });
context.Services.AddAlwaysDisableUnitOfWorkTransaction(); context.Services.AddAlwaysDisableUnitOfWorkTransaction();

View File

@ -1,4 +1,5 @@
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using MongoDB.Driver;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
@ -31,6 +32,23 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord
/// <returns></returns> /// <returns></returns>
Task<MeterReadingRecords> InsertAsync(MeterReadingRecords entity, DateTime? dateTime); Task<MeterReadingRecords> InsertAsync(MeterReadingRecords entity, DateTime? dateTime);
/// <summary>
/// 单条更新
/// </summary>
/// <param name="filter">过滤条件示例Builders<MeterReadingRecords>.Filter.Eq(x => x.Id, filter.Id)</param>
/// <param name="update">包含待更新的内容示例Builders<MeterReadingRecords>.Update.Set(x => x.Processed, true).Set(x => x.ProcessedTime, Clock.Now)</param>
/// <param name="entity">数据实体,用于获取对应的分片库</param>
/// <returns></returns>
Task<MeterReadingRecords> UpdateOneAsync(FilterDefinition<MeterReadingRecords> filter, UpdateDefinition<MeterReadingRecords> update, MeterReadingRecords entity);
/// <summary>
/// 单个获取
/// </summary>
/// <param name="entity"></param>
/// <param name="dateTime"></param>
/// <returns></returns>
Task<MeterReadingRecords> FirOrDefaultAsync(MeterReadingRecords entity, DateTime? dateTime);
/// <summary> /// <summary>
/// 多集合数据查询 /// 多集合数据查询
/// </summary> /// </summary>

View File

@ -10,8 +10,11 @@ using System.Linq;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp.Domain.Entities;
using Volo.Abp.Domain.Repositories.MongoDB; using Volo.Abp.Domain.Repositories.MongoDB;
using Volo.Abp.MongoDB; using Volo.Abp.MongoDB;
using Volo.Abp.MongoDB.DistributedEvents;
using Volo.Abp.Timing;
using static System.Net.Mime.MediaTypeNames; using static System.Net.Mime.MediaTypeNames;
namespace JiShe.CollectBus.Repository.MeterReadingRecord namespace JiShe.CollectBus.Repository.MeterReadingRecord
@ -89,6 +92,23 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord
return entity; return entity;
} }
/// <summary>
/// 单条更新
/// </summary>
/// <param name="filter">过滤条件示例Builders<MeterReadingRecords>.Filter.Eq(x => x.Id, filter.Id)</param>
/// <param name="update">包含待更新的内容示例Builders<MeterReadingRecords>.Update.Set(x => x.Processed, true).Set(x => x.ProcessedTime, Clock.Now)</param>
/// <param name="entity">数据实体,用于获取对应的分片库</param>
/// <returns></returns>
public async Task<MeterReadingRecords> UpdateOneAsync(FilterDefinition<MeterReadingRecords> filter, UpdateDefinition<MeterReadingRecords> update, MeterReadingRecords entity)
{
var collection = await GetShardedCollection(entity.CreationTime);
var dbContext = await DbContextProvider.GetDbContextAsync();
await collection.UpdateOneAsync(filter, update);
return entity;
}
/// <summary> /// <summary>
/// 多集合数据查询 /// 多集合数据查询
/// </summary> /// </summary>
@ -135,5 +155,10 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord
return database.GetCollection<MeterReadingRecords>(collectionName); return database.GetCollection<MeterReadingRecords>(collectionName);
} }
public Task<MeterReadingRecords> FirOrDefaultAsync(MeterReadingRecords entity, DateTime? dateTime)
{
throw new NotImplementedException();
}
} }
} }