解决IoTDB写入速度过慢的问题

This commit is contained in:
ChenYi 2025-04-28 16:37:31 +08:00
parent 021153a319
commit d18f60f9a7
14 changed files with 127 additions and 97 deletions

View File

@ -11,5 +11,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" /> <ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\..\shared\JiShe.CollectBus.IncrementalGenerator\JiShe.CollectBus.IncrementalGenerator.csproj" OutputItemType="Analyzer" ReferenceOutputAssembly="false" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -1,9 +1,10 @@
using JiShe.CollectBus.IoTDB.Attribute; using JiShe.CollectBus.Common.Attributes;
using JiShe.CollectBus.IoTDB.Attribute;
namespace JiShe.CollectBus.IoTDB.Model namespace JiShe.CollectBus.IoTDB.Model
{ {
/// <summary> /// <summary>
/// IoT实体基类此类适用于多个数据测点记录场景单个测点请使用子类 SingleMeasuring /// IoT实体基类此类适用于多个数据测点记录场景单个测点请使用子类 SingleMeasuring,新增字段只能现有字段末尾添加,否则会导致数据写入失败。
/// </summary> /// </summary>
public abstract class IoTEntity public abstract class IoTEntity
{ {

View File

@ -26,7 +26,7 @@
/// <summary> /// <summary>
/// 连接池大小 /// 连接池大小
/// </summary> /// </summary>
public int PoolSize { get; set; } = 2; public int PoolSize { get; set; } = 8;
/// <summary> /// <summary>
/// 查询时每次查询的数据量默认1024 /// 查询时每次查询的数据量默认1024

View File

@ -70,7 +70,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
var result = await _sessionPool.InsertAlignedTabletAsync(tablet); var result = await _sessionPool.InsertAlignedTabletAsync(tablet);
if (result != 0) if (result != 0)
{ {
throw new Exception($"{nameof(SessionPoolAdapter)} Tree模型数据入库没有成功返回结果为{result}"); throw new Exception($"{nameof(SessionPoolAdapter)} Tree模型数据入库没有成功返回结果为{result}请检查IoTEntity继承子类属性索引是否有变动。");
} }
//await CloseAsync(); //await CloseAsync();
return result; return result;

View File

@ -68,7 +68,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
var result = await _sessionPool.InsertAsync(tablet); var result = await _sessionPool.InsertAsync(tablet);
if (result != 0) if (result != 0)
{ {
throw new Exception($"{nameof(TableSessionPoolAdapter)} table模型数据入库没有成功返回结果为{result}"); throw new Exception($"{nameof(TableSessionPoolAdapter)} table模型数据入库没有成功返回结果为{result}请检查IoTEntity继承子类属性索引是否有变动。");
} }
//await CloseAsync(); //await CloseAsync();

View File

@ -66,12 +66,17 @@ public class CollectBusApplicationModule : AbpModule
.ToList(); .ToList();
foreach (var type in types) await context.AddBackgroundWorkerAsync(type); foreach (var type in types) await context.AddBackgroundWorkerAsync(type);
Task.Run(() => //Task.Run(() =>
{ //{
//默认初始化表计信息 // //默认初始化表计信息
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>(); // var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
dbContext.InitAmmeterCacheData(); // dbContext.InitAmmeterCacheData();
//await dbContext.InitWatermeterCacheData(); // //await dbContext.InitWatermeterCacheData();
}).ConfigureAwait(false); //}).ConfigureAwait(false);
//默认初始化表计信息
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
_= dbContext.InitAmmeterCacheData();
} }
} }

View File

@ -31,7 +31,6 @@
<ProjectReference Include="..\..\modules\JiShe.CollectBus.Kafka\JiShe.CollectBus.Kafka.csproj" /> <ProjectReference Include="..\..\modules\JiShe.CollectBus.Kafka\JiShe.CollectBus.Kafka.csproj" />
<ProjectReference Include="..\..\modules\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" /> <ProjectReference Include="..\..\modules\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
<ProjectReference Include="..\..\shared\JiShe.CollectBus.IncrementalGenerator\JiShe.CollectBus.IncrementalGenerator.csproj" OutputItemType="Analyzer" ReferenceOutputAssembly="false" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -286,52 +286,51 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public virtual async Task InitAmmeterCacheData(string gatherCode = "") public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{ {
//此处代码不要删除 //此处代码不要删除
//#if DEBUG #if DEBUG
// var timeDensity = "15"; var timeDensity = "15";
// var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
// var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
// var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
// List<AmmeterInfo> meterInfos = new List<AmmeterInfo>(); List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
// List<string> focusAddressDataLista = new List<string>(); List<string> focusAddressDataLista = new List<string>();
// var timer1 = Stopwatch.StartNew(); var timer1 = Stopwatch.StartNew();
// var allIds = new HashSet<string>(); var allIds = new HashSet<string>();
// decimal? score = null; decimal? score = null;
// string member = null; string member = null;
// while (true) while (true)
// { {
// var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>( var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
// redisCacheMeterInfoHashKeyTemp, redisCacheMeterInfoHashKeyTemp,
// redisCacheMeterInfoZSetScoresIndexKeyTemp, redisCacheMeterInfoZSetScoresIndexKeyTemp,
// pageSize: 1000, pageSize: 1000,
// lastScore: score, lastScore: score,
// lastMember: member); lastMember: member);
// meterInfos.AddRange(page.Items); meterInfos.AddRange(page.Items);
// focusAddressDataLista.AddRange(page.Items.Select(d => d.FocusAddress)); focusAddressDataLista.AddRange(page.Items.Select(d => d.FocusAddress));
// foreach (var item in page.Items) foreach (var item in page.Items)
// { {
// if (!allIds.Add(item.MemberId)) if (!allIds.Add(item.MemberId))
// { {
// _logger.LogError($"{item.MemberId}Duplicate data found!"); _logger.LogError($"{item.MemberId}Duplicate data found!");
// } }
// } }
// if (!page.HasNext) break; if (!page.HasNext) break;
// score = page.NextScore; score = page.NextScore;
// member = page.NextMember; member = page.NextMember;
// } }
// timer1.Stop(); timer1.Stop();
// _logger.LogError($"读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒"); _logger.LogError($"读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒");
// DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
// return; return;
//#else #else
// var meterInfos = await GetAmmeterInfoList(gatherCode); var meterInfos = await GetAmmeterInfoList(gatherCode);
//#endif #endif
var meterInfos = await GetAmmeterInfoList(gatherCode);
if (meterInfos == null || meterInfos.Count <= 0) if (meterInfos == null || meterInfos.Count <= 0)
{ {
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空"); throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空");

View File

@ -35,6 +35,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//[Route($"/energy/app/scheduled")] //[Route($"/energy/app/scheduled")]
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
{ {
string systemType = string.Empty;
string serverTagName = string.Empty; string serverTagName = string.Empty;
private readonly ILogger<BasicScheduledMeterReadingService> _logger; private readonly ILogger<BasicScheduledMeterReadingService> _logger;
private readonly IIoTDbProvider _dbProvider; private readonly IIoTDbProvider _dbProvider;
@ -59,12 +60,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
applicationOptions) applicationOptions)
{ {
serverTagName = applicationOptions.Value.ServerTagName; serverTagName = applicationOptions.Value.ServerTagName;
systemType = applicationOptions.Value.SystemType;
_dbProvider = dbProvider; _dbProvider = dbProvider;
_logger = logger; _logger = logger;
_protocolService = protocolService; _protocolService = protocolService;
} }
public sealed override string SystemType => SystemTypeConst.Energy; public sealed override string SystemType => systemType;
public sealed override string ServerTagName => serverTagName; public sealed override string ServerTagName => serverTagName;

View File

@ -1,4 +1,5 @@
using JiShe.CollectBus.Common; using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Attributes;
using JiShe.CollectBus.IoTDB.Attribute; using JiShe.CollectBus.IoTDB.Attribute;
using JiShe.CollectBus.IoTDB.Enums; using JiShe.CollectBus.IoTDB.Enums;
using JiShe.CollectBus.IoTDB.Model; using JiShe.CollectBus.IoTDB.Model;
@ -89,6 +90,7 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// <summary> /// <summary>
/// 抄读计量点 /// 抄读计量点
/// </summary> /// </summary>
[FIELDColumn]
public int Pn { get; set; } public int Pn { get; set; }
/// <summary> /// <summary>
@ -107,6 +109,7 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// <summary> /// <summary>
/// 帧序列域 SEQ /// 帧序列域 SEQ
/// </summary> /// </summary>
[FIELDColumn]
public int Seq { get; set; } public int Seq { get; set; }
/// <summary> /// <summary>
@ -125,7 +128,7 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// 发送次数 /// 发送次数
/// </summary> /// </summary>
[FIELDColumn] [FIELDColumn]
public int SendNum { get; set; } public int? SendNum { get; set; }
/// <summary> /// <summary>
/// 下次发送时间 /// 下次发送时间

View File

@ -23,8 +23,10 @@
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\modules\JiShe.CollectBus.FreeSql\JiShe.CollectBus.FreeSql.csproj" /> <ProjectReference Include="..\..\modules\JiShe.CollectBus.FreeSql\JiShe.CollectBus.FreeSql.csproj" />
<ProjectReference Include="..\..\modules\JiShe.CollectBus.IoTDB\JiShe.CollectBus.IoTDB.csproj" /> <ProjectReference Include="..\..\modules\JiShe.CollectBus.IoTDB\JiShe.CollectBus.IoTDB.csproj" />
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" OutputItemType="Analyzer" ReferenceOutputAssembly="false" /> <ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" /> <ProjectReference Include="..\..\shared\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" />
<ProjectReference Include="..\..\shared\JiShe.CollectBus.IncrementalGenerator\JiShe.CollectBus.IncrementalGenerator.csproj" OutputItemType="Analyzer" ReferenceOutputAssembly="false" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

View File

@ -20,13 +20,14 @@ namespace JiShe.CollectBus.IncrementalGenerator
public void Initialize(IncrementalGeneratorInitializationContext context) public void Initialize(IncrementalGeneratorInitializationContext context)
{ {
Debugger.Launch(); //Debugger.Launch();
context.RegisterPostInitializationOutput(ctx => context.RegisterPostInitializationOutput(ctx =>
{ {
ctx.AddSource("DebugInit.g.cs", "// Generator initialized"); ctx.AddSource("DebugInit.g.cs", "// Generator initialized");
}); });
// 步骤1筛选带有 [GenerateAccessors] 的类 // 步骤1筛选带有 [GenerateAccessors] 的类
var classDeclarations = context.SyntaxProvider var classDeclarations = context.SyntaxProvider
.CreateSyntaxProvider( .CreateSyntaxProvider(
@ -43,28 +44,6 @@ namespace JiShe.CollectBus.IncrementalGenerator
private static bool IsClassWithAttribute(SyntaxNode node) => node is ClassDeclarationSyntax cds && cds.AttributeLists.Count > 0; private static bool IsClassWithAttribute(SyntaxNode node) => node is ClassDeclarationSyntax cds && cds.AttributeLists.Count > 0;
//private static ClassDeclarationSyntax? GetClassDeclaration(GeneratorSyntaxContext context)
//{
// var classDecl = (ClassDeclarationSyntax)context.Node;
// foreach (var attributeList in classDecl.AttributeLists)
// {
// foreach (var attribute in attributeList.Attributes)
// {
// var symbol = context.SemanticModel.GetSymbolInfo(attribute).Symbol;
// if (symbol is IMethodSymbol ctor)
// {
// string sdd = ctor.ContainingType.Name;
// var attributeType = context.SemanticModel.Compilation.GetTypeByMetadataName(AttributeFullName);
// if (ctor.ContainingType?.OriginalDefinition?.Equals(attributeType, SymbolEqualityComparer.Default) == true)
// {
// return classDecl;
// }
// }
// }
// }
// return null;
//}
private static ClassDeclarationSyntax? GetClassDeclaration(GeneratorSyntaxContext context) private static ClassDeclarationSyntax? GetClassDeclaration(GeneratorSyntaxContext context)
{ {
@ -75,7 +54,7 @@ namespace JiShe.CollectBus.IncrementalGenerator
{ {
var symbol = context.SemanticModel.GetSymbolInfo(attribute).Symbol; var symbol = context.SemanticModel.GetSymbolInfo(attribute).Symbol;
if (symbol is IMethodSymbol ctor && if (symbol is IMethodSymbol ctor &&
ctor.ContainingType?.OriginalDefinition?.Equals(attributeType, SymbolEqualityComparer.Default) == true) SymbolEqualityComparer.Default.Equals(ctor.ContainingType, attributeType))
{ {
return classDecl; return classDecl;
} }
@ -90,13 +69,27 @@ namespace JiShe.CollectBus.IncrementalGenerator
{ {
var processedTypes = new HashSet<ITypeSymbol>(SymbolEqualityComparer.Default); var processedTypes = new HashSet<ITypeSymbol>(SymbolEqualityComparer.Default);
if (!classes.Any())
{
context.ReportDiagnostic(Diagnostic.Create(
new DiagnosticDescriptor("GEN002", "No Targets",
"No classes with [GenerateAccessors] found", "Debug", DiagnosticSeverity.Warning, true),
Location.None));
}
foreach (var classDecl in classes.Distinct()) foreach (var classDecl in classes.Distinct())
{ {
var model = compilation.GetSemanticModel(classDecl.SyntaxTree); var model = compilation.GetSemanticModel(classDecl.SyntaxTree);
var classSymbol = model.GetDeclaredSymbol(classDecl) as INamedTypeSymbol; var classSymbol = model.GetDeclaredSymbol(classDecl) as INamedTypeSymbol;
if (classSymbol == null || !processedTypes.Add(classSymbol)) continue; if (classSymbol == null || !processedTypes.Add(classSymbol))
{
context.ReportDiagnostic(Diagnostic.Create(
new DiagnosticDescriptor("GEN003", "Invalid Symbol",
$"Class symbol is null for {classDecl.Identifier.Text}", "Error", DiagnosticSeverity.Error, true),
Location.None));
continue;
}
context.ReportDiagnostic(Diagnostic.Create( context.ReportDiagnostic(Diagnostic.Create(
new DiagnosticDescriptor( new DiagnosticDescriptor(
@ -114,6 +107,9 @@ namespace JiShe.CollectBus.IncrementalGenerator
Location.None)); Location.None));
var code = BuildAccessorsForType(classSymbol, compilation, processedTypes); var code = BuildAccessorsForType(classSymbol, compilation, processedTypes);
System.Diagnostics.Debug.WriteLine($"Generated code for {classSymbol.Name}:\n{code}"); // 调试输出
context.AddSource($"{classSymbol.Name}Extension.g.cs", SourceText.From(code, Encoding.UTF8)); context.AddSource($"{classSymbol.Name}Extension.g.cs", SourceText.From(code, Encoding.UTF8));
} }
} }
@ -124,6 +120,8 @@ namespace JiShe.CollectBus.IncrementalGenerator
HashSet<ITypeSymbol> processedTypes) HashSet<ITypeSymbol> processedTypes)
{ {
var code = new StringBuilder(); var code = new StringBuilder();
code.AppendLine("#pragma warning disable CS0419 // 禁用警告");
code.AppendLine("// Generated code for " + classSymbol.Name);
code.AppendLine("// <auto-generated/>"); code.AppendLine("// <auto-generated/>");
code.AppendLine("#nullable enable"); code.AppendLine("#nullable enable");
code.AppendLine($"namespace {classSymbol.ContainingNamespace.ToDisplayString()};"); code.AppendLine($"namespace {classSymbol.ContainingNamespace.ToDisplayString()};");
@ -143,8 +141,14 @@ namespace JiShe.CollectBus.IncrementalGenerator
return code.ToString(); return code.ToString();
} }
private static string GetGenericParams(INamedTypeSymbol symbol) //private static string GetGenericParams(INamedTypeSymbol symbol)
=> symbol.IsGenericType ? $"<{string.Join(", ", symbol.TypeParameters.Select(t => t.Name))}>" : ""; // => symbol.IsGenericType ? $"<{string.Join(", ", symbol.TypeParameters.Select(t => t.Name))}>" : "";
public static string GetGenericParams(INamedTypeSymbol symbol)
{
if (!symbol.IsGenericType) return "";
var parameters = symbol.TypeParameters.Select(t => t.Name);
return $"<{string.Join(", ", parameters)}>";
}
private static void GeneratePropertyAccessors( private static void GeneratePropertyAccessors(
IPropertySymbol prop, IPropertySymbol prop,

View File

@ -1,14 +1,27 @@
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<Nullable>enable</Nullable> <TargetFramework>net8.0</TargetFramework>
<TargetFramework>netstandard2.0</TargetFramework> <IsRoslynComponent>true</IsRoslynComponent>
<!--<TargetFramework>net8.0</TargetFramework>--> <NoPackageAnalysis>true</NoPackageAnalysis>
<EnforceExtendedAnalyzerRules>true</EnforceExtendedAnalyzerRules> <ImportDirectoryBuildProps>false</ImportDirectoryBuildProps>
<BaseOutputPath>bin</BaseOutputPath>
<IncludeBuildOutput>false</IncludeBuildOutput>
<LangVersion>latest</LangVersion> <LangVersion>latest</LangVersion>
<EnforceExtendedAnalyzerRules>true</EnforceExtendedAnalyzerRules>
<EmitCompilerGeneratedFiles>true</EmitCompilerGeneratedFiles>
<!-- 指定生成文件的输出目录 -->
<CompilerGeneratedFilesOutputPath>Generated</CompilerGeneratedFilesOutputPath>
<Nullable>enable</Nullable>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.CodeAnalysis.Analyzers" Version="3.11.0" PrivateAssets="all" /> <PackageReference Include="Microsoft.CodeAnalysis.Analyzers" Version="3.11.0" PrivateAssets="all" />
<PackageReference Include="Microsoft.CodeAnalysis.CSharp" Version="4.13.0" PrivateAssets="all" /> <PackageReference Include="Microsoft.CodeAnalysis.CSharp" Version="4.13.0" PrivateAssets="all" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" >
<ReferenceOutputAssembly>true</ReferenceOutputAssembly>
<PrivateAssets>all</PrivateAssets>
</ProjectReference>
</ItemGroup>
</Project> </Project>

View File

@ -5,7 +5,7 @@
"Serilog.Sinks.File" "Serilog.Sinks.File"
], ],
"MinimumLevel": { "MinimumLevel": {
"Default": "Warning", "Default": "Information",
"Override": { "Override": {
"Microsoft": "Warning", "Microsoft": "Warning",
"Volo.Abp": "Warning", "Volo.Abp": "Warning",
@ -87,7 +87,7 @@
"UserName": "root", "UserName": "root",
"Password": "root", "Password": "root",
"ClusterList": [ "192.168.1.9:6667" ], "ClusterList": [ "192.168.1.9:6667" ],
"PoolSize": 2, "PoolSize": 32,
"DataBaseName": "energy", "DataBaseName": "energy",
"OpenDebugMode": true, "OpenDebugMode": true,
"UseTableSessionPoolByDefault": false "UseTableSessionPoolByDefault": false
@ -142,9 +142,9 @@
} }
}, },
"ServerApplicationOptions": { "ServerApplicationOptions": {
"ServerTagName": "JiSheCollectBus100", "ServerTagName": "JiSheCollectBus4",
"SystemType": null, "SystemType": "Energy",
"FirstCollectionTime": "2025-04-22 16:07:00", "FirstCollectionTime": "2025-04-28 15:07:00",
"AutomaticVerificationTime": "16:07:00", "AutomaticVerificationTime": "16:07:00",
"AutomaticTerminalVersionTime": "17:07:00", "AutomaticTerminalVersionTime": "17:07:00",
"AutomaticTelematicsModuleTime": "17:30:00", "AutomaticTelematicsModuleTime": "17:30:00",