Compare commits

..

12 Commits

Author SHA1 Message Date
ChenYi
bdfc8eabae 合并代码 2025-05-08 17:22:06 +08:00
ChenYi
fa593de754 完善IoTDB驱动,适配增量源码生成器 2025-05-08 17:21:20 +08:00
ChenYi
c03207aa21 优化增量源码生成器和IoTDB驱动 2025-05-08 14:42:13 +08:00
ChenYi
db7384ae74 完善增量源码生成器 2025-05-08 11:17:43 +08:00
ChenYi
edecbc386e 完善数据通道数据处理,解决数据丢失的问题。 2025-05-08 10:28:23 +08:00
ChenYi
ac226110cd 合并代码 2025-05-08 08:48:09 +08:00
ChenYi
f71ce3bacb 完善增量源码生成器 2025-05-08 08:43:37 +08:00
ChenYi
c47ee94469 修改代码 2025-05-07 17:27:37 +08:00
ChenYi
ff517664fe 复杂类型增量源生成器 2025-05-07 17:20:10 +08:00
ChenYi
6b012d9303 完善复杂类型增量源码生成器 2025-05-07 16:37:26 +08:00
ChenYi
bca7202558 优化复杂类型增量源生成器 2025-05-07 10:15:45 +08:00
ChenYi
53e6bb252a 修复15分钟任务Kafka主题异常的问题,新增增量源码工厂类 2025-05-06 23:46:12 +08:00
31 changed files with 961 additions and 432 deletions

View File

@ -1,6 +1,9 @@
using Microsoft.CodeAnalysis; using JiShe.CollectBus.Analyzers.Shared;
using Microsoft.CodeAnalysis;
using Microsoft.CodeAnalysis.CSharp.Syntax; using Microsoft.CodeAnalysis.CSharp.Syntax;
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics; using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
@ -8,7 +11,7 @@ using System.Text;
namespace JiShe.CollectBus.IncrementalGenerator namespace JiShe.CollectBus.IncrementalGenerator
{ {
/// <summary> /// <summary>
/// 复杂类型源生成器 /// 复杂类型增量源生成器
/// </summary> /// </summary>
[Generator(LanguageNames.CSharp)] [Generator(LanguageNames.CSharp)]
public class ComplexTypeSourceAnalyzers : IIncrementalGenerator public class ComplexTypeSourceAnalyzers : IIncrementalGenerator
@ -19,12 +22,7 @@ namespace JiShe.CollectBus.IncrementalGenerator
{ {
//Debugger.Launch(); //Debugger.Launch();
context.RegisterPostInitializationOutput(ctx => // 步骤1筛选带有 [SourceAnalyzers] 的类
{
ctx.AddSource("GeneratorInit.g.cs", "// Initialization Marker");
});
// 步骤1筛选带有 [GenerateAccessors] 的类
var classDeclarations = context.SyntaxProvider var classDeclarations = context.SyntaxProvider
.CreateSyntaxProvider( .CreateSyntaxProvider(
predicate: static (s, _) => IsClassWithAttribute(s), predicate: static (s, _) => IsClassWithAttribute(s),
@ -40,22 +38,40 @@ namespace JiShe.CollectBus.IncrementalGenerator
private static bool IsClassWithAttribute(SyntaxNode node) => node is ClassDeclarationSyntax cds && cds.AttributeLists.Count > 0; private static bool IsClassWithAttribute(SyntaxNode node) => node is ClassDeclarationSyntax cds && cds.AttributeLists.Count > 0;
private static ClassDeclarationSyntax GetClassDeclaration(GeneratorSyntaxContext context) private static ClassDeclarationSyntax GetClassDeclaration(GeneratorSyntaxContext context)
{ {
var classDecl = (ClassDeclarationSyntax)context.Node; var classDecl = (ClassDeclarationSyntax)context.Node;
var attributeType = context.SemanticModel.Compilation.GetTypeByMetadataName(AttributeFullName); var semanticModel = context.SemanticModel;
foreach (var attribute in classDecl.AttributeLists.SelectMany(al => al.Attributes)) // 获取类符号
var classSymbol = semanticModel.GetDeclaredSymbol(classDecl) as INamedTypeSymbol;
if (classSymbol == null) return null;
// 检查是否包含 SourceAnalyzers 特性
var sourceAnalyzerAttr = classSymbol.GetAttributes().FirstOrDefault(attr => attr.AttributeClass?.ToDisplayString() == AttributeFullName);
// 必须包含 EntityType 参数
if (sourceAnalyzerAttr == null ||
sourceAnalyzerAttr.ConstructorArguments.Length == 0)
{ {
var symbol = context.SemanticModel.GetSymbolInfo(attribute).Symbol; return null;
if (symbol is IMethodSymbol ctor &&
SymbolEqualityComparer.Default.Equals(ctor.ContainingType, attributeType))
{
return classDecl;
}
} }
return null;
return classDecl;
//var classDecl = (ClassDeclarationSyntax)context.Node;
//var attributeType = context.SemanticModel.Compilation.GetTypeByMetadataName(AttributeFullName);
//foreach (var attribute in classDecl.AttributeLists.SelectMany(al => al.Attributes))
//{
// var symbol = context.SemanticModel.GetSymbolInfo(attribute).Symbol;
// if (symbol is IMethodSymbol ctor &&
// SymbolEqualityComparer.Default.Equals(ctor.ContainingType, attributeType))
// {
// return classDecl;
// }
//}
//return null;
} }
/// <summary> /// <summary>
@ -76,6 +92,12 @@ namespace JiShe.CollectBus.IncrementalGenerator
} }
} }
/// <summary>
/// 生成代码
/// </summary>
/// <param name="compilation"></param>
/// <param name="classes"></param>
/// <param name="context"></param>
private static void GenerateCode( private static void GenerateCode(
Compilation compilation, Compilation compilation,
IEnumerable<ClassDeclarationSyntax> classes, IEnumerable<ClassDeclarationSyntax> classes,
@ -86,8 +108,8 @@ namespace JiShe.CollectBus.IncrementalGenerator
if (!classes.Any()) if (!classes.Any())
{ {
context.ReportDiagnostic(Diagnostic.Create( context.ReportDiagnostic(Diagnostic.Create(
new DiagnosticDescriptor("GEN002", "No Targets", new DiagnosticDescriptor("GEN002", "没有目标类",
"No classes with [GenerateAccessors] found", "Debug", DiagnosticSeverity.Warning, true), "没有找到SourceAnalyzers标记的类", "Debug", DiagnosticSeverity.Warning, true),
Location.None)); Location.None));
} }
@ -99,77 +121,25 @@ namespace JiShe.CollectBus.IncrementalGenerator
if (classSymbol == null || !processedTypes.Add(classSymbol)) if (classSymbol == null || !processedTypes.Add(classSymbol))
{ {
context.ReportDiagnostic(Diagnostic.Create( context.ReportDiagnostic(Diagnostic.Create(
new DiagnosticDescriptor("GEN003", "Invalid Symbol", new DiagnosticDescriptor("GEN003", "无效符号",
$"Class symbol is null for {classDecl.Identifier.Text}", "Error", DiagnosticSeverity.Error, true), $"类名称为{classDecl.Identifier.Text} 符号为空", "Error", DiagnosticSeverity.Error, true),
Location.None)); Location.None));
continue; continue;
} }
context.ReportDiagnostic(Diagnostic.Create( var code3 = BuildAccessorsForSourceEntity(classSymbol, compilation, processedTypes);
new DiagnosticDescriptor( context.AddSource($"{classSymbol.Name}Accessor.g.cs", code3);
"PA001",
"Generated Accessors",
$"Generating accessors for {classSymbol.Name}",
"Performance",
DiagnosticSeverity.Info,
true),
Location.None));
// 新增:输出继承链信息
context.ReportDiagnostic(Diagnostic.Create(
new DiagnosticDescriptor("HIERARCHY", "Class Hierarchy",
$"Processing class: {classSymbol.Name}, BaseType: {classSymbol.BaseType?.Name}",
"Debug", DiagnosticSeverity.Info, true),
Location.None));
context.ReportDiagnostic(Diagnostic.Create(
new DiagnosticDescriptor("PA002", "Class Found",
$"Processing class: {classSymbol.Name}", "Debug", DiagnosticSeverity.Warning, true),
Location.None));
var code = BuildAccessorsForType(classSymbol, compilation, processedTypes);
System.Diagnostics.Debug.WriteLine($"Generated code for {classSymbol.Name}:\n{code}"); // 调试输出
context.AddSource($"{classSymbol.Name}Extension.g.cs", code);
}
}
private static string BuildAccessorsForType(
INamedTypeSymbol classSymbol,
Compilation compilation,
HashSet<ITypeSymbol> processedTypes)
{
var code = new StringBuilder();
code.AppendLine("#pragma warning disable CS0419 // 禁用警告");
code.AppendLine("// Generated code for " + classSymbol.Name);
code.AppendLine("// <auto-generated/>");
code.AppendLine("#nullable enable");
code.AppendLine($"namespace {classSymbol.ContainingNamespace.ToDisplayString()};");
code.AppendLine();
code.AppendLine($"public static class {classSymbol.Name}Extension{GetGenericParams(classSymbol)}");
code.AppendLine("{");
//foreach (var prop in classSymbol.GetMembers().OfType<IPropertySymbol>())
//{
// if (prop.IsIndexer) continue;
// GeneratePropertyAccessors(prop, code, compilation, processedTypes);
//}
foreach (var prop in GetAllPropertiesInHierarchy(classSymbol))
{
if (prop.IsIndexer) continue;
GeneratePropertyAccessors(prop, code, compilation, processedTypes);
} }
code.AppendLine("}"); // 生成工厂注册代码
return code.ToString(); context.AddSource("SourceEntityAccessorFactory.g.cs", BuildFactoryCode());
} }
//private static string GetGenericParams(INamedTypeSymbol symbol) /// <summary>
// => symbol.IsGenericType ? $"<{string.Join(", ", symbol.TypeParameters.Select(t => t.Name))}>" : ""; /// 获取泛型参数
/// </summary>
/// <param name="symbol"></param>
/// <returns></returns>
public static string GetGenericParams(INamedTypeSymbol symbol) public static string GetGenericParams(INamedTypeSymbol symbol)
{ {
if (!symbol.IsGenericType) return ""; if (!symbol.IsGenericType) return "";
@ -177,51 +147,13 @@ namespace JiShe.CollectBus.IncrementalGenerator
return $"<{string.Join(", ", parameters)}>"; return $"<{string.Join(", ", parameters)}>";
} }
private static void GeneratePropertyAccessors(
IPropertySymbol prop,
StringBuilder code,
Compilation compilation,
HashSet<ITypeSymbol> processedTypes)
{
// 关键修复点1安全类型转换
if (prop.Type is not ITypeSymbol propType) return;
code.AppendLine($" // Processing property: {prop.Name}");
// 处理元组类型
if (propType is INamedTypeSymbol { IsTupleType: true } tupleType)
{
GenerateTupleAccessors(prop, tupleType, code);
}
else if (propType is INamedTypeSymbol namedType)
{
GenerateStandardAccessors(prop, namedType, code);
ProcessNestedType(namedType, compilation, processedTypes);
}
}
private static void GenerateTupleAccessors(IPropertySymbol prop, INamedTypeSymbol tupleType, StringBuilder code)
{
var elements = tupleType.TupleElements;
var parentType = prop.ContainingType.ToDisplayString();
for (int i = 0; i < elements.Length; i++)
{
var element = elements[i];
if (element.Type is not ITypeSymbol elementType) continue;
var elementName = element.CorrespondingTupleField?.Name ?? $"Item{i + 1}";
code.AppendLine($" public static {elementType.ToDisplayString()} Get{prop.Name}_{elementName}({parentType} obj) => obj.{prop.Name}.{elementName};");
if (prop.SetMethod != null)
{
var assignments = elements.Select((e, idx) =>
idx == i ? "value" : $"obj.{prop.Name}.{e.Name}");
code.AppendLine($" public static void Set{prop.Name}_{elementName}({parentType} obj, {elementType.ToDisplayString()} value) => obj.{prop.Name} = ({string.Join(", ", assignments)});");
}
}
}
/// <summary>
/// 生成标准属性的访问器
/// </summary>
/// <param name="prop"></param>
/// <param name="propType"></param>
/// <param name="code"></param>
private static void GenerateStandardAccessors(IPropertySymbol prop, INamedTypeSymbol propType, StringBuilder code) private static void GenerateStandardAccessors(IPropertySymbol prop, INamedTypeSymbol propType, StringBuilder code)
{ {
var parentType = prop.ContainingType.ToDisplayString(); var parentType = prop.ContainingType.ToDisplayString();
@ -233,22 +165,525 @@ namespace JiShe.CollectBus.IncrementalGenerator
} }
} }
private static void ProcessNestedType(ITypeSymbol typeSymbol, Compilation compilation, HashSet<ITypeSymbol> processedTypes)
{
if (typeSymbol is not INamedTypeSymbol namedType) return;
if (!ShouldProcessNestedType(namedType)) return;
if (!processedTypes.Add(namedType)) return;
var code = BuildAccessorsForType(namedType, compilation, processedTypes); /// <summary>
/// 构建实体访问器代码(支持泛型)
/// </summary>
private static string BuildAccessorsForSourceEntity(
INamedTypeSymbol classSymbol,
Compilation compilation,
HashSet<ITypeSymbol> processedTypes)
{
// 获取 SourceAnalyzers 特性的 EntityType 参数
var sourceAnalyzerAttr = classSymbol.GetAttributes()
.FirstOrDefault(attr =>
attr.AttributeClass?.ToDisplayString() == AttributeFullName);
// 解析 EntityType 枚举值
string entityTypeValue = "EntityTypeEnum.Other"; // 默认值
if (sourceAnalyzerAttr != null &&
sourceAnalyzerAttr.ConstructorArguments.Length > 0)
{
var arg = sourceAnalyzerAttr.ConstructorArguments[0];
if (arg.Kind == TypedConstantKind.Enum &&
arg.Type is INamedTypeSymbol enumType)
{
int enumValue = (int)arg.Value!;
entityTypeValue = GetEnumMemberName(enumType, enumValue);
}
}
var code = new StringBuilder();
code.AppendLine("// <auto-generated/>");
code.AppendLine("#nullable enable");
code.AppendLine("using System;");
code.AppendLine("using System.Reflection;");
code.AppendLine("using System.Collections.Generic;");
code.AppendLine("using JiShe.CollectBus.Analyzers.Shared;");
code.AppendLine($"namespace {classSymbol.ContainingNamespace.ToDisplayString()};");
code.AppendLine();
// 处理泛型类型名称
var accessibility = classSymbol.DeclaredAccessibility switch
{
Accessibility.Public => "public",
_ => "internal"
};
var genericParams = classSymbol.IsGenericType
? $"<{string.Join(", ", classSymbol.TypeParameters.Select(t => t.Name))}>"
: "";
code.AppendLine(
$"{accessibility} sealed class {classSymbol.Name}Accessor{genericParams} " + // 保留泛型参数
$": ISourceEntityAccessor<{classSymbol.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat)}>");
code.AppendLine("{");
var propList = GetAllPropertiesInHierarchy(classSymbol);
//类名称
code.AppendLine($" public string EntityName {{get;}} = \"{classSymbol.Name}\";");
// 添加 EntityType 属性
code.AppendLine($" public EntityTypeEnum? EntityType {{ get; }} = {entityTypeValue};");
foreach (var prop in propList)
{
// 安全类型转换
if (prop.Type is not ITypeSymbol propType) continue;
if (propType is INamedTypeSymbol namedType)
{
GenerateStandardAccessors(prop, namedType, code);
}
if (propType is INamedTypeSymbol { IsTupleType: true } tupleType)
{
GenerateTupleAccessors(prop, tupleType, code);
}
}
//生成当前类属性名称集合
GeneratePropertyListForSourceEntity(propList, code, compilation, classSymbol);
//生成当前类属性信息集合
GenerateEntityMemberInfoList(propList, code, compilation, classSymbol);
//生成当前类属性访问
GetGeneratePropertyValueForSourceEntity(propList, code, compilation, classSymbol);
//生成当前类属性设置
SetGeneratePropertyValueForSourceEntity(propList, code, compilation, classSymbol);
code.AppendLine("}");
return code.ToString();
} }
private static bool ShouldProcessNestedType(INamedTypeSymbol symbol) /// <summary>
/// 生成ValueTuple元组属性访问器
/// </summary>
/// <param name="prop"></param>
/// <param name="tupleType"></param>
/// <param name="code"></param>
private static void GenerateTupleAccessors(
IPropertySymbol prop,
INamedTypeSymbol tupleType,
StringBuilder code)
{ {
return symbol.DeclaredAccessibility == Accessibility.Public && var parentType = prop.ContainingType.ToDisplayString();
!symbol.IsTupleType && var tupleElements = tupleType.TupleElements;
!symbol.IsAnonymousType &&
!symbol.IsImplicitlyDeclared && for (int i = 0; i < tupleElements.Length; i++)
!symbol.Name.StartsWith("<"); {
var element = tupleElements[i];
var elementType = element.Type.ToDisplayString();
var elementName = element.Name;
// Getter
code.AppendLine($"public static {elementType} Get{prop.Name}_{elementName}({parentType} obj) => obj.{prop.Name}.{elementName};");
// Setter
if (prop.SetMethod != null)
{
code.AppendLine($"public static void Set{prop.Name}_{elementName}({parentType} obj, {elementType} value) => obj.{prop.Name} = ({string.Join(", ", GetTupleElements(prop.Name, tupleElements, i))});");
}
}
}
private static IEnumerable<string> GetTupleElements(
string propName,
ImmutableArray<IFieldSymbol> elements,
int targetIndex)
{
for (int i = 0; i < elements.Length; i++)
{
yield return i == targetIndex
? "value"
: $"obj.{propName}.{elements[i].Name}";
}
}
/// <summary>
/// 处理System.Tuple类型的访问器生成
/// </summary>
private static void GenerateSystemTupleAccessors(
IPropertySymbol prop,
INamedTypeSymbol tupleType,
StringBuilder code,
INamedTypeSymbol classSymbol)
{
var parentType = classSymbol.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat);
var elementTypes = tupleType.TypeArguments;
var tupleTypeName = tupleType.ToDisplayString();
for (int i = 0; i < elementTypes.Length; i++)
{
var elementType = elementTypes[i].ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat);
var elementName = $"Item{i + 1}";
// Getter
code.AppendLine(
$" public static {elementType} Get{prop.Name}_{elementName}" +
$"({parentType} obj) => obj.{prop.Name}.{elementName};");
// Setter
if (prop.SetMethod != null)
{
var assignments = elementTypes.Select((_, idx) =>
idx == i ? "value" : $"obj.{prop.Name}.Item{idx + 1}"
).ToList();
code.AppendLine(
$" public static void Set{prop.Name}_{elementName}" +
$"({parentType} obj, {elementType} value) => " +
$"obj.{prop.Name} = new {tupleTypeName}({string.Join(", ", assignments)});");
}
}
}
/// <summary>
/// 增强的工厂类实现
/// </summary>
private static string BuildFactoryCode()
{
return """
using System;
using System.Collections.Concurrent;
using System.Reflection;
namespace JiShe.CollectBus.Analyzers.Shared;
public static class SourceEntityAccessorFactory
{
private static readonly ConcurrentDictionary<Type, object> _accessors = new();
public static ISourceEntityAccessor<T> GetAccessor<T>()
{
return (ISourceEntityAccessor<T>)_accessors.GetOrAdd(typeof(T), t =>
{
// 获取泛型类型定义信息(如果是泛型类型)
var isGeneric = t.IsGenericType;
var genericTypeDef = isGeneric ? t.GetGenericTypeDefinition() : null;
var arity = isGeneric ? genericTypeDef!.GetGenericArguments().Length : 0;
// 构建访问器类名
var typeName = isGeneric
? $"{t.Namespace}.{genericTypeDef!.Name.Split('`')[0]}Accessor`{arity}"
: $"{t.Namespace}.{t.Name}Accessor";
// 尝试从当前程序集加载
var accessorType = Assembly.GetAssembly(t)!.GetType(typeName)
?? throw new InvalidOperationException($"Accessor type {typeName} not found");
// 处理泛型参数
if (isGeneric && accessorType.IsGenericTypeDefinition)
{
accessorType = accessorType.MakeGenericType(t.GetGenericArguments());
}
return Activator.CreateInstance(accessorType)!;
});
}
}
""";
}
/// <summary>
/// 属性访问生成逻辑
/// </summary>
/// <param name="propList">属性集合</param>
/// <param name="code"></param>
/// <param name="compilation"></param>
/// <param name="classSymbol"></param>
private static void GetGeneratePropertyValueForSourceEntity(
IEnumerable<IPropertySymbol> propList,
StringBuilder code,
Compilation compilation,
INamedTypeSymbol classSymbol)
{
code.AppendLine($" public object GetPropertyValue({classSymbol} targetEntity, string propertyName)");
code.AppendLine(" {");
code.AppendLine(" return propertyName switch");
code.AppendLine(" {");
foreach (var prop in propList)
{
code.AppendLine(
$" \"{prop.Name}\" => " +
$"Get{prop.Name}(targetEntity),");
if (prop.Type is INamedTypeSymbol { IsTupleType: true } tupleType)
{
foreach (var element in tupleType.TupleElements)
{
code.AppendLine(
$" \"{prop.Name}.{element.Name}\" => " +
$"Get{prop.Name}_{element.Name}(targetEntity),");
}
}
}
code.AppendLine(" _ => throw new ArgumentException($\"Unknown property: {propertyName}\")");
code.AppendLine(" };");
code.AppendLine(" }");
}
/// <summary>
/// 属性设置生成逻辑
/// </summary>
/// <param name="propList">属性集合</param>
/// <param name="code"></param>
/// <param name="compilation"></param>
/// <param name="classSymbol"></param>
private static void SetGeneratePropertyValueForSourceEntity(
IEnumerable<IPropertySymbol> propList,
StringBuilder code,
Compilation compilation,
INamedTypeSymbol classSymbol)
{
code.AppendLine($" public void SetPropertyValue({classSymbol} targetEntity, string propertyName, object value)");
code.AppendLine(" {");
code.AppendLine(" switch (propertyName)");
code.AppendLine(" {");
foreach (var prop in propList)
{
var propType = prop.Type.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat);
code.AppendLine($" case \"{prop.Name}\":");
code.AppendLine($" Set{prop.Name}(");
code.AppendLine($" targetEntity, ({propType})value);");
code.AppendLine(" break;");
if (prop.Type is INamedTypeSymbol { IsTupleType: true } tupleType)
{
foreach (var element in tupleType.TupleElements)
{
var elementType = element.Type.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat);
code.AppendLine($" case \"{prop.Name}.{element.Name}\":");
code.AppendLine($" Set{prop.Name}_{element.Name}(");
code.AppendLine($" targetEntity, ({elementType})value);");
code.AppendLine(" break;");
}
}
}
code.AppendLine(" default:");
code.AppendLine(" throw new ArgumentException($\"Unknown property: {propertyName}\");");
code.AppendLine(" }");
code.AppendLine(" }");
}
/// <summary>
/// 属性名称集合
/// </summary>
/// <param name="propList">属性集合</param>
/// <param name="code"></param>
/// <param name="compilation"></param>
/// <param name="classSymbol"></param>
private static void GeneratePropertyListForSourceEntity(
IEnumerable<IPropertySymbol> propList,
StringBuilder code,
Compilation compilation,
INamedTypeSymbol classSymbol)
{
code.AppendLine(" public List<string> PropertyNameList {get;} = new List<string>()");
code.AppendLine(" {");
List<string> tempPropList = new List<string>();
foreach (var prop in propList)
{
if (prop.Type is INamedTypeSymbol { IsTupleType: true } tupleType)
{
foreach (var element in tupleType.TupleElements)
{
tempPropList.Add($"\"{prop.Name}.{element.Name}\"");
}
}
else
{
tempPropList.Add($"\"{prop.Name}\"");
}
}
code.Append(string.Join(",", tempPropList));
code.AppendLine(" };");
}
/// <summary>
/// 生成当前类属性信息集合
/// </summary>
private static void GenerateEntityMemberInfoList(
IEnumerable<IPropertySymbol> propList,
StringBuilder code,
Compilation compilation,
INamedTypeSymbol classSymbol)
{
code.AppendLine(" public List<EntityMemberInfo> MemberList { get; } = new()");
code.AppendLine(" {");
var initializerLines = new List<string>();
foreach (var prop in propList)
{
var entityType = prop.ContainingType.ToDisplayString();//entity 实体类型名称
var propType = prop.Type;//实体属性的类型
var propTypeName = propType.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat);
var declaredTypeName = propType.Name; // 直接获取类型名称(如 "Int32"
// 处理主属性
var propAttributes = prop.GetAttributes()
.Where(a => !IsCompilerGeneratedAttribute(a))
.ToList();
var attributeInitializers = propAttributes
.Select(GenerateAttributeInitializer)
.Where(s => !string.IsNullOrEmpty(s));
var mainMember = new StringBuilder();
mainMember.Append(
$"new EntityMemberInfo(" +
$"\"{prop.Name}\", " +
$"typeof({propTypeName}), " +
$"\"{declaredTypeName}\", " +
$"(e) => Get{prop.Name}(({entityType})e), " +
$"(e, v) => Set{prop.Name}(({entityType})e, ({propTypeName})v))");
if (attributeInitializers.Any())
{
mainMember.AppendLine();
mainMember.Append(" { CustomAttributes = new List<Attribute>");
mainMember.Append($" {{ {string.Join(", ", attributeInitializers)} }} }}");
}
initializerLines.Add(mainMember.ToString());
// 处理元组元素,(暂不需要处理元组元素的特性)
if (prop.Type is INamedTypeSymbol { IsTupleType: true } tupleType)
{
foreach (var element in tupleType.TupleElements)
{
var elementType = element.Type.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat);//元组元素的类型
var elementName = element.Name;//元组元素名称
var elementDeclaredName = element.Type.Name;//元组元素类型名称
initializerLines.Add(
$"new EntityMemberInfo(" +
$"\"{prop.Name}.{elementName}\", " +
$"typeof({elementType}), " +
$"\"{elementDeclaredName}\", " +
$"(e) => Get{prop.Name}_{elementName}(({entityType})e), " +
$"(e, v) => Set{prop.Name}_{elementName}(({entityType})e, ({elementType})v))");
}
}
}
code.AppendLine(string.Join(",\n", initializerLines));
code.AppendLine(" };");
}
private static string GenerateAttributeInitializer(AttributeData attribute)
{
if (attribute.AttributeClass == null)
return string.Empty;
var attributeClass = attribute.AttributeClass.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat);
var args = attribute.ConstructorArguments;
var namedArgs = attribute.NamedArguments;
var parameters = new List<string>();
foreach (var arg in args)
{
parameters.Add(ConvertTypedConstantToCode(arg));
}
var constructorArgs = string.Join(", ", parameters);
var initializer = new StringBuilder();
initializer.Append($"new {attributeClass}({constructorArgs})");
if (namedArgs.Any())
{
initializer.Append(" { ");
var namedArgsList = namedArgs.Select(n => $"{n.Key} = {ConvertTypedConstantToCode(n.Value)}");
initializer.Append(string.Join(", ", namedArgsList));
initializer.Append(" }");
}
return initializer.ToString();
}
private static string ConvertTypedConstantToCode(TypedConstant constant)
{
if (constant.IsNull)
return "null";
switch (constant.Kind)
{
case TypedConstantKind.Array:
var elements = constant.Values.Select(ConvertTypedConstantToCode);
return $"new[] {{ {string.Join(", ", elements)} }}";
case TypedConstantKind.Type:
var typeSymbol = (ITypeSymbol)constant.Value!;
return $"typeof({typeSymbol.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat)})";
case TypedConstantKind.Enum:
return ConvertEnumTypedConstant(constant);
default:
return ConvertPrimitiveConstant(constant);
}
}
private static string ConvertEnumTypedConstant(TypedConstant constant)
{
var enumType = constant.Type!;
var enumValue = constant.Value!;
var enumTypeName = enumType.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat);
foreach (var member in enumType.GetMembers().OfType<IFieldSymbol>())
{
if (member.ConstantValue != null && member.ConstantValue.Equals(enumValue))
return $"{enumTypeName}.{member.Name}";
}
return $"({enumTypeName})({enumValue})";
}
private static string ConvertPrimitiveConstant(TypedConstant constant)
{
var value = constant.Value!;
return value switch
{
string s => $"\"{s}\"",
char c => $"'{c}'",
bool b => b ? "true" : "false",
_ => value.ToString()
};
}
private static bool IsCompilerGeneratedAttribute(AttributeData attribute)
{
return attribute.AttributeClass?.ToDisplayString() == "System.Runtime.CompilerServices.CompilerGeneratedAttribute";
}
/// <summary>
/// 获取枚举的参数
/// </summary>
/// <param name="enumType"></param>
/// <param name="value"></param>
/// <returns></returns>
private static string GetEnumMemberName(INamedTypeSymbol enumType, int value)
{
foreach (var member in enumType.GetMembers().OfType<IFieldSymbol>())
{
if (member.ConstantValue is int intValue && intValue == value)
{
return $"{enumType.ToDisplayString()}.{member.Name}";
}
}
return $"{enumType.ToDisplayString()}.Other";
} }
} }
} }

View File

@ -1,19 +0,0 @@
using JiShe.CollectBus.IoTDB.Enums;
namespace JiShe.CollectBus.IoTDB.Attribute
{
/// <summary>
/// IoTDB实体类型特性
/// </summary>
[AttributeUsage(AttributeTargets.Class)]
public class EntityTypeAttribute : System.Attribute
{
public EntityTypeEnum EntityType { get; }
public EntityTypeAttribute(EntityTypeEnum entityType)
{
EntityType = entityType;
}
}
}

View File

@ -1,4 +1,4 @@
namespace JiShe.CollectBus.IoTDB.Attribute namespace JiShe.CollectBus.IoTDB.Attributes
{ {
/// <summary> /// <summary>
/// Column分类标记特性ATTRIBUTE字段,也就是属性字段 /// Column分类标记特性ATTRIBUTE字段,也就是属性字段

View File

@ -1,4 +1,4 @@
namespace JiShe.CollectBus.IoTDB.Attribute namespace JiShe.CollectBus.IoTDB.Attributes
{ {
/// <summary> /// <summary>
/// Column分类标记特性FIELD字段数据列字段 /// Column分类标记特性FIELD字段数据列字段

View File

@ -1,4 +1,4 @@
namespace JiShe.CollectBus.IoTDB.Attribute namespace JiShe.CollectBus.IoTDB.Attributes
{ {
/// <summary> /// <summary>
/// 用于标识当前实体为单侧点模式单侧点模式只有一个Filed标识字段,类型是Tuple<string,T>,Item1=>测点名称Item2=>测点值,泛型 /// 用于标识当前实体为单侧点模式单侧点模式只有一个Filed标识字段,类型是Tuple<string,T>,Item1=>测点名称Item2=>测点值,泛型

View File

@ -1,4 +1,4 @@
namespace JiShe.CollectBus.IoTDB.Attribute namespace JiShe.CollectBus.IoTDB.Attributes
{ {
/// <summary> /// <summary>
/// Column分类标记特性TAG字段标签字段 /// Column分类标记特性TAG字段标签字段

View File

@ -1,6 +1,5 @@
using JiShe.CollectBus.IoTDB.Enums; 
namespace JiShe.CollectBus.IoTDB.Attributes
namespace JiShe.CollectBus.IoTDB.Attribute
{ {
/// <summary> /// <summary>
/// IoTDB实体存储路径或表名称一般用于已经明确的存储路径或表名称例如日志存储 /// IoTDB实体存储路径或表名称一般用于已经明确的存储路径或表名称例如日志存储

View File

@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDB.Model
{
internal class Class1
{
}
}

View File

@ -1,5 +1,6 @@
using JiShe.CollectBus.Common.Attributes; using JiShe.CollectBus.Common.Attributes;
using JiShe.CollectBus.IoTDB.Attribute; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.IoTDB.Attributes;
namespace JiShe.CollectBus.IoTDB.Model namespace JiShe.CollectBus.IoTDB.Model
{ {
@ -20,6 +21,12 @@ namespace JiShe.CollectBus.IoTDB.Model
[TAGColumn] [TAGColumn]
public string ProjectId { get; set; } public string ProjectId { get; set; }
/// <summary>
/// 数据类型
/// </summary>
[TAGColumn]
public string DataType { get; set; } = IOTDBDataTypeConst.Data;
/// <summary> /// <summary>
/// 设备类型集中器、电表、水表、流量计、传感器等 /// 设备类型集中器、电表、水表、流量计、传感器等
/// </summary> /// </summary>

View File

@ -1,20 +1,18 @@
using JiShe.CollectBus.Analyzers.Shared; using JiShe.CollectBus.Analyzers.Shared;
using JiShe.CollectBus.IoTDB.Attribute; using JiShe.CollectBus.IoTDB.Attributes;
using JiShe.CollectBus.IoTDB.Enums;
namespace JiShe.CollectBus.IoTDB.Model namespace JiShe.CollectBus.IoTDB.Model
{ {
/// <summary> /// <summary>
/// Table模型单项数据实体 /// Table模型单项数据实体
/// </summary> /// </summary>
[EntityType(EntityTypeEnum.TableModel)] [SourceAnalyzers(EntityTypeEnum.TableModel)]
[SourceAnalyzers]
public class TableModelSingleMeasuringEntity<T> : IoTEntity public class TableModelSingleMeasuringEntity<T> : IoTEntity
{ {
/// <summary> /// <summary>
/// 单项数据键值对 /// 单项数据键值对
/// </summary> /// </summary>
[SingleMeasuring(nameof(SingleColumn))] [SingleMeasuring(nameof(SingleColumn))]
public required Tuple<string, T> SingleColumn { get; set; } public required ValueTuple<string, T> SingleColumn { get; set; }
} }
} }

View File

@ -1,20 +1,18 @@
using JiShe.CollectBus.Analyzers.Shared; using JiShe.CollectBus.Analyzers.Shared;
using JiShe.CollectBus.IoTDB.Attribute; using JiShe.CollectBus.IoTDB.Attributes;
using JiShe.CollectBus.IoTDB.Enums;
namespace JiShe.CollectBus.IoTDB.Model namespace JiShe.CollectBus.IoTDB.Model
{ {
/// <summary> /// <summary>
/// Tree模型单项数据实体 /// Tree模型单项数据实体
/// </summary> /// </summary>
[EntityType(EntityTypeEnum.TreeModel)] [SourceAnalyzers(EntityTypeEnum.TreeModel)]
[SourceAnalyzers]
public class TreeModelSingleMeasuringEntity<T> : IoTEntity public class TreeModelSingleMeasuringEntity<T> : IoTEntity
{ {
/// <summary> /// <summary>
/// 单项数据键值对 /// 单项数据键值对
/// </summary> /// </summary>
[SingleMeasuring(nameof(SingleMeasuring))] [SingleMeasuring(nameof(SingleMeasuring))]
public required Tuple<string, T> SingleMeasuring { get; set; } public required ValueTuple<string, T> SingleMeasuring { get; set; }
} }
} }

View File

@ -1,5 +1,5 @@
using Apache.IoTDB; using Apache.IoTDB;
using JiShe.CollectBus.IoTDB.Enums; using JiShe.CollectBus.Analyzers.Shared;
namespace JiShe.CollectBus.IoTDB.Provider namespace JiShe.CollectBus.IoTDB.Provider
{ {
@ -9,9 +9,9 @@ namespace JiShe.CollectBus.IoTDB.Provider
public class DeviceMetadata public class DeviceMetadata
{ {
/// <summary> /// <summary>
/// IoTDB实体类型枚举 /// 实体类型枚举
/// </summary> /// </summary>
public EntityTypeEnum EntityType { get; set; } public EntityTypeEnum? EntityType { get; set; }
/// <summary> /// <summary>
/// 是否有单测量值 /// 是否有单测量值

View File

@ -15,7 +15,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <returns></returns> /// <returns></returns>
public static string GetDevicePath<T>(T entity) where T : IoTEntity public static string GetDevicePath<T>(T entity) where T : IoTEntity
{ {
return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectId}`.`{entity.DeviceType}`.`{entity.DeviceId}`"; return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectId}`.`{entity.DeviceType}`.{entity.DataType}.`{entity.DeviceId}`";
} }

View File

@ -12,7 +12,7 @@ using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IoTDB.Attribute; using JiShe.CollectBus.IoTDB.Attributes;
using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Model; using JiShe.CollectBus.IoTDB.Model;
@ -20,6 +20,7 @@ using JiShe.CollectBus.IoTDB.Options;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Entities; using Volo.Abp.Domain.Entities;
using JiShe.CollectBus.Analyzers.Shared;
namespace JiShe.CollectBus.IoTDB.Provider namespace JiShe.CollectBus.IoTDB.Provider
{ {
@ -176,14 +177,16 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <returns></returns> /// <returns></returns>
public async Task<DeviceMetadata> GetMetadata<T>() where T : IoTEntity public async Task<DeviceMetadata> GetMetadata<T>() where T : IoTEntity
{ {
var columns = CollectColumnMetadata(typeof(T)); var accessor = SourceEntityAccessorFactory.GetAccessor<T>();
var columns = CollectColumnMetadata<T>(accessor);
var metadata = BuildDeviceMetadata<T>(columns); var metadata = BuildDeviceMetadata<T>(columns);
var metaData = MetadataCache.AddOrUpdate( var metaData = MetadataCache.AddOrUpdate(
typeof(T), typeof(T),
addValueFactory: t => metadata, // 如果键不存在,用此值添加 addValueFactory: t => metadata, // 如果键不存在,用此值添加
updateValueFactory: (t, existingValue) => updateValueFactory: (t, existingValue) =>
{ {
var columns = CollectColumnMetadata(t); var columns = CollectColumnMetadata(accessor);
var metadata = BuildDeviceMetadata<T>(columns); var metadata = BuildDeviceMetadata<T>(columns);
//对现有值 existingValue 进行修改,返回新值 //对现有值 existingValue 进行修改,返回新值
@ -192,6 +195,8 @@ namespace JiShe.CollectBus.IoTDB.Provider
} }
); );
metadata.EntityType = accessor.EntityType;
return await Task.FromResult(metaData); return await Task.FromResult(metaData);
} }
@ -260,23 +265,30 @@ namespace JiShe.CollectBus.IoTDB.Provider
List<string> tempColumnNames = new List<string>(); List<string> tempColumnNames = new List<string>();
tempColumnNames.AddRange(metadata.ColumnNames); tempColumnNames.AddRange(metadata.ColumnNames);
var entityTypeAttribute = typeof(T).GetCustomAttribute<EntityTypeAttribute>(); var accessor = SourceEntityAccessorFactory.GetAccessor<T>();
if (entityTypeAttribute == null) var memberCache = new Dictionary<string, EntityMemberInfo>(); // 缓存优化查询
// 预构建成员缓存Key: NameOrPath
foreach (var member in accessor.MemberList)
{
memberCache[member.NameOrPath] = member;
}
if (accessor.EntityType == null || metadata.EntityType == null)
{ {
throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 没有指定,属于异常情况,-101"); throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 没有指定,属于异常情况,-101");
} }
if (metadata.EntityType != entityTypeAttribute.EntityType) if (metadata.EntityType != accessor.EntityType)
{ {
throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 和{nameof(DeviceMetadata)}的 EntityType 不一致,属于异常情况,-102"); throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 和{nameof(DeviceMetadata)}的 EntityType 不一致,属于异常情况,-102");
} }
if (metadata.EntityType == Enums.EntityTypeEnum.TreeModel && _runtimeContext.UseTableSessionPool == true) if (metadata.EntityType == EntityTypeEnum.TreeModel && _runtimeContext.UseTableSessionPool == true)
{ {
throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 tree模型不能使用table模型Session连接属于异常情况-103"); throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 tree模型不能使用table模型Session连接属于异常情况-103");
} }
else if (metadata.EntityType == Enums.EntityTypeEnum.TableModel && _runtimeContext.UseTableSessionPool == false) else if (metadata.EntityType == EntityTypeEnum.TableModel && _runtimeContext.UseTableSessionPool == false)
{ {
throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接属于异常情况-104"); throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接属于异常情况-104");
} }
@ -295,66 +307,41 @@ namespace JiShe.CollectBus.IoTDB.Provider
foreach (var measurement in tempColumnNames) foreach (var measurement in tempColumnNames)
{ {
if (!memberCache.TryGetValue(measurement, out var member))
PropertyInfo propertyInfo = typeof(T).GetProperty(measurement);
if (propertyInfo == null)
{ {
throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,没有找到{measurement}属性,属于异常情况,-101。"); throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构时{accessor.EntityName}没有找到{measurement}对应的member信息-105");
} }
var value = propertyInfo.GetValue(entity); var value = member.GetValue(entity);
if (propertyInfo.IsDefined(typeof(SingleMeasuringAttribute), false) && metadata.IsSingleMeasuring == true)//表示当前对象是单测点模式
// 特性查询优化
var attributes = member.CustomAttributes ?? Enumerable.Empty<Attribute>();
var singleMeasuringAttr = attributes.OfType<SingleMeasuringAttribute>().FirstOrDefault();
if (singleMeasuringAttr != null)//如果是单侧点
{ {
if (value != null) var tupleItemKey = $"{member.NameOrPath}.Item2";
if (!memberCache.TryGetValue(tupleItemKey, out var tupleMember))
{ {
Type tupleType = value.GetType(); throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构时{accessor.EntityName} 没有找到{measurement}对应的member Item2 信息,-106");
Type[] tupleArgs = tupleType.GetGenericArguments();
Type item2Type = tupleArgs[1]; // T 的实际类型
var item1 = tupleType.GetProperty("Item1")!.GetValue(value);
var item2 = tupleType.GetProperty("Item2")!.GetValue(value);
if (item1 == null || item2 == null)
{
throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,单测点模式构建失败,没有获取测点名称或者测点值,-102。");
}
var indexOf = metadata.ColumnNames.IndexOf(measurement);
metadata.ColumnNames[indexOf] = (string)item1!;
rowValues.Add(item2);
}
else
{
rowValues.Add(null);
} }
//同时如果是单测点模式且是table模型存储路径只能通过DevicePathBuilder.GetDeviceTableName(entity)获取 value = tupleMember.GetValue(entity);
if (_runtimeContext.UseTableSessionPool) }
if (value != null)
{
var tempValue = member.DeclaredTypeName.ToUpper() switch
{ {
tableNameOrTreePath = DevicePathBuilder.GetDeviceTableName(entity); "DATETIME" => Convert.ToDateTime(value).GetDateTimeOffset().ToUnixTimeNanoseconds(),
} _ => value
};
rowValues.Add(tempValue);
} }
else else
{ {
rowValues.Add(value);
//需要根据value的类型进行相应的值映射转换例如datetime转换为long的时间戳值
if (value != null)
{
Type tupleType = value.GetType();
var tempValue = tupleType.Name.ToUpper() switch
{
"DATETIME" => Convert.ToDateTime(value).GetDateTimeOffset().ToUnixTimeNanoseconds(),
_ => value
};
rowValues.Add(tempValue);
}
else
{
rowValues.Add(value);
}
} }
} }
values.Add(rowValues); values.Add(rowValues);
@ -547,14 +534,20 @@ namespace JiShe.CollectBus.IoTDB.Provider
var results = new List<T>(); var results = new List<T>();
var metadata = await GetMetadata<T>(); var metadata = await GetMetadata<T>();
var properties = typeof(T).GetProperties(); var accessor = SourceEntityAccessorFactory.GetAccessor<T>();
var memberCache = new Dictionary<string, EntityMemberInfo>(); // 缓存优化查询
// 预构建成员缓存Key: NameOrPath
foreach (var member in accessor.MemberList)
{
memberCache[member.NameOrPath] = member;
}
var columns = new List<string>() { "Timestamps" }; var columns = new List<string>() { "Timestamps" };
var dataTypes = new List<TSDataType>() { TSDataType.TIMESTAMP }; var dataTypes = new List<TSDataType>() { TSDataType.TIMESTAMP };
columns.AddRange(metadata.ColumnNames); columns.AddRange(metadata.ColumnNames);
dataTypes.AddRange(metadata.DataTypes); dataTypes.AddRange(metadata.DataTypes);
//metadata.ColumnNames.Insert(0, "Timestamps");
//metadata.DataTypes.Insert(0, TSDataType.TIMESTAMP);
while (dataSet.HasNext() && results.Count < pageSize) while (dataSet.HasNext() && results.Count < pageSize)
{ {
@ -570,20 +563,20 @@ namespace JiShe.CollectBus.IoTDB.Provider
var value = record.Values[indexOf]; var value = record.Values[indexOf];
TSDataType tSDataType = dataTypes[indexOf]; TSDataType tSDataType = dataTypes[indexOf];
var prop = properties.FirstOrDefault(p => if (!memberCache.TryGetValue(measurement, out var member) && !(value is System.DBNull))
p.Name.Equals(measurement, StringComparison.OrdinalIgnoreCase));
if (prop != null && !(value is System.DBNull))
{ {
dynamic tempValue = GetTSDataValue(tSDataType, value); throw new Exception($"{nameof(ParseResults)} 解析查询结果 {accessor.EntityName} 属性赋值出现异常,没有找到{measurement}对应的 member信息");
}
if (measurement.ToLower().EndsWith("time")) dynamic tempValue = GetTSDataValue(tSDataType, value);
{
typeof(T).GetProperty(measurement)?.SetValue(entity, TimestampHelper.ConvertToDateTime(tempValue, TimestampUnit.Nanoseconds)); if (measurement.ToLower().EndsWith("time"))
} {
else member.Setter(entity, TimestampHelper.ConvertToDateTime(tempValue, TimestampUnit.Nanoseconds));
{ }
typeof(T).GetProperty(measurement)?.SetValue(entity, tempValue); else
} {
member.Setter(entity, tempValue);
} }
} }
@ -598,72 +591,63 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <summary> /// <summary>
/// 获取设备元数据的列 /// 获取设备元数据的列
/// </summary> /// </summary>
/// <param name="type"></param> /// <param name="accessor"></param>
/// <returns></returns> /// <returns></returns>
private List<ColumnInfo> CollectColumnMetadata(Type type) private List<ColumnInfo> CollectColumnMetadata<T>(ISourceEntityAccessor<T> accessor)
{ {
var columns = new List<ColumnInfo>(); var columns = new List<ColumnInfo>();
var memberCache = new Dictionary<string, EntityMemberInfo>(); // 缓存优化查询
foreach (var prop in type.GetProperties()) // 预构建成员缓存Key: NameOrPath
foreach (var member in accessor.MemberList)
{ {
memberCache[member.NameOrPath] = member;
}
string typeName = string.Empty; foreach (var member in accessor.MemberList)
{
// 过滤元组子项
if (member.NameOrPath.Contains(".Item")) continue;
Type declaredType = prop.PropertyType; // 类型名称处理
// 处理可空类型 Type declaredType = member.DeclaredType;
if (declaredType.IsGenericType && declaredType.GetGenericTypeDefinition() == typeof(Nullable<>)) var underlyingType = Nullable.GetUnderlyingType(declaredType);
string declaredTypeName = underlyingType?.Name ?? member.DeclaredTypeName;
// 特性查询优化
var attributes = member.CustomAttributes ?? Enumerable.Empty<Attribute>();
var tagAttr = attributes.OfType<TAGColumnAttribute>().FirstOrDefault();
var attrColumn = attributes.OfType<ATTRIBUTEColumnAttribute>().FirstOrDefault();
var fieldColumn = attributes.OfType<FIELDColumnAttribute>().FirstOrDefault();
var singleMeasuringAttr = attributes.OfType<SingleMeasuringAttribute>().FirstOrDefault();
// 构建ColumnInfo
ColumnInfo? column = null;
if (tagAttr != null)
{ {
Type underlyingType = Nullable.GetUnderlyingType(declaredType); column = new ColumnInfo(member.NameOrPath, ColumnCategory.TAG, GetDataTypeFromTypeName(declaredTypeName), false);
typeName = underlyingType.Name;
} }
else else if (attrColumn != null)
{ {
typeName = declaredType.Name; column = new ColumnInfo(member.NameOrPath, ColumnCategory.ATTRIBUTE, GetDataTypeFromTypeName(declaredTypeName), false);
}
else if (fieldColumn != null)
{
column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(declaredTypeName), false);
} }
//先获取Tag标签和属性标签 // 单测模式处理
ColumnInfo? column = prop.GetCustomAttribute<TAGColumnAttribute>() is not null ? new ColumnInfo( if (singleMeasuringAttr != null && column == null)
name: prop.Name,
category: ColumnCategory.TAG,
dataType: GetDataTypeFromTypeName(typeName),
false
) : prop.GetCustomAttribute<ATTRIBUTEColumnAttribute>() is not null ? new ColumnInfo(
prop.Name,
ColumnCategory.ATTRIBUTE,
GetDataTypeFromTypeName(typeName),
false
) : prop.GetCustomAttribute<FIELDColumnAttribute>() is not null ? new ColumnInfo(
prop.Name,
ColumnCategory.FIELD,
GetDataTypeFromTypeName(typeName),
false)
: null;
//最先检查是不是单侧点模式
SingleMeasuringAttribute singleMeasuringAttribute = prop.GetCustomAttribute<SingleMeasuringAttribute>();
if (singleMeasuringAttribute != null && column == null)
{ {
//warning: 单侧点模式注意事项 var tupleItemKey = $"{member.NameOrPath}.Item2";
//Entity实体 字段类型是 Tuple<string,T>,Item1=>测点名称Item2=>测点值,泛型 if (!memberCache.TryGetValue(tupleItemKey, out var tupleMember))
//只有一个Filed字段。 {
//MeasuringName 默认为 SingleMeasuringAttribute.FieldName以便于在获取对应的Value的时候重置为 Item1 的值。 throw new Exception($"{nameof(CollectColumnMetadata)} {accessor.EntityName} {member.NameOrPath} 单侧点属性解析异常");
}
Type tupleType = prop.PropertyType; column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(tupleMember.DeclaredTypeName), true);
Type[] tupleArgs = tupleType.GetGenericArguments();
column = new ColumnInfo(
singleMeasuringAttribute.FieldName,
ColumnCategory.FIELD,
GetDataTypeFromTypeName(tupleArgs[1].Name),
true
);
} }
if (column.HasValue) if (column.HasValue) columns.Add(column.Value);
{
columns.Add(column.Value);
}
} }
return columns; return columns;
} }
@ -693,15 +677,6 @@ namespace JiShe.CollectBus.IoTDB.Provider
ProcessCategory(groupedColumns, ColumnCategory.ATTRIBUTE, metadata); ProcessCategory(groupedColumns, ColumnCategory.ATTRIBUTE, metadata);
ProcessCategory(groupedColumns, ColumnCategory.FIELD, metadata); ProcessCategory(groupedColumns, ColumnCategory.FIELD, metadata);
var entityTypeAttribute = typeof(T).GetCustomAttribute<EntityTypeAttribute>();
if (entityTypeAttribute == null)
{
throw new ArgumentException($"{nameof(BuildDeviceMetadata)} 构建设备元数据时 {nameof(IoTEntity)} 的EntityType 没有指定,属于异常情况,-101");
}
metadata.EntityType = entityTypeAttribute.EntityType;
return metadata; return metadata;
} }

View File

@ -93,10 +93,11 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
{ {
SystemName = _applicationOptions.SystemType, SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}", DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType.ToString()}.{IOTDBDataType.Data}", DeviceType = $"{data.DeviceType.ToString()}",
ProjectId = $"{data.ProjectId}", ProjectId = $"{data.ProjectId}",
DataType = IOTDBDataTypeConst.Data,
Timestamps = data.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(), Timestamps = data.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(),
SingleMeasuring = new Tuple<string, T>(data.FiledName ?? string.Empty, data.DataValue ?? default) SingleMeasuring = (data.FiledName ?? string.Empty, data.DataValue ?? default)
}; };
_runtimeContext.UseTableSessionPool = true; // 使用表模型池 _runtimeContext.UseTableSessionPool = true; // 使用表模型池
var taskSendInfo = await _dbProvider.QueryAsync<MeterReadingTelemetryPacketInfo>(new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(), Conditions = conditions, PageIndex = 0, PageSize = 1 }); var taskSendInfo = await _dbProvider.QueryAsync<MeterReadingTelemetryPacketInfo>(new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(), Conditions = conditions, PageIndex = 0, PageSize = 1 });
@ -196,10 +197,11 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
{ {
SystemName = _applicationOptions.SystemType, SystemName = _applicationOptions.SystemType,
DeviceId = $"{item.DeviceId}", DeviceId = $"{item.DeviceId}",
DeviceType = $"{item.DeviceType}.{IOTDBDataType.Data}", DeviceType = $"{item.DeviceType}",
ProjectId = $"{item.ProjectId}", ProjectId = $"{item.ProjectId}",
DataType = IOTDBDataTypeConst.Data,
Timestamps = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(), // TODO:这里暂时格式化15分钟数据需要进行调整 Timestamps = item.TimeSpan!.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity).GetDateTimeOffset().ToUnixTimeNanoseconds(), // TODO:这里暂时格式化15分钟数据需要进行调整
SingleMeasuring = new Tuple<string, T>(item.FiledName ?? string.Empty, item.DataValue) SingleMeasuring =(item.FiledName ?? string.Empty, item.DataValue ?? default)
}; };
_runtimeContext.UseTableSessionPool = true; // 使用表模型池 _runtimeContext.UseTableSessionPool = true; // 使用表模型池
var taskSendInfo = await _dbProvider.QueryAsync<MeterReadingTelemetryPacketInfo>(new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(), Conditions = conditions, PageIndex = 0, PageSize = 1 }); var taskSendInfo = await _dbProvider.QueryAsync<MeterReadingTelemetryPacketInfo>(new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(), Conditions = conditions, PageIndex = 0, PageSize = 1 });
@ -280,10 +282,11 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
{ {
SystemName = _applicationOptions.SystemType, SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}", DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}.{IOTDBDataType.Status}", DeviceType = $"{data.DeviceType}",
ProjectId = $"{data.ProjectId}", ProjectId = $"{data.ProjectId}",
DataType = IOTDBDataTypeConst.Data,
Timestamps = timestamps, Timestamps = timestamps,
SingleMeasuring = new Tuple<string, T>(data.FiledName!, data.DataValue!) SingleMeasuring = (data.FiledName!, data.DataValue!)
}; };
_runtimeContext.UseTableSessionPool = false; // 使树模型池 _runtimeContext.UseTableSessionPool = false; // 使树模型池
await _dbProvider.InsertAsync(treeData); await _dbProvider.InsertAsync(treeData);
@ -292,10 +295,11 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
{ {
SystemName = _applicationOptions.SystemType, SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}", DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}.{IOTDBDataType.Status}", DeviceType = $"{data.DeviceType}",
ProjectId = $"{data.ProjectId}", ProjectId = $"{data.ProjectId}",
DataType = IOTDBDataTypeConst.Data,
Timestamps = timestamps, Timestamps = timestamps,
SingleMeasuring = new Tuple<string, string>(ConcentratorStatusFieldConst.FrameData, analysisBaseDto.HexMessage ?? string.Empty) SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.HexMessage ?? string.Empty)
}; };
_runtimeContext.UseTableSessionPool = false; // 使树模型池 _runtimeContext.UseTableSessionPool = false; // 使树模型池
@ -306,10 +310,11 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
{ {
SystemName = _applicationOptions.SystemType, SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}", DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}.{IOTDBDataType.Status}", DeviceType = $"{data.DeviceType}",
ProjectId = $"{data.ProjectId}", ProjectId = $"{data.ProjectId}",
Timestamps = timestamps, Timestamps = timestamps,
SingleMeasuring = new Tuple<string, long>(ConcentratorStatusFieldConst.RecordingTime, (data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now).GetDateTimeOffset().ToUnixTimeNanoseconds()) DataType = IOTDBDataTypeConst.Status,
SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, (data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now).GetDateTimeOffset().ToUnixTimeNanoseconds())
}; };
_runtimeContext.UseTableSessionPool = false; // 使树模型池 _runtimeContext.UseTableSessionPool = false; // 使树模型池
await _dbProvider.InsertAsync(treeRecordingTimeData); await _dbProvider.InsertAsync(treeRecordingTimeData);
@ -318,10 +323,11 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
{ {
SystemName = _applicationOptions.SystemType, SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}", DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}.{IOTDBDataType.Status}", DeviceType = $"{data.DeviceType}",
ProjectId = $"{data.ProjectId}", ProjectId = $"{data.ProjectId}",
DataType = IOTDBDataTypeConst.Status,
Timestamps = timestamps, Timestamps = timestamps,
SingleMeasuring = new Tuple<string, string>(ConcentratorStatusFieldConst.Remark, data.FiledDesc ?? string.Empty) SingleMeasuring = (ConcentratorStatusFieldConst.Remark, data.FiledDesc ?? string.Empty)
}; };
_runtimeContext.UseTableSessionPool = false; // 使树模型池 _runtimeContext.UseTableSessionPool = false; // 使树模型池
await _dbProvider.InsertAsync(treeRemarkData); await _dbProvider.InsertAsync(treeRemarkData);

View File

@ -730,7 +730,6 @@ namespace JiShe.CollectBus.Protocol.T37612012
List<string> values = new List<string>() { $"{dataTime:yy}", $"{dataTime:MM}", $"{dataTime:dd}", $"{dataTime:HH}", $"{dataTime:mm}", }; List<string> values = new List<string>() { $"{dataTime:yy}", $"{dataTime:MM}", $"{dataTime:dd}", $"{dataTime:HH}", $"{dataTime:mm}", };
values.Reverse(); values.Reverse();
return values; return values;
//return string.Join("", values);
} }
#endregion #endregion

View File

@ -21,14 +21,13 @@ namespace JiShe.CollectBus.DataChannels
/// 定时任务数据通道写入 /// 定时任务数据通道写入
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
Task ScheduledMeterTaskWriterAsync(ChannelWriter<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter, Tuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems); Task ScheduledMeterTaskWriterAsync(ChannelWriter<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter, ValueTuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems);
/// <summary> /// <summary>
/// 定时任务数据入库和Kafka推送通道 /// 定时任务数据入库和Kafka推送通道
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
Task ScheduledMeterTaskReadingAsync(ChannelReader<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoReader, Task ScheduledMeterTaskReadingAsync(ChannelReader<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoReader );
CancellationToken cancellationToken);
#endregion #endregion
} }
} }

View File

@ -81,7 +81,7 @@ public class CollectBusApplicationModule : AbpModule
//}).ConfigureAwait(false); //}).ConfigureAwait(false);
//下发任务通道构建 //下发任务通道构建
DataChannelManage.TaskDataChannel = Channel.CreateUnbounded<Tuple<string, List<MeterReadingTelemetryPacketInfo>>>(); DataChannelManage.TaskDataChannel = Channel.CreateUnbounded<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>>();
//默认初始化表计信息 //默认初始化表计信息
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>(); var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();

View File

@ -13,6 +13,6 @@ namespace JiShe.CollectBus.DataChannels
/// <summary> /// <summary>
/// 下发任务通道 /// 下发任务通道
/// </summary> /// </summary>
public static Channel<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> TaskDataChannel; public static Channel<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>> TaskDataChannel;
} }
} }

View File

@ -50,7 +50,7 @@ namespace JiShe.CollectBus.DataChannels
/// 定时任务数据通道写入 /// 定时任务数据通道写入
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
public async Task ScheduledMeterTaskWriterAsync(ChannelWriter<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter, Tuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems) public async Task ScheduledMeterTaskWriterAsync(ChannelWriter<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter, ValueTuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems)
{ {
await _telemetryPacketInfoWriter.WriteAsync(dataItems); await _telemetryPacketInfoWriter.WriteAsync(dataItems);
} }
@ -61,47 +61,62 @@ namespace JiShe.CollectBus.DataChannels
/// 定时任务数据入库和Kafka推送通道 /// 定时任务数据入库和Kafka推送通道
/// </summary> /// </summary>
public async Task ScheduledMeterTaskReadingAsync( public async Task ScheduledMeterTaskReadingAsync(
ChannelReader<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> telemetryPacketInfoReader, ChannelReader<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>> telemetryPacketInfoReader)
CancellationToken cancellationToken = default)
{ {
const int BatchSize = 20000; // 修正批次大小 const int BatchSize = 20000;
const int EmptyWaitMilliseconds = 1000; const int EmptyWaitMilliseconds = 1000;
var timeout = TimeSpan.FromSeconds(5); var timeout = TimeSpan.FromSeconds(5);
var timer = Stopwatch.StartNew();
long timeoutMilliseconds = 0;
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>(); var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
try try
{ {
while (!cancellationToken.IsCancellationRequested) while (true)
{ {
var batchStopwatch = Stopwatch.StartNew(); var batch = new List<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>>();
var batch = new List<Tuple<string, List<MeterReadingTelemetryPacketInfo>>>(); var canRead = telemetryPacketInfoReader.Count;
if (canRead <= 0)
{
if (timeoutMilliseconds > 0)
{
_logger.LogError($"{nameof(ScheduledMeterTaskReadingAsync)} 通道处理数据耗时{timeoutMilliseconds}毫秒");
}
timeoutMilliseconds = 0;
//无消息时短等待1秒
await Task.Delay(EmptyWaitMilliseconds);
continue;
}
timer.Restart();
var startTime = DateTime.Now;
try try
{ {
// 异步批量读取数据 // 异步批量读取数据
while (batch.Count < BatchSize && batchStopwatch.Elapsed < timeout) while (batch != null && batch.Count < BatchSize && (DateTime.Now - startTime) < timeout)
{ {
while (telemetryPacketInfoReader.TryRead(out var data)) try
{ {
batch.Add(data); if (telemetryPacketInfoReader.TryRead(out var dataItem))
if (batch.Count >= BatchSize) break; {
batch.Add(dataItem);
}
}
catch (Exception)
{
throw;
} }
if (batch.Count >= BatchSize) break;
// 无更多数据时等待
if (!await telemetryPacketInfoReader.WaitToReadAsync(cancellationToken))
break;
} }
} }
catch (OperationCanceledException) catch (Exception)
{ {
break; throw;
} }
if (batch.Count == 0) if (batch.Count == 0)
{ {
await Task.Delay(EmptyWaitMilliseconds, cancellationToken); await Task.Delay(EmptyWaitMilliseconds);
continue; continue;
} }
@ -125,22 +140,26 @@ namespace JiShe.CollectBus.DataChannels
// 批量写入数据库 // 批量写入数据库
await _dbProvider.BatchInsertAsync(metadata, records); await _dbProvider.BatchInsertAsync(metadata, records);
// 限流推送Kafka //// 限流推送Kafka
await DeviceGroupBalanceControl.ProcessWithThrottleAsync( //await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: records, // items: records,
deviceIdSelector: data => data.DeviceId, // deviceIdSelector: data => data.DeviceId,
processor: async (data, groupIndex) => // processor: async (data, groupIndex) =>
await KafkaProducerIssuedMessageAction(topicName, data, groupIndex) // await KafkaProducerIssuedMessageAction(topicName, data, groupIndex)
); //);
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "处理主题 {TopicName} 数据时发生异常", topicName); _logger.LogError(ex, "数据通道处理主题 {TopicName} 数据时发生异常", topicName);
} }
} }
_logger.LogInformation("处理完成批次: {Count} 条, 耗时: {Elapsed}ms", batch.Clear();
batch.Count, batchStopwatch.ElapsedMilliseconds); timer.Stop();
timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds;
startTime = DateTime.Now;
} }
} }
catch (Exception ex) catch (Exception ex)

View File

@ -60,6 +60,10 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
[HttpGet] [HttpGet]
public async Task UseSessionPool(long testTime) public async Task UseSessionPool(long testTime)
{ {
var dataTime = DateTime.Now;
List<string> values = new List<string>() { $"{dataTime:yy}", $"{dataTime:MM}", $"{dataTime:dd}", $"{dataTime:HH}", $"{dataTime:mm}", };
ElectricityMeterTreeModel meter = new ElectricityMeterTreeModel() ElectricityMeterTreeModel meter = new ElectricityMeterTreeModel()
{ {
@ -74,6 +78,11 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
Timestamps = testTime// DateTimeOffset.UtcNow.ToUnixTimeNanoseconds()//testTime.GetDateTimeOffset().ToUnixTimeNanoseconds(), Timestamps = testTime// DateTimeOffset.UtcNow.ToUnixTimeNanoseconds()//testTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
}; };
//ElectricityMeterTreeModelExtension.GetCurrent() //ElectricityMeterTreeModelExtension.GetCurrent()
//SourceEntityAccessorFactory.SetCurrent(meter);
//ElectricityMeterTreeModelAccessor.
//TableModelSingleMeasuringEntityExtension
//TableModelSingleMeasuringEntityAccessor.GetSystemName(meter);
await _iotDBProvider.InsertAsync(meter); await _iotDBProvider.InsertAsync(meter);
} }
@ -172,6 +181,9 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
[HttpGet] [HttpGet]
public async Task TestTreeModelSingleMeasuringEntity(string measuring, string value, DateTime time) public async Task TestTreeModelSingleMeasuringEntity(string measuring, string value, DateTime time)
{ {
time = DateTime.Now;
//System.Reflection.PropertyInfo;
//System.Reflection.FieldInfo
var meter = new TreeModelSingleMeasuringEntity<string>() var meter = new TreeModelSingleMeasuringEntity<string>()
{ {
SystemName = "energy", SystemName = "energy",
@ -179,7 +191,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
DeviceType = "1", DeviceType = "1",
ProjectId = "10059", ProjectId = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleMeasuring = new Tuple<string, string>(measuring, value) SingleMeasuring = (measuring, value)
}; };
await _iotDBProvider.InsertAsync(meter); await _iotDBProvider.InsertAsync(meter);
} }
@ -192,6 +204,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
[HttpGet] [HttpGet]
public async Task TestTreeModelSingleMeasuringEntity2(string measuring, int value, DateTime time) public async Task TestTreeModelSingleMeasuringEntity2(string measuring, int value, DateTime time)
{ {
time = DateTime.Now;
var meter = new TreeModelSingleMeasuringEntity<int>() var meter = new TreeModelSingleMeasuringEntity<int>()
{ {
SystemName = "energy", SystemName = "energy",
@ -199,7 +212,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
DeviceType = "Ammeter", DeviceType = "Ammeter",
ProjectId = "10059", ProjectId = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleMeasuring = new Tuple<string, int>(measuring, value) SingleMeasuring = (measuring, value)
}; };
await _iotDBProvider.InsertAsync(meter); await _iotDBProvider.InsertAsync(meter);
} }
@ -212,6 +225,8 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
[HttpGet] [HttpGet]
public async Task TestTableModelSingleMeasuringEntity(string measuring, string value, DateTime time) public async Task TestTableModelSingleMeasuringEntity(string measuring, string value, DateTime time)
{ {
time = DateTime.Now;
var meter = new TableModelSingleMeasuringEntity<string>() var meter = new TableModelSingleMeasuringEntity<string>()
{ {
SystemName = "energy", SystemName = "energy",
@ -219,7 +234,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
DeviceType = "Ammeter", DeviceType = "Ammeter",
ProjectId = "10059", ProjectId = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleColumn = new Tuple<string, string>(measuring, value) SingleColumn = (measuring, value)
}; };
_dbContext.UseTableSessionPool = true; _dbContext.UseTableSessionPool = true;
await _iotDBProvider.InsertAsync(meter); await _iotDBProvider.InsertAsync(meter);
@ -233,6 +248,8 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
[HttpGet] [HttpGet]
public async Task TestTableModelSingleMeasuringEntity2(string measuring, int value, DateTime time) public async Task TestTableModelSingleMeasuringEntity2(string measuring, int value, DateTime time)
{ {
time = DateTime.Now;
var meter = new TableModelSingleMeasuringEntity<int>() var meter = new TableModelSingleMeasuringEntity<int>()
{ {
SystemName = "energy", SystemName = "energy",
@ -240,7 +257,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
DeviceType = "Ammeter", DeviceType = "Ammeter",
ProjectId = "10059", ProjectId = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleColumn = new Tuple<string, int>(measuring, value) SingleColumn = (measuring, value)
}; };
_dbContext.UseTableSessionPool = true; _dbContext.UseTableSessionPool = true;
await _iotDBProvider.InsertAsync(meter); await _iotDBProvider.InsertAsync(meter);

View File

@ -160,7 +160,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"电表自动校时 {data.Name} 任务数据构建失败:{data.Serialize()}"); _logger.LogWarning($"电表自动校时 {data.Name} 任务数据构建失败:{data.Serialize()}");
return; return;
} }
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
}); });
} }
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取 else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取
@ -177,7 +177,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}"); _logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
return; return;
} }
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
}); });
} }
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取 else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取
@ -194,7 +194,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}"); _logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
return; return;
} }
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
}); });
} }
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//月冻结 else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//月冻结
@ -211,7 +211,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}"); _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return; return;
} }
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
}); });
} }
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结 else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结
@ -228,7 +228,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}"); _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return; return;
} }
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
}); });
} }
else else
@ -262,7 +262,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}"); //_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return; return;
} }
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, tempTask)); _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempTask));
}); });
} }
else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
@ -281,7 +281,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"水表 {data.Name} 任务数据构建失败:{data.Serialize()}"); _logger.LogWarning($"水表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return; return;
} }
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempTask)); _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempTask));
}); });
} }
else else
@ -307,7 +307,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"{nameof(AmmeterScheduledAutoValveControl)}电表定时阀控没有可操作的任务"); _logger.LogWarning($"{nameof(AmmeterScheduledAutoValveControl)}电表定时阀控没有可操作的任务");
return; return;
} }
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask)); _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask));
} }
#region #region
@ -329,10 +329,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns> /// <returns></returns>
public virtual async Task InitAmmeterCacheData(string gatherCode = "") public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{ {
// 创建取消令牌源 return;
var cts = new CancellationTokenSource();
_ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader, cts.Token); // 创建取消令牌源
//var cts = new CancellationTokenSource();
_ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader );
//此处代码不要删除 //此处代码不要删除
#if DEBUG #if DEBUG

View File

@ -1,12 +1,10 @@
using JiShe.CollectBus.Analyzers.Shared; using JiShe.CollectBus.Analyzers.Shared;
using JiShe.CollectBus.IoTDB.Attribute; using JiShe.CollectBus.IoTDB.Attributes;
using JiShe.CollectBus.IoTDB.Enums;
using JiShe.CollectBus.IoTDB.Model; using JiShe.CollectBus.IoTDB.Model;
namespace JiShe.CollectBus.Ammeters namespace JiShe.CollectBus.Ammeters
{ {
[EntityType(EntityTypeEnum.TableModel)] [SourceAnalyzers(EntityTypeEnum.TableModel)]
[SourceAnalyzers]
public class ElectricityMeter : IoTEntity public class ElectricityMeter : IoTEntity
{ {
[ATTRIBUTEColumn] [ATTRIBUTEColumn]
@ -31,7 +29,7 @@ namespace JiShe.CollectBus.Ammeters
public double Current { get; set; } public double Current { get; set; }
[FIELDColumn] [FIELDColumn]
public double Power => Voltage * Current; public double Power { get; set; }
[FIELDColumn] [FIELDColumn]
public double? Currentd { get; set; } public double? Currentd { get; set; }

View File

@ -1,12 +1,11 @@
using JiShe.CollectBus.Analyzers.Shared; using JiShe.CollectBus.Analyzers.Shared;
using JiShe.CollectBus.IoTDB.Attribute; using JiShe.CollectBus.IoTDB.Attributes;
using JiShe.CollectBus.IoTDB.Enums;
using JiShe.CollectBus.IoTDB.Model; using JiShe.CollectBus.IoTDB.Model;
using System;
namespace JiShe.CollectBus.Ammeters namespace JiShe.CollectBus.Ammeters
{ {
[EntityType(EntityTypeEnum.TreeModel)] [SourceAnalyzers(EntityTypeEnum.TreeModel)]
[SourceAnalyzers]
public class ElectricityMeterTreeModel : IoTEntity public class ElectricityMeterTreeModel : IoTEntity
{ {
[ATTRIBUTEColumn] [ATTRIBUTEColumn]
@ -31,9 +30,11 @@ namespace JiShe.CollectBus.Ammeters
public double Current { get; set; } public double Current { get; set; }
[FIELDColumn] [FIELDColumn]
public double Power => Voltage * Current; public double Power { get; set; }
[FIELDColumn] [FIELDColumn]
public double? Currentd { get; set; } public double? Currentd { get; set; }
public ValueTuple<int, string> TupleData { get; set; }
} }
} }

View File

@ -1,6 +1,5 @@
using JiShe.CollectBus.Analyzers.Shared; using JiShe.CollectBus.Analyzers.Shared;
using JiShe.CollectBus.IoTDB.Attribute; using JiShe.CollectBus.IoTDB.Attributes;
using JiShe.CollectBus.IoTDB.Enums;
using JiShe.CollectBus.IoTDB.Model; using JiShe.CollectBus.IoTDB.Model;
using System; using System;
@ -9,8 +8,7 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// <summary> /// <summary>
/// 抄读任务数据 /// 抄读任务数据
/// </summary> /// </summary>
[EntityType(EntityTypeEnum.TableModel)] [SourceAnalyzers(EntityTypeEnum.TableModel)]
[SourceAnalyzers]
public class MeterReadingTelemetryPacketInfo : IoTEntity public class MeterReadingTelemetryPacketInfo : IoTEntity
{ {
/// <summary> /// <summary>

View File

@ -0,0 +1,60 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace JiShe.CollectBus.Analyzers.Shared
{
/// <summary>
/// 实体成员信息
/// </summary>
public sealed class EntityMemberInfo
{
/// <summary>
/// 名称或者路径
/// </summary>
public string NameOrPath { get; set; }
/// <summary>
/// 声明的类型
/// </summary>
public Type DeclaredType { get; set; }
/// <summary>
/// 声明的类型的名称
/// </summary>
public string DeclaredTypeName { get; }
/// <summary>
/// 获取值
/// </summary>
public Func<object, object> Getter { get; }
/// <summary>
/// 设置值
/// </summary>
public Action<object, object> Setter { get; }
/// <summary>
/// 自定义Attribute集合
/// </summary>
public List<Attribute> CustomAttributes { get; set; }
public EntityMemberInfo(
string nameOrPath,
Type declaredType,
string declaredTypeName,
Func<object, object> getter,
Action<object, object> setter)
{
NameOrPath = nameOrPath;
this.DeclaredType = declaredType;
DeclaredTypeName = declaredTypeName;
Getter = getter;
Setter = setter;
}
public object GetValue(object entity) => Getter(entity);
public void SetValue(object entity, object value) => Setter(entity, value);
}
}

View File

@ -1,24 +1,27 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using System.Text; using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDB.Enums namespace JiShe.CollectBus.Analyzers.Shared
{ {
/// <summary> /// <summary>
/// IoTDB实体类型枚举 /// 实体类型枚举
/// </summary> /// </summary>
public enum EntityTypeEnum public enum EntityTypeEnum
{ {
/// <summary> /// <summary>
/// 树模型 /// IoTDB树模型
/// </summary> /// </summary>
TreeModel = 1, TreeModel = 1,
/// <summary> /// <summary>
/// 表模型 /// IoTDB表模型
/// </summary> /// </summary>
TableModel = 2, TableModel = 2,
/// <summary>
/// 其他情况
/// </summary>
Other = 3
} }
} }

View File

@ -1,11 +1,22 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Reflection;
using System.Text; using System.Text;
namespace JiShe.CollectBus.Analyzers namespace JiShe.CollectBus.Analyzers.Shared
{ {
public interface ISourceAnalyzersProvider<T> public interface ISourceEntityAccessor<T>
{ {
/// <summary>
/// 实体类名称
/// </summary>
string EntityName { get; }
/// <summary>
/// 实体类型
/// </summary>
EntityTypeEnum? EntityType { get;}
/// <summary> /// <summary>
/// 获取属性值 /// 获取属性值
/// </summary> /// </summary>
@ -23,18 +34,13 @@ namespace JiShe.CollectBus.Analyzers
void SetPropertyValue(T entity, string propertyName, object value); void SetPropertyValue(T entity, string propertyName, object value);
/// <summary> /// <summary>
/// 判断是否是元组属性 /// 属性名称集合
/// </summary> /// </summary>
/// <param name="propertyName"></param> List<string> PropertyNameList { get; }
/// <returns></returns>
bool IsTupleProperty(string propertyName);
/// <summary> /// <summary>
/// 获取元组属性值 /// 属性信息集合
/// </summary> /// </summary>
/// <param name="entity"></param> List<EntityMemberInfo> MemberList { get; }
/// <param name="tuplePropertyName"></param>
/// <returns></returns>
(object Item1, object Item2) GetTupleParts(T entity, string tuplePropertyName);
} }
} }

View File

@ -8,5 +8,12 @@ namespace JiShe.CollectBus.Analyzers.Shared
[AttributeUsage(AttributeTargets.Class)] [AttributeUsage(AttributeTargets.Class)]
public class SourceAnalyzersAttribute : Attribute public class SourceAnalyzersAttribute : Attribute
{ {
public EntityTypeEnum EntityType { get; }
public SourceAnalyzersAttribute(EntityTypeEnum entityType)
{
EntityType = entityType;
}
} }
} }

View File

@ -0,0 +1,29 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Consts
{
/// <summary>
/// IOTDB数据树类型
/// </summary>
public class IOTDBDataTypeConst
{
/// <summary>
/// 数据
/// </summary>
public const string Data = "Data";
/// <summary>
/// 事件
/// </summary>
public const string Event = "Event";
/// <summary>
/// 状态
/// </summary>
public const string Status = "Status";
}
}

View File

@ -152,26 +152,6 @@ namespace JiShe.CollectBus.Common.Consts
} }
/// <summary>
/// IOTDB数据树类型
/// </summary>
public class IOTDBDataType
{
/// <summary>
/// 数据
/// </summary>
public const string Data = "Data";
/// <summary>
/// 事件
/// </summary>
public const string Event = "Event";
/// <summary>
/// 状态
/// </summary>
public const string Status = "Status";
}
/// <summary> /// <summary>
/// 集中器状态字段 /// 集中器状态字段