Compare commits

..

2 Commits

44 changed files with 861 additions and 1568 deletions

View File

@ -1,7 +1,7 @@
FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
WORKDIR /app
EXPOSE 8080
EXPOSE 10500
EXPOSE 80
EXPOSE 443
ENV TZ=Asia/Shanghai
ENV ASPNETCORE_ENVIRONMENT=Production
@ -38,7 +38,15 @@ RUN mkdir -p /app/Plugins
# 复制发布内容
COPY --from=publish /app/publish .
# 健康检查
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD curl -f http://localhost:80/health || exit 1
# 设置入口点
ENTRYPOINT ["dotnet", "JiShe.CollectBus.Host.dll"]
# 启动命令
# 可选:添加命令行参数
# CMD ["--urls", "http://+:80"]

View File

@ -348,58 +348,44 @@ namespace JiShe.CollectBus.IncrementalGenerator
private static string BuildFactoryCode()
{
return """
using System;
using System.Collections.Concurrent;
using System.Reflection;
using System;
using System.Collections.Concurrent;
using System.Reflection;
namespace JiShe.CollectBus.Analyzers.Shared;
namespace JiShe.CollectBus.Analyzers.Shared;
public static class SourceEntityAccessorFactory
public static class SourceEntityAccessorFactory
{
private static readonly ConcurrentDictionary<Type, object> _accessors = new();
public static ISourceEntityAccessor<T> GetAccessor<T>()
{
private static readonly ConcurrentDictionary<Type, object> _accessors = new();
public static ISourceEntityAccessor<T> GetAccessor<T>()
return (ISourceEntityAccessor<T>)_accessors.GetOrAdd(typeof(T), t =>
{
return (ISourceEntityAccessor<T>)_accessors.GetOrAdd(typeof(T), t =>
// 获取泛型类型定义信息(如果是泛型类型)
var isGeneric = t.IsGenericType;
var genericTypeDef = isGeneric ? t.GetGenericTypeDefinition() : null;
var arity = isGeneric ? genericTypeDef!.GetGenericArguments().Length : 0;
// 构建访问器类名
var typeName = isGeneric
? $"{t.Namespace}.{genericTypeDef!.Name.Split('`')[0]}Accessor`{arity}"
: $"{t.Namespace}.{t.Name}Accessor";
// 尝试从当前程序集加载
var accessorType = Assembly.GetAssembly(t)!.GetType(typeName)
?? throw new InvalidOperationException($"Accessor type {typeName} not found");
// 处理泛型参数
if (isGeneric && accessorType.IsGenericTypeDefinition)
{
// 获取泛型类型定义信息(如果是泛型类型)
var isGeneric = t.IsGenericType;
var genericTypeDef = isGeneric ? t.GetGenericTypeDefinition() : null;
var arity = isGeneric ? genericTypeDef!.GetGenericArguments().Length : 0;
accessorType = accessorType.MakeGenericType(t.GetGenericArguments());
}
// 构建访问器类名
var typeName = isGeneric
? $"{t.Namespace}.{genericTypeDef!.Name.Split('`')[0]}Accessor`{arity}"
: $"{t.Namespace}.{t.Name}Accessor";
// 尝试从当前程序集加载
var accessorType = Assembly.GetAssembly(t)!.GetType(typeName)
?? throw new InvalidOperationException($"Accessor type {typeName} not found");
// 处理泛型参数
if (isGeneric && accessorType.IsGenericTypeDefinition)
{
accessorType = accessorType.MakeGenericType(t.GetGenericArguments());
}
return Activator.CreateInstance(accessorType)!;
});
}
public static object GetAccessor(Type type)
{
MethodInfo getAccessorMethod = typeof(SourceEntityAccessorFactory)
.GetMethod(
name: nameof(GetAccessor),
bindingAttr: BindingFlags.Public | BindingFlags.Static,
types: Type.EmptyTypes
);
MethodInfo genericMethod = getAccessorMethod.MakeGenericMethod(type);
return genericMethod.Invoke(null, null);
}
return Activator.CreateInstance(accessorType)!;
});
}
}
""";
}
@ -545,8 +531,8 @@ namespace JiShe.CollectBus.IncrementalGenerator
var entityType = prop.ContainingType.ToDisplayString();//entity 实体类型名称
var propType = prop.Type;//实体属性的类型
var propTypeName = propType.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat);
// var declaredTypeName = propType.Name; // 直接获取类型名称(如 "Int32"
// 处理可空类型,获取底层具体类型名称
// var declaredTypeName = propType.Name; // 直接获取类型名称(如 "Int32"
// 处理可空类型,获取底层具体类型名称
var declaredTypeName = propType switch
{
INamedTypeSymbol { OriginalDefinition.SpecialType: SpecialType.System_Nullable_T } nullableType =>
@ -597,19 +583,19 @@ namespace JiShe.CollectBus.IncrementalGenerator
$"GetValueTupleElementDeclaredTypeName(typeof({elementType})), " +//$"\"{elementDeclaredName}\", " +
$"(e) => Get{prop.Name}_{elementName}(({entityType})e), " +
$"(e, v) => Set{prop.Name}_{elementName}(({entityType})e, ({elementType})v))");
}
}
}
}
}
}
code.AppendLine(string.Join(",\n", initializerLines));
code.AppendLine(" };");
code.AppendLine(string.Join(",\n", initializerLines));
code.AppendLine(" };");
code.AppendLine(GetValueTupleElementName());
}
code.AppendLine(GetValueTupleElementName());
}
private static string GetValueTupleElementName()
{
return """
private static string GetValueTupleElementName()
{
return """
public static string GetValueTupleElementDeclaredTypeName(Type declaredType)
{
string typeName;
@ -627,7 +613,7 @@ namespace JiShe.CollectBus.IncrementalGenerator
return typeName;
}
""";
}
}
private static string GenerateAttributeInitializer(AttributeData attribute)

View File

@ -1,14 +0,0 @@
namespace JiShe.CollectBus.IoTDB.Attributes
{
/// <summary>
/// 需要忽略表模型初始化,有此特性无需初始化
/// </summary>
[AttributeUsage(AttributeTargets.Class)]
public class IgnoreInitTableAttribute : System.Attribute
{
public IgnoreInitTableAttribute()
{
}
}
}

View File

@ -10,11 +10,12 @@ namespace JiShe.CollectBus.IoTDB.Interface
/// </summary>
public interface IIoTDbProvider
{
/// <summary>
/// 切换 SessionPool
/// </summary>
/// <param name="sessionpolType">是否使用表模型</param>
/// <returns></returns>
///// <summary>
///// 切换 SessionPool
///// </summary>
///// <param name="useTableSession">是否使用表模型</param>
//void SwitchSessionPool(bool useTableSession);
IIoTDbProvider GetSessionPool(bool sessionpolType);
/// <summary>
@ -65,11 +66,5 @@ namespace JiShe.CollectBus.IoTDB.Interface
/// <param name="options"></param>
/// <returns></returns>
Task<BusPagedResult<T>> QueryAsync<T>(IoTDBQueryOptions options) where T : IoTEntity, new();
/// <summary>
/// 初始化表模型
/// </summary>
/// <returns></returns>
Task InitTableSessionModelAsync();
}
}

View File

@ -32,12 +32,5 @@ namespace JiShe.CollectBus.IoTDB.Interface
/// <param name="sql"></param>
/// <returns></returns>
Task<SessionDataSet> ExecuteQueryStatementAsync(string sql);
/// <summary>
/// 执行无返回结果SQL
/// </summary>
/// <param name="sql"></param>
/// <returns></returns>
Task<int> ExecuteNonQueryStatementAsync(string sql);
}
}

View File

@ -50,7 +50,7 @@ namespace JiShe.CollectBus.IoTDB.Model
/// </summary>
private string _devicePath;
/// <summary>
/// 设备路径,树模型使用,表模型会在数据插入的时候直接获取继承类的名称作为表明
/// 设备路径
/// </summary>
public virtual string DevicePath
{

View File

@ -7,7 +7,6 @@ namespace JiShe.CollectBus.IoTDB.Model
/// Table模型单项数据实体
/// </summary>
[SourceAnalyzers(EntityTypeEnum.TableModel)]
[IgnoreInitTable]
public class TableModelSingleMeasuringEntity<T> : IoTEntity
{
/// <summary>

View File

@ -26,8 +26,6 @@ using System.Diagnostics.Metrics;
using Newtonsoft.Json.Linq;
using static System.Runtime.InteropServices.JavaScript.JSType;
using System.Text.RegularExpressions;
using System.Xml.Linq;
using System.Linq;
namespace JiShe.CollectBus.IoTDB.Provider
{
@ -88,7 +86,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
_logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null");
return;
}
_logger.LogWarning($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 路径为 {tablet.First().InsertTargetName}");
_logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 路径为 {tablet.First().InsertTargetName}");
await CurrentSession.InsertAsync(tablet.First());
}
@ -129,7 +127,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
foreach (var item in tablet)
{
_logger.LogWarning($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 路径为 {item.InsertTargetName}");
_logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 路径为 {item.InsertTargetName}");
await CurrentSession.InsertAsync(item);
}
@ -368,10 +366,6 @@ namespace JiShe.CollectBus.IoTDB.Provider
{
tableNameOrTreePath = metadata.TableNameOrTreePath;
}
else if(metadata.IsSingleMeasuring && string.IsNullOrWhiteSpace(metadata.TableNameOrTreePath) == true)//单侧点时,且路径没有指定,取默认实体名称和第一个列名组合。
{
tableNameOrTreePath = $"{metadata.EntityName}_{metadata.ColumnNames.First()}";
}
else
{
tableNameOrTreePath = DevicePathBuilder.GetTableName<T>();
@ -636,7 +630,10 @@ namespace JiShe.CollectBus.IoTDB.Provider
{
// 过滤元组子项
if (member.NameOrPath.Contains(".Item")) continue;
// 类型名称处理
string declaredTypeName = member.DeclaredTypeName;
// 特性查询优化
var attributes = member.CustomAttributes ?? Enumerable.Empty<Attribute>();
var tagAttr = attributes.OfType<TAGColumnAttribute>().FirstOrDefault();
@ -845,25 +842,6 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
}
/// <summary>
/// 处理不同列类型的逻辑
/// </summary>
/// <param name="groupedColumns"></param>
/// <param name="category"></param>
private string ProcessCategory(IReadOnlyDictionary<ColumnCategory, List<ColumnInfo>> groupedColumns, ColumnCategory category)
{
if (groupedColumns.TryGetValue(category, out var cols))
{
List<string> tempColumnInfos = new List<string>();
foreach (var item in cols)
{
tempColumnInfos.Add($" {item.Name} {item.DataType} {item.Category}");
}
return string.Join(",", tempColumnInfos);
}
return string.Empty;
}
/// <summary>
/// 数据列结构
/// </summary>
@ -998,136 +976,6 @@ namespace JiShe.CollectBus.IoTDB.Provider
return cache;
}
/// <summary>
/// 初始化表模型
/// </summary>
/// <returns></returns>
public async Task InitTableSessionModelAsync()
{
//获取JiShe.CollectBus.IoTDB程序集和JiShe.CollectBus.Domain程序集中的所有 [SourceAnalyzers(EntityTypeEnum.TableModel)] 的实体
var assemblyNames = new[] { "JiShe.CollectBus.IoTDB", "JiShe.CollectBus.Domain" };
var assemblies = CommonHelper.LoadAssemblies(assemblyNames);
var targetTypes = CollectTargetTypes(assemblies);
if (targetTypes == null || targetTypes.Count <= 0)
{
_logger.LogError($"{nameof(InitTableSessionModelAsync)} 初始化表模型时没有找到对应的实体类信息。");
return;
}
// @"CREATE TABLE table1(
// time TIMESTAMP TIME,
// region STRING TAG,
// plant_id STRING TAG,
// device_id STRING TAG,
// model_id STRING ATTRIBUTE,
// maintenance STRING ATTRIBUTE,
// temperature FLOAT FIELD,
// humidity FLOAT FIELD,
// status Boolean FIELD,
// arrival_time TIMESTAMP FIELD
//) COMMENT 'table1' WITH(TTL = 31536000000);";
foreach (var item in targetTypes)
{
var accessor = SourceEntityAccessorFactory.GetAccessor(item);
//通过 dynamic 简化操作
dynamic dynamicAccessor = accessor;
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.Append("CREATE TABLE IF NOT EXISTS ");
stringBuilder.Append(dynamicAccessor.EntityName);
stringBuilder.Append("( time TIMESTAMP TIME, ");
var columns = new List<ColumnInfo>();
foreach (var member in dynamicAccessor.MemberList)
{
// 过滤元组子项
if (member.NameOrPath.Contains(".Item")) continue;
// 特性查询优化
var attributes = (IEnumerable<Attribute>)(member.CustomAttributes ?? Enumerable.Empty<Attribute>());
var tagAttr = attributes.OfType<TAGColumnAttribute>().FirstOrDefault();
var attrColumn = attributes.OfType<ATTRIBUTEColumnAttribute>().FirstOrDefault();
var fieldColumn = attributes.OfType<FIELDColumnAttribute>().FirstOrDefault();
// 构建ColumnInfo
ColumnInfo? column = null;
if (tagAttr != null)
{
column = new ColumnInfo(member.NameOrPath, ColumnCategory.TAG, GetDataTypeFromTypeName(member.DeclaredTypeName), false, member.DeclaredTypeName);
}
else if (attrColumn != null)
{
column = new ColumnInfo(member.NameOrPath, ColumnCategory.ATTRIBUTE, GetDataTypeFromTypeName(member.DeclaredTypeName), false, member.DeclaredTypeName);
}
else if (fieldColumn != null)
{
column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(member.DeclaredTypeName), false, member.DeclaredTypeName);
}
if (!column.HasValue)
{
_logger.LogError($"{nameof(InitTableSessionModelAsync)} 初始化表模型时实体类{dynamicAccessor.EntityName}的{member.NameOrPath}列的ColumnInfo构建失败。");
continue;
}
columns.Add(column.Value);
}
//按业务逻辑顺序处理TAG -> ATTRIBUTE -> FIELD
var groupedColumns = columns
.GroupBy(c => c.Category)
.ToDictionary(g => g.Key, g => g.ToList());
List<string> tempColumInfos = new List<string>();
tempColumInfos.Add( ProcessCategory(groupedColumns, ColumnCategory.TAG));
tempColumInfos.Add(ProcessCategory(groupedColumns, ColumnCategory.ATTRIBUTE));
tempColumInfos.Add(ProcessCategory(groupedColumns, ColumnCategory.FIELD));
stringBuilder.Append(string.Join(",", tempColumInfos.Where(d => !string.IsNullOrWhiteSpace(d))));
stringBuilder.Append($" ) COMMENT '{item.Name}' ");
_logger.LogWarning($"{dynamicAccessor.EntityName} 初始化语句:{stringBuilder.ToString()}");
await CurrentSession.ExecuteNonQueryStatementAsync($"{stringBuilder.ToString()}");
}
}
/// <summary>
/// 获取程序集中的所有 [SourceAnalyzers(EntityTypeEnum.TableModel)] 的实体
/// </summary>
/// <param name="assemblies"></param>
/// <returns></returns>
private List<Type> CollectTargetTypes(List<Assembly> assemblies)
{
var targetTypes = new List<Type>();
foreach (var assembly in assemblies)
{
try
{
foreach (var type in assembly.GetExportedTypes())
{
//获取表模型特性的类
var sourceAnalyzersAttribute = type.GetCustomAttribute<SourceAnalyzersAttribute>();
//需要忽略表模型初始化,有此特性无需初始化
var ignoreInitTableAttribute = type.GetCustomAttribute<IgnoreInitTableAttribute>();
if (sourceAnalyzersAttribute?.EntityType == EntityTypeEnum.TableModel && ignoreInitTableAttribute == null)
{
if (type.GetConstructor(Type.EmptyTypes) != null)
targetTypes.Add(type);
}
}
}
catch (ReflectionTypeLoadException ex)
{
_logger.LogError($"加载 {assembly} 失败: {string.Join(", ", ex.LoaderExceptions.Select(e => e.Message))}");
}
}
return targetTypes;
}
private static readonly Regex _asciiAlphanumericRegex = new Regex(@"^[a-zA-Z0-9]*$", RegexOptions.Compiled);
}
}

View File

@ -71,7 +71,8 @@ namespace JiShe.CollectBus.IoTDB.Provider
if (result != 0)
{
throw new Exception($"{nameof(SessionPoolAdapter)} Tree模型数据入库没有成功返回结果为{result}请检查IoTEntity继承子类属性索引是否有变动。");
}
}
//await CloseAsync();
return result;
}
@ -82,18 +83,9 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <returns></returns>
public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql)
{
var result = await _sessionPool.ExecuteQueryStatementAsync(sql, _options.Timeout);
return result;
}
/// <summary>
/// 执行无返回结果SQL
/// </summary>
/// <param name="sql"></param>
/// <returns></returns>
public async Task<int> ExecuteNonQueryStatementAsync(string sql)
{
var result = await _sessionPool.ExecuteNonQueryStatementAsync(sql);
var result = await _sessionPool.ExecuteQueryStatementAsync(sql, _options.Timeout);
//await result.Close();
//await CloseAsync();
return result;
}

View File

@ -70,7 +70,8 @@ namespace JiShe.CollectBus.IoTDB.Provider
{
throw new Exception($"{nameof(TableSessionPoolAdapter)} table模型数据入库没有成功返回结果为{result}请检查IoTEntity继承子类属性索引是否有变动。");
}
//await CloseAsync();
return result;
}
@ -81,18 +82,9 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <returns></returns>
public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql)
{
var result = await _sessionPool.ExecuteQueryStatementAsync(sql,_options.Timeout);
return result;
}
/// <summary>
/// 执行无返回结果SQL
/// </summary>
/// <param name="sql"></param>
/// <returns></returns>
public async Task<int> ExecuteNonQueryStatementAsync(string sql)
{
var result = await _sessionPool.ExecuteNonQueryStatementAsync(sql);
var result = await _sessionPool.ExecuteQueryStatementAsync(sql,_options.Timeout);
//await result.Close();
//await CloseAsync();
return result;
}

View File

@ -8,11 +8,10 @@ using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.Kafka.AdminClient;
public class AdminClientService : IAdminClientService, ISingletonDependency
public class AdminClientService : IAdminClientService, IDisposable, ISingletonDependency
{
private readonly ILogger<AdminClientService> _logger;
private readonly KafkaOptionConfig _kafkaOptionConfig;
private readonly Lazy<IAdminClient> _lazyAdminClient;
/// <summary>
/// Initializes a new instance of the <see cref="AdminClientService" /> class.
/// </summary>
@ -22,17 +21,16 @@ public class AdminClientService : IAdminClientService, ISingletonDependency
{
_logger = logger;
_kafkaOptionConfig = kafkaOptionConfig.Value;
//Instance = GetInstance();
_lazyAdminClient = new Lazy<IAdminClient>(() => GetInstance());
Instance = GetInstance();
}
/// <summary>
/// Gets or sets the instance.
/// Gets or sets the instance.
/// </summary>
/// <value>
/// The instance.
/// </value>
public IAdminClient Instance => _lazyAdminClient.Value;
public IAdminClient Instance { get; set; }
/// <summary>
/// 创建Kafka主题
@ -65,7 +63,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency
}
/// <summary>
/// 删除Kafka主题
/// 删除Kafka主题
/// </summary>
/// <param name="topic"></param>
/// <returns></returns>
@ -75,28 +73,28 @@ public class AdminClientService : IAdminClientService, ISingletonDependency
}
/// <summary>
/// 获取Kafka主题列表
/// 获取Kafka主题列表
/// </summary>
/// <returns></returns>
public async Task<List<string>> ListTopicsAsync()
{
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
return await Task.FromResult(new List<string>(metadata.Topics.Select(t => t.Topic)));
return new List<string>(metadata.Topics.Select(t => t.Topic));
}
/// <summary>
/// 判断Kafka主题是否存在
/// 判断Kafka主题是否存在
/// </summary>
/// <param name="topic"></param>
/// <returns></returns>
public async Task<bool> TopicExistsAsync(string topic)
{
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
return await Task.FromResult(metadata.Topics.Any(t => t.Topic == topic));
return metadata.Topics.Any(t => t.Topic == topic);
}
/// <summary>
/// 检测分区是否存在
/// 检测分区是否存在
/// </summary>
/// <param name="topic"></param>
/// <param name="partitions"></param>
@ -112,7 +110,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency
}
/// <summary>
/// 检测分区是否存在
/// 检测分区是否存在
/// </summary>
/// <param name="topic"></param>
/// <param name="targetPartition"></param>
@ -127,7 +125,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency
}
/// <summary>
/// 获取主题的分区数量
/// 获取主题的分区数量
/// </summary>
/// <param name="topic"></param>
/// <returns></returns>
@ -139,8 +137,13 @@ public class AdminClientService : IAdminClientService, ISingletonDependency
return metadata.Topics[0].Partitions.Count;
}
public void Dispose()
{
Instance?.Dispose();
}
/// <summary>
/// Gets the instance.
/// Gets the instance.
/// </summary>
/// <returns></returns>
public IAdminClient GetInstance()
@ -160,7 +163,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency
}
/// <summary>
/// Checks the topic asynchronous.
/// Checks the topic asynchronous.
/// </summary>
/// <param name="topic">The topic.</param>
/// <returns></returns>
@ -171,7 +174,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency
}
/// <summary>
/// 判断Kafka主题是否存在
/// 判断Kafka主题是否存在
/// </summary>
/// <param name="topic">主题名称</param>
/// <param name="numPartitions">副本数量不能高于Brokers数量</param>

View File

@ -1,13 +1,11 @@
using Confluent.Kafka;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using System.Reflection;
using Volo.Abp;
using Volo.Abp.DependencyInjection;
@ -22,14 +20,22 @@ namespace JiShe.CollectBus.Kafka
{
var configuration = context.Services.GetConfiguration();
//var kafkaSection = configuration.GetSection(CommonConst.Kafka);
//KafkaOptionConfig kafkaOptionConfig = new KafkaOptionConfig();
//KafkaOptionConfig kafkaOptionConfig = new KafkaOptionConfig ();
//kafkaSection.Bind(kafkaOptionConfig);
//Configure<KafkaOptionConfig>(kafkaSection);
//if (configuration[CommonConst.ServerTagName] != null)
//{
// kafkaOptionConfig.ServerTagName = configuration[CommonConst.ServerTagName]!;
//}
//context.Services.AddSingleton(kafkaOptionConfig);
//context.Services.Configure<KafkaOptionConfig>(context.Services.GetConfiguration().GetSection(CommonConst.Kafka));
Configure<KafkaOptionConfig>(options =>
{
configuration.GetSection(CommonConst.Kafka).Bind(options);
});
// 注册Producer
context.Services.AddSingleton<IProducerService, ProducerService>();
// 注册Consumer
@ -38,12 +44,6 @@ namespace JiShe.CollectBus.Kafka
// 注册Polly
context.Services.AddSingleton<KafkaPollyPipeline>();
var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
// 注册任务调度
context.Services.AddSingleton<KafkaTaskScheduler>();
//context.Services.AddHostedService<HostedService>();
}

View File

@ -12,7 +12,6 @@ using System.Collections.Concurrent;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using YamlDotNet.Core.Tokens;
namespace JiShe.CollectBus.Kafka.Consumer
{
@ -37,21 +36,17 @@ namespace JiShe.CollectBus.Kafka.Consumer
private readonly KafkaPollyPipeline _kafkaPollyPipeline;
private readonly KafkaTaskScheduler _kafkaTaskScheduler;
/// <summary>
/// ConsumerService
/// </summary>
/// <param name="logger"></param>
/// <param name="kafkaOptionConfig"></param>
public ConsumerService(ILogger<ConsumerService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline, IOptions<ServerApplicationOptions> applicationOptions, KafkaTaskScheduler kafkaTaskScheduler)
public ConsumerService(ILogger<ConsumerService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline, IOptions<ServerApplicationOptions> applicationOptions)
{
_logger = logger;
_kafkaOptionConfig = kafkaOptionConfig.Value;
_applicationOptions = applicationOptions.Value;
_kafkaPollyPipeline = kafkaPollyPipeline;
_kafkaTaskScheduler = kafkaTaskScheduler;
}
#region private
@ -135,26 +130,21 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
try
{
// 扩展独立线程,避免阻塞
_kafkaTaskScheduler.WorkerThreadExpansion();
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ =>
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<TKey, TValue>(groupId),
new CancellationTokenSource()
));
if (consumerStore.Consumer == null)
{
_logger.LogWarning($"{string.Join("", topics)}创建消息消费失败或消费组已被释放");
return;
}
var consumer = consumerStore.Consumer as IConsumer<TKey, TValue>;
var cts = consumerStore.CTS;
cts
)).Consumer as IConsumer<TKey, TValue>;
consumer!.Subscribe(topics);
_= Task.Factory.StartNew(async () =>
_ = Task.Run(async () =>
{
while (!cts.IsCancellationRequested)
{
@ -205,17 +195,12 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
//ignore
}
catch (ObjectDisposedException)
{
_logger.LogError($"{string.Join("", topics)}消费者被释放");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "处理消息时发生未知错误");
}
}
}, cts.Token, TaskCreationOptions.None, _kafkaTaskScheduler);
}, cts.Token);
await Task.CompletedTask;
});
@ -241,28 +226,19 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
try
{
// 扩展独立线程,避免阻塞
_kafkaTaskScheduler.WorkerThreadExpansion();
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ =>
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<Ignore, TValue>(groupId),
new CancellationTokenSource()
));
if (consumerStore.Consumer == null)
{
_logger.LogWarning($"{string.Join("", topics)}创建消息消费失败或消费组已被释放");
return;
}
var consumer = consumerStore.Consumer as IConsumer<Ignore, TValue>;
var cts = consumerStore.CTS;
cts
)).Consumer as IConsumer<Ignore, TValue>;
consumer!.Subscribe(topics);
_ = Task.Factory.StartNew(async () =>
_ = Task.Run(async () =>
{
int count = 0;
while (!cts.IsCancellationRequested)
@ -312,17 +288,12 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
//ignore
}
catch (ObjectDisposedException)
{
_logger.LogError($"{string.Join("", topics)}消费者被释放");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "处理消息时发生未知错误");
}
}
}, cts.Token, TaskCreationOptions.None, _kafkaTaskScheduler);
}, cts.Token);
await Task.CompletedTask;
});
}
@ -371,31 +342,22 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
try
{
// 扩展独立线程,避免阻塞
_kafkaTaskScheduler.WorkerThreadExpansion();
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<TKey, TValue>(groupId),
new CancellationTokenSource()
));
if (consumerStore.Consumer == null)
{
_logger.LogWarning($"{string.Join("", topics)}创建消息消费失败或消费组已被释放");
return;
}
var consumer = consumerStore.Consumer as IConsumer<TKey, TValue>;
var cts = consumerStore.CTS;
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<TKey, TValue>(groupId),
cts
)).Consumer as IConsumer<TKey, TValue>;
consumer!.Subscribe(topics);
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
_ = Task.Factory.StartNew(async () =>
_ = Task.Run(async () =>
{
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
var startTime = DateTime.UtcNow;
@ -482,17 +444,12 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
//ignore
}
catch (ObjectDisposedException)
{
_logger.LogError($"{string.Join("", topics)}消费者被释放");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "处理批量消息时发生未知错误");
}
}
}, cts.Token, TaskCreationOptions.None, _kafkaTaskScheduler);
}, cts.Token);
await Task.CompletedTask;
});
@ -544,30 +501,23 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
try
{
// 扩展独立线程,避免阻塞
_kafkaTaskScheduler.WorkerThreadExpansion();
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<Ignore, TValue>(groupId),
new CancellationTokenSource()
));
if (consumerStore.Consumer == null)
{
_logger.LogWarning($"{string.Join("", topics)}创建消息消费失败或消费组已被释放");
return;
}
var consumer = consumerStore.Consumer as IConsumer<Ignore, TValue>;
var cts = consumerStore.CTS;
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<Ignore, TValue>(groupId),
cts
)).Consumer as IConsumer<Ignore, TValue>;
consumer!.Subscribe(topics);
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
_ = Task.Factory.StartNew(async () =>
_ = Task.Run(async () =>
{
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
var startTime = DateTime.UtcNow;
@ -652,17 +602,12 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
//ignore
}
catch (ObjectDisposedException)
{
_logger.LogError($"{string.Join("", topics)}消费者被释放");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "处理批量消息时发生未知错误");
}
}
}, cts.Token, TaskCreationOptions.None, _kafkaTaskScheduler);
}, cts.Token);
await Task.CompletedTask;
});
@ -685,13 +630,11 @@ namespace JiShe.CollectBus.Kafka.Consumer
try
{
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
if (_consumerStore.TryGetValue(consumerKey, out var entry))
if (_consumerStore.TryRemove(consumerKey, out var entry))
{
entry.CTS.Cancel();
(entry.Consumer as IDisposable)?.Dispose();
entry.CTS.Dispose();
// 从字典中移除
_consumerStore.TryRemove(consumerKey, out entry);
}
}
catch (Exception ex)

View File

@ -49,11 +49,4 @@ public class KafkaOptionConfig
/// </summary>
public string? SaslPassword { get; set; }
/// <summary>
/// 订阅任务线程数量
/// 当主题未指定时,订阅任务线程数量默认为:-1
/// 优先级低于订阅任务特性TaskCount值
/// </summary>
public int TaskThreadCount { get; set; } = -1;
}

View File

@ -1,169 +0,0 @@
using System.Collections.Concurrent;
namespace JiShe.CollectBus.Kafka.Internal
{
public class KafkaTaskScheduler : TaskScheduler, IDisposable
{
private readonly BlockingCollection<Task> _tasksCollection=new BlockingCollection<Task> ();
private readonly List<Thread> _workerThreads;
private readonly object _disposeLock = new object();
private bool _isDisposed;
/// <summary>
/// 当前队列中的任务数
/// </summary>
public int QueuedTasks => _tasksCollection.Count;
/// <summary>
/// 当前工作线程数
/// </summary>
public int WorkerThreads => _workerThreads.Count;
/// <summary>
/// 初始化任务调度器
/// </summary>
public KafkaTaskScheduler()
{
// 默认最大并发线程数为CPU核心数
int MaxConcurrencyLevel = Environment.ProcessorCount;
_workerThreads = new List<Thread>(MaxConcurrencyLevel);
for (int i = 0; i < MaxConcurrencyLevel; i++)
{
var thread = new Thread(ExecuteScheduledTasks)
{
IsBackground = true,
Name = $"KafkaWorkerTask-{i + 1}"
};
thread.Start();
_workerThreads.Add(thread);
}
}
/// <summary>
/// 扩容工作线程调度
/// 可以启动多个工作线程来处理任务
/// </summary>
/// <param name="taskNum">扩展独立线程数(默认为1)</param>
public void WorkerThreadExpansion(int taskNum = 1)
{
int currCount = WorkerThreads+1;
Parallel.For(0, taskNum, (index) =>
{
var thread = new Thread(ExecuteScheduledTasks)
{
IsBackground = true,
Name = $"KafkaWorkerTask-{index+ currCount}"
};
thread.Start();
_workerThreads.Add(thread);
});
}
/// <summary>
/// 工作线程执行循环
/// </summary>
private void ExecuteScheduledTasks()
{
try
{
foreach (var task in _tasksCollection.GetConsumingEnumerable())
{
TryExecuteTaskSafely(task);
}
}
catch (OperationCanceledException) { }
catch (ObjectDisposedException) { }
}
/// <summary>
/// 安全执行任务并处理异常
/// </summary>
private void TryExecuteTaskSafely(Task task)
{
try
{
TryExecuteTask(task);
}
catch (OperationCanceledException){}
catch (Exception ex)
{
OnExceptionOccurred(ex);
}
}
#region TaskScheduler
protected override IEnumerable<Task> GetScheduledTasks()
{
ThrowIfDisposed();
return _tasksCollection.ToList();
}
protected override void QueueTask(Task task)
{
ThrowIfDisposed();
_tasksCollection.Add(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// 禁止内联执行以强制所有任务在专用线程执行
return false;
}
#endregion
#region
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
lock (_disposeLock)
{
if (_isDisposed) return;
if (disposing)
{
// 停止接收新任务
_tasksCollection.CompleteAdding();
// 等待所有工作线程退出
foreach (var thread in _workerThreads)
{
if (thread.IsAlive)
{
thread.Join(TimeSpan.FromSeconds(5));
}
}
// 释放资源
_tasksCollection.Dispose();
}
_isDisposed = true;
}
}
private void ThrowIfDisposed()
{
if (_isDisposed)
{
throw new ObjectDisposedException(GetType().Name);
}
}
#endregion
#region
/// <summary>
/// 任务执行异常时触发
/// </summary>
public event Action<Exception>? ExceptionEvent;
private void OnExceptionOccurred(Exception ex)
{
ExceptionEvent?.Invoke(ex);
}
#endregion
}
}

View File

@ -33,11 +33,11 @@ namespace JiShe.CollectBus.Kafka
var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
foreach (var item in topics)
{
kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
}
}
/// <summary>
@ -69,7 +69,7 @@ namespace JiShe.CollectBus.Kafka
// 实现IKafkaSubscribe接口
var subscribeTypes = assembly.GetTypes().Where(type =>
typeof(IKafkaSubscribe).IsAssignableFrom(type) &&
!type.IsAbstract && !type.IsInterface).ToList();
!type.IsAbstract && !type.IsInterface).ToList();
if (subscribeTypes.Count == 0)
continue;
@ -77,7 +77,7 @@ namespace JiShe.CollectBus.Kafka
Parallel.ForEach(subscribeTypes, subscribeType =>
{
var subscribes = provider.GetServices(subscribeType).ToList();
Parallel.ForEach(subscribes, subscribe =>
Parallel.ForEach(subscribes,subscribe =>
{
if (subscribe != null)
{
@ -102,26 +102,7 @@ namespace JiShe.CollectBus.Kafka
//}
}
logger.LogWarning($"kafka订阅主题:{_topicSubscribeCount}数,共启动:{_threadCount}线程");
var kafkaTaskScheduler = provider.GetRequiredService<KafkaTaskScheduler>();
kafkaTaskScheduler.ExceptionEvent += (ex) =>
{
logger.LogError(ex, "Kafka任务调度异常");
};
//logger.LogWarning($"kafka订阅工作线程数{kafkaTaskScheduler.WorkerThreads},队列任务:{kafkaTaskScheduler.QueuedTasks}数");
//
// 订阅调度监控测试可打开
//_ = Task.Factory.StartNew(async () =>
// {
// while (true)
// {
// logger.LogWarning($"kafka订阅工作线程数{kafkaTaskScheduler.WorkerThreads},队列任务:{kafkaTaskScheduler.QueuedTasks}数");
// await Task.Delay(TimeSpan.FromSeconds(5));
// }
// });
});
}
/// <summary>
@ -142,35 +123,20 @@ namespace JiShe.CollectBus.Kafka
.ToList();
if (subscribeTypes.Count == 0) return;
Parallel.ForEach(subscribeTypes, subscribeType =>
foreach (var subscribeType in subscribeTypes)
{
var subscribes = provider.GetServices(subscribeType).ToList();
Parallel.ForEach(subscribes, subscribe =>
subscribes.ForEach(subscribe =>
{
if (subscribe != null)
{
Tuple<int, int> tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
//threadCount += tuple.Item1;
//topicCount += tuple.Item2;
threadCount += tuple.Item1;
topicCount += tuple.Item2;
}
});
});
//foreach (var subscribeType in subscribeTypes)
//{
// var subscribes = provider.GetServices(subscribeType).ToList();
// subscribes.ForEach(subscribe =>
// {
// if (subscribe != null)
// {
// Tuple<int, int> tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
// threadCount += tuple.Item1;
// topicCount += tuple.Item2;
// }
// });
//}
}
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
});
}
@ -190,10 +156,13 @@ namespace JiShe.CollectBus.Kafka
Parallel.ForEach(subscribedMethods, sub =>
{
Interlocked.Increment(ref _topicSubscribeCount);
var adminClientService = provider.GetRequiredService<IAdminClientService>();
int partitionCount = sub.Attribute!.TaskCount == -1 ? 3 : sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions;
var adminClientService = provider.GetRequiredService<IAdminClientService>();
int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic);
// 可以根据配置文件TaskThreadCount来配置线程数
int partitionCount = sub.Attribute!.TaskCount == -1 ? (kafkaOptionConfig.TaskThreadCount==-1? topicCount: kafkaOptionConfig.TaskThreadCount) : sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions;
//int partitionCount = sub.Attribute!.TaskCount == -1 ? topicCount : sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions;
partitionCount = partitionCount > topicCount ? topicCount : partitionCount;
//partitionCount = sub.Attribute!.TaskCount == -1 ? adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
if (partitionCount <= 0)
@ -210,7 +179,7 @@ namespace JiShe.CollectBus.Kafka
//foreach (var sub in subscribedMethods)
//{
//// //int partitionCount = sub.Attribute!.TaskCount==-1?3: sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions;
// //int partitionCount = sub.Attribute!.TaskCount==-1?3: sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions;
// var adminClientService = provider.GetRequiredService<IAdminClientService>();
// int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic);
@ -237,7 +206,7 @@ namespace JiShe.CollectBus.Kafka
private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr, MethodInfo method, object subscribe, ILogger<CollectBusKafkaModule> logger)
{
var consumerService = provider.GetRequiredService<IConsumerService>();
if (attr.EnableBatch)
{
Interlocked.Increment(ref _threadStartCount);

View File

@ -38,19 +38,16 @@ namespace JiShe.CollectBus.Protocol.T1882018.SendData
#region
/// <summary>
/// 读取计量数据CTR0_控制码(01H/09H)_DI1_DI0_SER
/// 读取计量数据
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public static Telemetry1882018PacketResponse CTR0_01_Send(Telemetry1882018PacketRequest request)
public static Telemetry1882018PacketResponse CTR_01_Send(Telemetry1882018PacketRequest request)
{
var itemCodeArr = request.ItemCode.Split('_');
var ctr = itemCodeArr[0];//CTR0
var c_data = itemCodeArr[1];//01
var DI1 = itemCodeArr[2];//91 或者 90
var DI0 = itemCodeArr[3];//1F
var SER = itemCodeArr[4];//00
var dataUnit = new List<string>() { DI1, DI0, SER };
var c_data = itemCodeArr[0];//01
var d_data = itemCodeArr[2];//91 或者 90
var dataUnit = new List<string>() { "1F", d_data, "00" };
var dataList = Build188SendData.Build188SendCommand(request.MeterAddress, c_data, dataUnit);
return new Telemetry1882018PacketResponse() { Data = dataList };
@ -59,19 +56,16 @@ namespace JiShe.CollectBus.Protocol.T1882018.SendData
#region
/// <summary>
/// 写数据CTR3_控制码(04H/0CH)_DI1_DI0_SER
/// 阀控
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public static Telemetry1882018PacketResponse CTR3_04_Send(Telemetry1882018PacketRequest request)
public static Telemetry1882018PacketResponse CTR_04_Send(Telemetry1882018PacketRequest request)
{
var itemCodeArr = request.ItemCode.Split('_');
var ctr = itemCodeArr[0];//CTR3
var c_data = itemCodeArr[1];//04
var DI1 = itemCodeArr[2];//A0
var DI0 = itemCodeArr[3];//17
var SER = itemCodeArr[4];//55 或者 99
var dataUnit = new List<string>() { DI1, DI0, SER };
var c_data = itemCodeArr[0];//01
var d_data = itemCodeArr[2];//55 或者 99
var dataUnit = new List<string>() { "A0", "17", "00", d_data };
var dataList = Build188SendData.Build188SendCommand(request.MeterAddress, c_data, dataUnit);
return new Telemetry1882018PacketResponse() { Data = dataList };

View File

@ -31,7 +31,7 @@ namespace JiShe.CollectBus.Protocol.T1882018
T188ControlHandlers = Telemetry1882018PacketBuilder.T1882018ControlHandlers;
}
public sealed override ProtocolInfo Info => new(nameof(T1882018ProtocolPlugin), "376.1/188-2018", "TCP", "376.1/188-2018协议", "云集");
public sealed override ProtocolInfo Info => new(nameof(T1882018ProtocolPlugin), "376.1/188-2018", "TCP", "376.1/188-2018协议", "HJ-LXS-15 DN15");
public override async Task<T> AnalyzeAsync<T>(ITcpSessionClient client, string messageReceived, Action<T>? sendAction = null)
{
@ -93,9 +93,9 @@ namespace JiShe.CollectBus.Protocol.T1882018
//数据转发场景 10H_F1
if (request.ItemCode == T37612012PacketItemCodeConst.AFN10HFN01H && request.SubProtocolRequest != null && string.IsNullOrWhiteSpace(request.SubProtocolRequest.ItemCode) == false)
{
var subItemCodeArr = request.SubProtocolRequest.ItemCode.Split("_");
//var subItemCodeArr = request.SubProtocolRequest.ItemCode.Split("_");
var t188PacketHandlerName = $"{subItemCodeArr[0]}_{subItemCodeArr[1]}_Send";
var t188PacketHandlerName = $"{T1882018PacketItemCodeConst.BasicT1882018}_{request.SubProtocolRequest.ItemCode}_Send";
Telemetry1882018PacketResponse t645PacketResponse = null;
if (T188ControlHandlers != null && T188ControlHandlers.TryGetValue(t188PacketHandlerName
@ -124,7 +124,6 @@ namespace JiShe.CollectBus.Protocol.T1882018
FocusAddress = request.FocusAddress,
Fn = fn,
Pn = request.Pn,
SubRequest = request.SubProtocolRequest,
DataUnit = dataUnit,
});
}

View File

@ -100,8 +100,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H
if(decimal.TryParse(data[4], out decimal value))
meter.DataValue = value;
}
meter.ItemType = T37612012PacketItemCodeConst.AFN10HFN97H;
meter.ItemType = "10_97";
meter.ValidData = data[2].Equals("91") || data[2].Equals("B1");
meter.FiledDesc = "电网频率";//"电网频率";
meter.FiledName = meter.ItemType.GetDataFieldByGatherDataType() ?? string.Empty;

View File

@ -1,5 +1,4 @@
using Confluent.Kafka;
using FreeRedis;
using FreeRedis;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Encrypt;
@ -30,7 +29,6 @@ using Volo.Abp.DependencyInjection;
using Volo.Abp.Guids;
using YamlDotNet.Core.Tokens;
using static FreeSql.Internal.GlobalFilter;
using static IClientRPCService;
using static JiShe.CollectBus.Common.Consts.T37612012PacketItemCodeConst;
namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
@ -174,8 +172,6 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
{
// 新建
var currentTime = DateTime.Now;
//特殊编码映射
var itemCodeInfo = T37612012PacketItemCodeConst.MappingItemCodeTo645SubCodeRelationship(data.ItemType);
taskData = new MeterReadingTelemetryPacketInfo()
{
SystemName = _applicationOptions.SystemType,
@ -193,9 +189,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
AFN = analysisBaseDto.AFN,
Fn = analysisBaseDto.Fn,
Seq = analysisBaseDto.PSEQ,
MSA = analysisBaseDto.MSA,
ItemCode = itemCodeInfo.Item1,
SubItemCode = itemCodeInfo.Item2,
MSA = analysisBaseDto.MSA,
ItemCode = data.ItemType,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
@ -213,31 +208,6 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
if (!string.IsNullOrWhiteSpace(data.FiledName))
{
await _dbProvider.GetSessionPool(false).InsertAsync(meter);
// 增加标识字段
var meterIsSync = new TreeModelSingleMeasuringEntity<bool>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}",
ProjectId = $"{data.ProjectId}",
DataType = analysisBaseDto.DataType,
Timestamps = meter.Timestamps,
SingleMeasuring = (IotDbFieldConst.IsSync, false)
};
await _dbProvider.GetSessionPool(false).InsertAsync(meterIsSync);
// 数据库业务ID
var meterIsDatabaseBusiID = new TreeModelSingleMeasuringEntity<int>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}",
ProjectId = $"{data.ProjectId}",
DataType = analysisBaseDto.DataType,
Timestamps = meter.Timestamps,
SingleMeasuring = (IotDbFieldConst.DatabaseBusiID, data.DatabaseBusiID)
};
await _dbProvider.GetSessionPool(false).InsertAsync(meterIsDatabaseBusiID);
}
return await Task.FromResult(true);
}
@ -255,9 +225,6 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
var data = analysisBaseDto.Data!;
List<MeterReadingTelemetryPacketInfo> meterReadingTelemetryPacketInfos = new List<MeterReadingTelemetryPacketInfo>();
List<TreeModelSingleMeasuringEntity<T>> treeModelSingleMeasuringEntities = new List<TreeModelSingleMeasuringEntity<T>>();
List<TreeModelSingleMeasuringEntity<bool>> meterIsSyncs = new List<TreeModelSingleMeasuringEntity<bool>>();
List<TreeModelSingleMeasuringEntity<int>> meterDataBaseIDs = new List<TreeModelSingleMeasuringEntity<int>>();
foreach (var item in data)
{
if(!item.TimeSpan.HasValue)
@ -311,8 +278,6 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
{
// 新建
var currentTime = DateTime.Now;
//特殊编码映射
var itemCodeInfo = T37612012PacketItemCodeConst.MappingItemCodeTo645SubCodeRelationship(item.ItemType);
taskData = new MeterReadingTelemetryPacketInfo()
{
SystemName = _applicationOptions.SystemType,
@ -322,7 +287,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
DataType = IOTDBDataTypeConst.Log, // 匹配不到下发记录标记为LOG
Timestamps = DateTime.Now.CheckTimePoint().GetDateTimeOffset().ToUnixTimeNanoseconds(),
DatabaseBusiID = item.DatabaseBusiID,
PendingCopyReadTime = item.TimeSpan.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity),
PendingCopyReadTime = item.TimeSpan.Value.GetFormatTime(analysisBaseDto.DensityUnit, analysisBaseDto.TimeDensity),
CreationTime = currentTime,
FocusId = item.FocusId,
FocusAddress = analysisBaseDto.Code,
@ -330,9 +295,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
AFN = analysisBaseDto.AFN,
Fn = analysisBaseDto.Fn,
Seq = analysisBaseDto.PSEQ,
MSA = analysisBaseDto.MSA,
ItemCode = itemCodeInfo.Item1,
SubItemCode = itemCodeInfo.Item2,
MSA = analysisBaseDto.MSA,
ItemCode = item.ItemType,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
@ -350,35 +314,6 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
if (!string.IsNullOrWhiteSpace(item.FiledName) && item.ProjectId>0)
{
treeModelSingleMeasuringEntities.Add(meter);
// 增加标识字段
var meterIsSync = new TreeModelSingleMeasuringEntity<bool>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{item.DeviceId}",
DeviceType = $"{item.DeviceType}",
ProjectId = $"{item.ProjectId}",
DataType = analysisBaseDto.DataType,
Timestamps = meter.Timestamps,
SingleMeasuring = (IotDbFieldConst.IsSync, false)
};
if(!meterIsSyncs.Any(a=> a.DataType == meterIsSync.DataType && a.ProjectId == meterIsSync.ProjectId && a.SystemName== meterIsSync.SystemName && a.DeviceId== meterIsSync.DeviceId && a.Timestamps== meterIsSync.Timestamps))
meterIsSyncs.Add(meterIsSync);
// 数据库业务ID
var meterIsDatabaseBusiID = new TreeModelSingleMeasuringEntity<int>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{item.DeviceId}",
DeviceType = $"{item.DeviceType}",
ProjectId = $"{item.ProjectId}",
DataType = analysisBaseDto.DataType,
Timestamps = meter.Timestamps,
SingleMeasuring = (IotDbFieldConst.DatabaseBusiID, item.DatabaseBusiID)
};
if (!meterDataBaseIDs.Any(a => a.DataType == meterIsSync.DataType && a.ProjectId == meterIsSync.ProjectId && a.SystemName == meterIsSync.SystemName && a.DeviceId == meterIsSync.DeviceId && a.Timestamps == meterIsSync.Timestamps))
meterDataBaseIDs.Add(meterIsDatabaseBusiID);
}
}
// 批量保存数据
@ -386,11 +321,6 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
if (treeModelSingleMeasuringEntities.Count > 0)
{
await _dbProvider.GetSessionPool(false).BatchInsertAsync(treeModelSingleMeasuringEntities);
// 报存标识字段
await _dbProvider.GetSessionPool(false).BatchInsertAsync(meterIsSyncs);
await _dbProvider.GetSessionPool(false).BatchInsertAsync(meterDataBaseIDs);
}
return await Task.FromResult(true);
}
@ -435,7 +365,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
ProjectId = $"{data.ProjectId}",
DataType = IOTDBDataTypeConst.Status,
Timestamps = timestamps,
SingleMeasuring = (IotDbFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
};
await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData);
@ -448,7 +378,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
ProjectId = $"{data.ProjectId}",
Timestamps = timestamps,
DataType = IOTDBDataTypeConst.Status,
SingleMeasuring = (IotDbFieldConst.RecordingTime, data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now)
SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now)
};
await _dbProvider.GetSessionPool(false).InsertAsync(treeRecordingTimeData);
}
@ -456,8 +386,6 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
// 新建
string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
var currentTime = DateTime.Now;
//特殊编码映射
var itemCodeInfo = T37612012PacketItemCodeConst.MappingItemCodeTo645SubCodeRelationship(data.ItemType);
var taskData = new MeterReadingTelemetryPacketInfo()
{
SystemName = _applicationOptions.SystemType,
@ -476,8 +404,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
Fn = analysisBaseDto.Fn,
Seq = analysisBaseDto.PSEQ,
MSA = analysisBaseDto.MSA,
ItemCode = itemCodeInfo.Item1,
SubItemCode = itemCodeInfo.Item2,
ItemCode = data.ItemType,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
@ -536,7 +463,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
ProjectId = $"{item.ProjectId}",
DataType = IOTDBDataTypeConst.Status,
Timestamps = timestamps,
SingleMeasuring = (IotDbFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
};
await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData);
@ -549,7 +476,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
ProjectId = $"{item.ProjectId}",
DataType = IOTDBDataTypeConst.Status,
Timestamps = timestamps,
SingleMeasuring = (IotDbFieldConst.RecordingTime, item.TimeSpan.HasValue ? item.TimeSpan.Value : DateTime.Now)
SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, item.TimeSpan.HasValue ? item.TimeSpan.Value : DateTime.Now)
};
await _dbProvider.GetSessionPool(false).InsertAsync(treeRecordingTimeData);
@ -558,8 +485,6 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
// 新建
string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
var currentTime = DateTime.Now;
//特殊编码映射
var itemCodeInfo = T37612012PacketItemCodeConst.MappingItemCodeTo645SubCodeRelationship(item.ItemType);
var taskData = new MeterReadingTelemetryPacketInfo()
{
SystemName = _applicationOptions.SystemType,
@ -578,8 +503,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
Fn = analysisBaseDto.Fn,
Seq = analysisBaseDto.PSEQ,
MSA = analysisBaseDto.MSA,
ItemCode = itemCodeInfo.Item1,
SubItemCode = itemCodeInfo.Item2,
ItemCode = item.ItemType,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,

View File

@ -1,5 +1,4 @@
using System.Linq;
using System.Reflection;
using System.Reflection;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Extensions;
@ -101,7 +100,7 @@ namespace JiShe.CollectBus.Protocol.T6452007.SendData
}
#endregion
#region C控制码_DI3
#region
/// <summary>
/// 变量数据标识编码处理
@ -110,7 +109,7 @@ namespace JiShe.CollectBus.Protocol.T6452007.SendData
/// <returns></returns>
public static Telemetry6452007PacketResponse C11_02_Send(Telemetry6452007PacketRequest request)
{
var itemCodeArr = request.ItemCode.Split('_');//11_02_80_00_02控制码_DI3_DI2_DI1_DI0
var itemCodeArr = request.ItemCode.Split('_');//11_02_80_00_02
var c_data = itemCodeArr[0];
var DI3 = itemCodeArr[1];
var DI2 = itemCodeArr[2];
@ -119,8 +118,6 @@ namespace JiShe.CollectBus.Protocol.T6452007.SendData
var dataUnit = new List<string>() { DI3, DI2, DI1, DI0 };
dataUnit.Reverse();
var dataList = Build645SendData.Build645SendCommand(request.MeterAddress, c_data, dataUnit);
return new Telemetry6452007PacketResponse() { Data = dataList };
}

View File

@ -36,6 +36,38 @@ namespace JiShe.CollectBus.Protocol.T6452007
public override async Task<T> AnalyzeAsync<T>(ITcpSessionClient client, string messageReceived, Action<T>? sendAction = null)
{
//TODO:645解析报文
//TB3761? tB3761 = Analysis3761(messageReceived);
//if (tB3761 != null)
//{
// if (tB3761.AFN_FC?.AFN == (int)AFN.链路接口检测)
// {
// if (tB3761.A == null || tB3761.A.Code.IsNullOrWhiteSpace() || tB3761.A.A3?.D1_D7 == null || tB3761.SEQ?.PSEQ == null)
// {
// _logger.LogError($"解析AFN.链路接口检测报文失败,报文:{messageReceived},TB3761:{tB3761.Serialize()}");
// }
// else
// {
// if (tB3761.DT?.Fn == (int)FN.登录)
// {
// // 登录回复
// if (tB3761.SEQ.CON == (int)CON.需要对该帧进行确认)
// await LoginAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ);
// }
// else if (tB3761.DT?.Fn == (int)FN.心跳)
// {
// // 心跳回复
// //心跳帧有两种情况:
// //1. 集中器先有登录帧,再有心跳帧
// //2. 集中器没有登录帧,只有心跳帧
// await HeartbeatAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ);
// }
// }
// }
// await OnTcpNormalReceived(client, tB3761);
//}
//return (tB3761 as T)!;
return null;
}

View File

@ -17,13 +17,15 @@ namespace JiShe.CollectBus.Application.Contracts
/// 单个添加数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisDeviceInfoHashCacheKey">hash缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="data">待缓存数据</param>
/// <returns></returns>
Task InsertDataAsync<T>(
string redisHashCacheKey,
string redisSetIndexCacheKey,
string redisDeviceInfoHashCacheKey,
string redisZSetScoresIndexCacheKey,
T data) where T : DeviceCacheBasicModel;
@ -45,13 +47,15 @@ namespace JiShe.CollectBus.Application.Contracts
/// 删除缓存信息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisDeviceInfoHashCacheKey">hash缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="data">已缓存数据</param>
/// <returns></returns>
Task RemoveCacheDataAsync<T>(
string redisHashCacheKey,
string redisSetIndexCacheKey,
string redisDeviceInfoHashCacheKey,
string redisZSetScoresIndexCacheKey,
T data) where T : DeviceCacheBasicModel;
/// <summary>

View File

@ -94,7 +94,6 @@ public class CollectBusApplicationModule : AbpModule
//默认初始化表计信息
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
await dbContext.InitAmmeterCacheData("V4-Gather-8890");
await dbContext.InitWatermeterCacheData("V4-Gather-8890");
await dbContext.InitAmmeterCacheData();
}
}

View File

@ -17,7 +17,6 @@ using static FreeSql.Internal.GlobalFilter;
using static System.Runtime.InteropServices.JavaScript.JSType;
using static Volo.Abp.UI.Navigation.DefaultMenuNames.Application;
using JiShe.CollectBus.IotSystems.Ammeters;
using System.IO.Pipelines;
namespace JiShe.CollectBus.RedisDataCache
{
@ -48,18 +47,21 @@ namespace JiShe.CollectBus.RedisDataCache
/// 单个添加数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisDeviceInfoHashCacheKey">hash缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="data">待缓存数据</param>
/// <returns></returns>
public async Task InsertDataAsync<T>(
string redisHashCacheKey,
string redisSetIndexCacheKey,
string redisDeviceInfoHashCacheKey,
string redisZSetScoresIndexCacheKey,
T data) where T : DeviceCacheBasicModel
{
// 参数校验增强
if (data == null || string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
|| string.IsNullOrWhiteSpace(redisDeviceInfoHashCacheKey))
if (data == null || string.IsNullOrWhiteSpace(redisHashCacheKey)
|| string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
|| string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
{
_logger.LogError($"{nameof(InsertDataAsync)} 参数异常,-101");
return;
@ -68,27 +70,14 @@ namespace JiShe.CollectBus.RedisDataCache
// 使用事务保证原子性
using (var trans = Instance.Multi())
{
// Set索引缓存
trans.SAdd(redisSetIndexCacheKey, $"{data.TimeDensity.ToString().PadLeft(2, '0')}:{data.FocusAddress}");
// 主数据存储Hash
trans.HSet(redisHashCacheKey, data.MemberId, data.Serialize());
//检查HSet是否存在对应的信息如果存在需要进一步检查value是否已经存在如果存在则更新不存在则添加
var oldValue = Instance.HGet<List<T>>(redisDeviceInfoHashCacheKey, data.FocusAddress);
if (oldValue == null || oldValue.Count <= 0)//直接添加
{
//设备信息缓存
trans.HSet(redisDeviceInfoHashCacheKey, data.FocusAddress, data);
}
else
{
// 移除缓存中同类型旧数据
oldValue.RemoveAll(device => device.MeterType == data.MeterType);
// 集中器号分组索引Set缓存
trans.SAdd(redisSetIndexCacheKey, data.MemberId);
//添加新数据
oldValue.Add(data);
//设备信息缓存
trans.HSet(redisDeviceInfoHashCacheKey, data.FocusAddress, oldValue);
}
// 集中器与表计信息排序索引ZSET缓存Key
trans.ZAdd(redisZSetScoresIndexCacheKey, data.ScoreValue, data.MemberId);
var results = trans.Exec();
@ -136,28 +125,13 @@ namespace JiShe.CollectBus.RedisDataCache
using (var pipe = Instance.StartPipe())
{
foreach (var item in batch)
{
{
// Set索引缓存
pipe.SAdd(redisSetIndexCacheKey, $"{item.Value.First().TimeDensity.ToString().PadLeft(2, '0')}:{item.Value.First().FocusAddress}");
//设备信息缓存
pipe.HSet(redisDeviceInfoHashCacheKey, item.Key, item.Value.Serialize());
//检查HSet是否存在对应的信息如果存在需要进一步检查value是否已经存在如果存在则更新不存在则添加
var oldValue = Instance.HGet<List<T>>(redisDeviceInfoHashCacheKey, item.Key);
if (oldValue == null || oldValue.Count <= 0)//直接添加
{
//设备信息缓存
pipe.HSet(redisDeviceInfoHashCacheKey, item.Key, item.Value);
}
else
{
// 移除缓存中同类型旧数据
oldValue.RemoveAll(device => device.MeterType == item.Value[0].MeterType);
//添加新数据
oldValue.AddRange(item.Value);
//设备信息缓存
pipe.HSet(redisDeviceInfoHashCacheKey, item.Key, oldValue);
}
}
pipe.EndPipe();
}
@ -172,18 +146,21 @@ namespace JiShe.CollectBus.RedisDataCache
/// 删除缓存信息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisDeviceInfoHashCacheKey">hash缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="data">已缓存数据</param>
/// <returns></returns>
public async Task RemoveCacheDataAsync<T>(
string redisHashCacheKey,
string redisSetIndexCacheKey,
string redisDeviceInfoHashCacheKey,
string redisZSetScoresIndexCacheKey,
T data) where T : DeviceCacheBasicModel
{
if (data == null
|| string.IsNullOrWhiteSpace(redisDeviceInfoHashCacheKey)
|| string.IsNullOrWhiteSpace(redisSetIndexCacheKey) )
|| string.IsNullOrWhiteSpace(redisHashCacheKey)
|| string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
|| string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
{
_logger.LogError($"{nameof(RemoveCacheDataAsync)} 参数异常,-101");
return;
@ -191,30 +168,32 @@ namespace JiShe.CollectBus.RedisDataCache
const string luaScript = @"
local hashCacheKey = KEYS[1]
local setIndexCacheKey = KEYS[2]
local focusAddress = ARGV[1]
local scoreValue = ARGV[2]
local setIndexCacheKey = KEYS[2]
local zsetScoresIndexCacheKey = KEYS[3]
local member = ARGV[1]
local deleted = 0
if redis.call('HDEL', hashCacheKey, focusAddress) > 0 then
if redis.call('HDEL', hashCacheKey, member) > 0 then
deleted = 1
end
redis.call('SREM', setIndexCacheKey, scoreValue)
redis.call('SREM', setIndexCacheKey, member)
redis.call('ZREM', zsetScoresIndexCacheKey, member)
return deleted
";
var keys = new[]
{
redisDeviceInfoHashCacheKey,
redisSetIndexCacheKey
redisHashCacheKey,
redisSetIndexCacheKey,
redisZSetScoresIndexCacheKey
};
var result = await Instance.EvalAsync(luaScript, keys, new object[] { data.FocusAddress , data.ScoreValue});
var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberId });
if ((int)result == 0)
{
_logger.LogError($"{nameof(RemoveCacheDataAsync)} 删除指定Key{redisDeviceInfoHashCacheKey}的{data.MemberId}数据失败,-102");
_logger.LogError($"{nameof(RemoveCacheDataAsync)} 删除指定Key{redisHashCacheKey}的{data.MemberId}数据失败,-102");
}
}

View File

@ -30,7 +30,6 @@ using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading.Tasks;
using TouchSocket.Core;
using TouchSocket.Sockets;
@ -174,7 +173,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
};
await _iotDBProvider.InsertAsync(meter2);
ElectricityMeter meter3 = new ElectricityMeter()
{
@ -208,7 +207,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
time = DateTime.Now;
//System.Reflection.PropertyInfo;
//System.Reflection.FieldInfo
//TreeModelSingleMeasuringEntityAccessor
//TreeModelSingleMeasuringEntityAccessor
var meter = new TreeModelSingleMeasuringEntity<decimal?>()
{
SystemName = "energy",
@ -586,35 +585,5 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
await Task.CompletedTask;
}
/// <summary>
/// IoTDB空表查询情况
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task TestIoTDBEmptyTableQuery()
{
var meter = new MeterReadingTelemetryPacketInfo() { DevicePath = "MeterReadingTelemetryPacketInfo", DeviceId = "1111" };
QueryCondition conditions = new QueryCondition()
{
Field = "DeviceId",
Operator = "=",
Value = meter.DeviceId
};
var query = new IoTDBQueryOptions()
{
TableNameOrTreePath = meter.DevicePath,
PageIndex = 1,
PageSize = 1,
Conditions = new List<QueryCondition>() { conditions },
};
await _iotDBProvider.GetSessionPool(true).InitTableSessionModelAsync();
var pageResult = await _iotDBProvider.GetSessionPool(true).QueryAsync<MeterReadingTelemetryPacketInfo>(query);
await Task.CompletedTask;
}
}

View File

@ -8,7 +8,6 @@ using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.DataChannels;
using JiShe.CollectBus.EnergySystems.Entities;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Model;
@ -29,9 +28,6 @@ using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Volo.Abp.Guids;
using static FreeSql.Internal.GlobalFilter;
using static Microsoft.AspNetCore.Razor.Language.TagHelperMetadata;
using static Thrift.Protocol.Utilities.TJSONProtocolConstants;
namespace JiShe.CollectBus.ScheduledMeterReading
{
@ -133,103 +129,187 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var tempArryay = item.Split(":");
string meteryType = tempArryay[4];//表计类别
int timeDensity = Convert.ToInt32(tempArryay[5]);//采集频率
if (timeDensity > 15)
{
timeDensity = 15;
}
//电表定时广播校时,一天一次。
string currentTimeStr = $"{currentTime:HH:mm:00}";
if (string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))//自动校时
{
//_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
//return;
//if (string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))//自动校时
//{
// //_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
// //return;
_ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
var tempTask = await AmmeterScheduledAutomaticVerificationTime(timeDensity, data, groupIndex, timestamps);
// _ = CreateMeterPublishTask<DeviceInfo>(
// timeDensity: timeDensity,
// nextTaskTime: currentTime,
// meterType: MeterTypeEnum.Ammeter,
// taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
// {
// var tempTask = await AmmeterScheduledAutomaticVerificationTime(timeDensity, data, groupIndex, timestamps);
// if (tempTask == null || tempTask.Count <= 0)
// {
// _logger.LogWarning($"电表自动校时 {data.Name} 任务数据构建失败:{data.Serialize()}");
// return;
// }
// _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
// });
//}
//else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取
//{
// _ = CreateMeterPublishTask<DeviceInfo>(
// timeDensity: timeDensity,
// nextTaskTime: currentTime,
// meterType: MeterTypeEnum.Ammeter,
// taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
// {
// var tempTask = await ConcentratorScheduledAutomaticGetTerminalVersion(timeDensity, data, groupIndex, timestamps);
// if (tempTask == null || tempTask.Count <= 0)
// {
// _logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
// return;
// }
// _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
// });
//}
//else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取
//{
// _ = CreateMeterPublishTask<DeviceInfo>(
// timeDensity: timeDensity,
// nextTaskTime: currentTime,
// meterType: MeterTypeEnum.Ammeter,
// taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
// {
// var tempTask = await ConcentratorScheduledAutomaticGetTelematicsModule(timeDensity, data, groupIndex, timestamps);
// if (tempTask == null || tempTask.Count <= 0)
// {
// _logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
// return;
// }
// _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
// });
//}
//else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//月冻结
//{
// _ = CreateMeterPublishTask<DeviceInfo>(
// timeDensity: timeDensity,
// nextTaskTime: currentTime,
// meterType: MeterTypeEnum.Ammeter,
// taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
// {
// var tempTask = await AmmeterScheduledGetAutomaticDayFreezeData(timeDensity, data, groupIndex, timestamps);
// if (tempTask == null || tempTask.Count <= 0)
// {
// _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
// return;
// }
// _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
// });
//}
//else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结
//{
// _ = CreateMeterPublishTask<DeviceInfo>(
// timeDensity: timeDensity,
// nextTaskTime: currentTime,
// meterType: MeterTypeEnum.Ammeter,
// taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
// {
// var tempTask = await AmmeterScheduledGetAutomaticMonthFreezeData(timeDensity, data, groupIndex, timestamps);
// if (tempTask == null || tempTask.Count <= 0)
// {
// _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
// return;
// }
// _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
// });
//}
//else
//{
// _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 不是自动校时、采集终端信息等时间,继续处理其他");
//}
//_ = CreateMeterPublishTask<DeviceInfo>(
// timeDensity: timeDensity,
// nextTaskTime: currentTime,
// meterType: MeterTypeEnum.Ammeter,
// taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
// {
// var tempTask = await AmmeterScheduledAutomaticVerificationTime(timeDensity, data, groupIndex, timestamps);
// if (tempTask == null || tempTask.Count <= 0)
// {
// _logger.LogWarning($"电表自动校时 {data.Name} 任务数据构建失败:{data.Serialize()}");
// return;
// }
// _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
// });
_ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
var tempTask = await ConcentratorScheduledAutomaticGetTerminalVersion(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
//_ = CreateMeterPublishTask<DeviceInfo>(
// timeDensity: timeDensity,
// nextTaskTime: currentTime,
// meterType: MeterTypeEnum.Ammeter,
// taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
// {
// var tempTask = await ConcentratorScheduledAutomaticGetTelematicsModule(timeDensity, data, groupIndex, timestamps);
// if (tempTask == null || tempTask.Count <= 0)
// {
// _logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
// return;
// }
// _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
// });
//_ = CreateMeterPublishTask<DeviceInfo>(
// timeDensity: timeDensity,
// nextTaskTime: currentTime,
// meterType: MeterTypeEnum.Ammeter,
// taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
// {
// var tempTask = await AmmeterScheduledGetAutomaticDayFreezeData(timeDensity, data, groupIndex, timestamps);
// if (tempTask == null || tempTask.Count <= 0)
// {
// _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
// return;
// }
// _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
// });
//_ = CreateMeterPublishTask<DeviceInfo>(
// timeDensity: timeDensity,
// nextTaskTime: currentTime,
// meterType: MeterTypeEnum.Ammeter,
// taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
// {
// var tempTask = await AmmeterScheduledGetAutomaticMonthFreezeData(timeDensity, data, groupIndex, timestamps);
// if (tempTask == null || tempTask.Count <= 0)
// {
// _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
// return;
// }
// _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
// });
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"电表自动校时 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取
{
_ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
var tempTask = await ConcentratorScheduledAutomaticGetTerminalVersion(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"集中器终端版本信息 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取
{
_ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
var tempTask = await ConcentratorScheduledAutomaticGetTelematicsModule(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"集中器SIM卡读取 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticMonthFreezeTime, StringComparison.CurrentCultureIgnoreCase))//月冻结
{
_ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
var tempTask = await AmmeterScheduledGetAutomaticMonthFreezeData(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"电表月冻结 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结
{
_ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
var tempTask = await AmmeterScheduledGetAutomaticDayFreezeData(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"电表日冻结 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else
{
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 不是自动校时、采集终端信息等时间,继续处理其他");
}
//检查任务时间节点由于定时任务10秒钟运行一次需要判定当前时间是否在任务时间节点内不在则跳过
var currentTaskTime = tasksToBeIssueModel.LastTaskTime.CalculateNextCollectionTime(timeDensity);//程序启动缓存电表的时候NextTaskTime需要格式化到下一个采集点时间。
if (!IsTaskTime(currentTaskTime, timeDensity))//todo 如果时间超过两个采集频率周期,就一直处理,直到追加到下一个采集频率周期。
@ -239,15 +319,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
var meterTypes = EnumExtensions.ToEnumDictionary<MeterTypeEnum>();
//tasksToBeIssueModel.NextTaskTime;
if (meteryType == MeterTypeEnum.Ammeter.ToString())
{
//电表最大采集频率为15分钟
if (timeDensity > 15)
{
timeDensity = 15;
}
_ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTaskTime,
@ -257,7 +333,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var tempTask = await AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
//_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempTask));
@ -334,8 +410,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// 创建取消令牌源
//var cts = new CancellationTokenSource();
await _dbProvider.GetSessionPool(true).InitTableSessionModelAsync();
_ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader);
// //此处代码不要删除
@ -423,10 +497,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
deviceIds.Add(ammeter.MeterId.ToString());
//处理ItemCode
if (ammeter.ItemCodes == null && !string.IsNullOrWhiteSpace(ammeter.DataTypes))
if (string.IsNullOrWhiteSpace(ammeter.ItemCodes) && !string.IsNullOrWhiteSpace(ammeter.DataTypes))
{
ammeter.ItemCodes = new List<string>();
var itemArr = ammeter.DataTypes.Split(',').ToList();
#region
@ -439,7 +511,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
if (!excludeItemCode.Contains(gatherItem.ItemCode))
{
itemCodeList.Add(gatherItem.ItemCode.Replace("WAVE_109", "10_109"));
itemCodeList.Add(gatherItem.ItemCode);
}
}
@ -458,11 +530,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading
ammeter.ItemCodes = itemCodeList;
ammeter.ItemCodes = itemCodeList.Serialize();//转换成JSON字符串
if (!string.IsNullOrWhiteSpace(ammeter.ItemCodes))
{
ammeter.ItemCodes = ammeter.ItemCodes.Replace("WAVE_109", "10_109");
}
}
//var tempItemCodeList = new List<string>() { "10_97" };
//ammeter.ItemCodes = tempItemCodeList.Serialize();
ammeter.ItemCodes = "10_97";
if (!keyValuePairs.ContainsKey(ammeter.FocusAddress))
{
@ -630,7 +706,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return null;
}
if (ammeterInfo.ItemCodes == null || ammeterInfo.ItemCodes.Count <=0)
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
return null;
@ -677,7 +753,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return null;
}
List<string> tempCodes = ammeterInfo.ItemCodes!;
List<string> tempCodes = ammeterInfo.ItemCodes.Deserialize<List<string>>()!;
//TODO:自动上报数据只主动采集1类数据。
if (ammeterInfo.AutomaticReport.Equals(1))
@ -729,14 +805,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//var aFN = (AFN)aFNStr.HexToDec();
//var fn = int.Parse(itemCodeArr[1]);
//特殊编码映射
var itemCodeInfo = T37612012PacketItemCodeConst.MappingItemCodeTo645SubCodeRelationship(tempItem);
//TODO:特殊表
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = itemCodeInfo.Item1 == T37612012PacketItemCodeConst.AFN10HFN01H ? 0 : ammeterInfo.MeteringCode,
Pn = ammeterInfo.MeteringCode,
ItemCode = itemCodeInfo.Item1,
DataTimeMark = new Protocol.DataTimeMark()
{
@ -763,8 +838,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
ammeterInfo: ammeterInfo,
timestamps: DateTimeOffset.Now.ToUnixTimeNanoseconds(),
builderResponse: builderResponse,
itemCode: itemCodeInfo.Item1,
subItemCode: itemCodeInfo.Item2,
itemCode: tempItem,
subItemCode: null,
pendingCopyReadTime: timestamps,
creationTime: currentTime,
packetType: (TelemetryPacketTypeEnum)timeDensity,
@ -809,16 +884,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
try
{
#if DEBUG
#else
//判断是否是自动校时时间
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))
{
_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
return null;
}
#endif
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
@ -832,7 +903,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
return null;
}
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
@ -890,16 +961,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
try
{
#if DEBUG
#else
//判断是否是日冻结抄读时间
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))
//判断是否是自动校时时间
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))
{
_logger.LogInformation($"{nameof(AmmeterScheduledGetAutomaticDayFreezeData)} 非电表日冻结抄读时间,暂不处理");
_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
return null;
}
#endif
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
@ -907,7 +974,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表日冻结抄读运行时间{currentTime}没有找到对应的协议组件,-105");
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
return null;
}
@ -917,13 +984,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = ammeterInfo.MeteringCode,
ItemCode = item,
DataTimeMark = new Protocol.DataTimeMark()
{
Density = ammeterInfo.TimeDensity.GetFocusDensity(),//转换成协议的值
Point = 1,
DataTime = currentTime.AddDays(-1),//日冻结抄读时间为昨天
},
ItemCode = item
});
var meterReadingRecords = CreateAmmeterPacketInfo(
@ -968,29 +1029,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}";
try
{
#if DEBUG
#else
//需要检查是不是每月1号抄读上个月的数据
if (currentTime.Date != currentTime.FirstDayOfMonth().Date)
//判断是否是自动校时时间
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))
{
_logger.LogInformation($"{nameof(AmmeterScheduledGetAutomaticMonthFreezeData)} 非月冻结数据抄读时间,暂不处理");
_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
return null;
}
else
{
timestamps = currentTime.LastDayOfPrdviousMonth();
}
//判断是否是月冻结数据抄读
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticMonthFreezeTime, StringComparison.CurrentCultureIgnoreCase))
{
_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 非电表月冻结抄读时间,暂不处理");
return null;
}
#endif
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
@ -998,27 +1045,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表月冻结抄读时间{currentTime}没有找到对应的协议组件,-105");
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
return null;
}
foreach (var item in MonthFreezeCodes)
foreach (var item in DayFreezeCodes)
{
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = ammeterInfo.MeteringCode,
ItemCode = item,
DataTimeMark = new Protocol.DataTimeMark()
{
Density = ammeterInfo.TimeDensity.GetFocusDensity(),//转换成协议的值
Point = 1,
#if DEBUG
DataTime = currentTime.AddMonths(-1),//月冻结抄读时间为上个月
#else
DataTime = timestamps,//月冻结抄读时间为上个月
#endif
},
ItemCode = item
});
var meterReadingRecords = CreateAmmeterPacketInfo(
@ -1118,7 +1155,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
}
#endregion
#endregion
#region
@ -1153,42 +1190,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据时,采集项类型数据为空");
}
if (meterInfos != null && meterInfos.Count > 0)
{
foreach (var item in meterInfos)
{
if (item.MeterTypeName.Equals("水表") && (item.Protocol.Equals((int)MeterLinkProtocol.CJT_188_2018) || item.Protocol.Equals((int)MeterLinkProtocol.DLT_645_1997) || item.Protocol.Equals((int)MeterLinkProtocol.DLT_645_2007)))
{
if (item.BrandType.Contains("炬华有线"))
{
item.ItemCodes = new List<string>() { T37612012PacketItemCodeConst.AFN0CHFN188H };
}
else
{
item.ItemCodes = new List<string>() { T37612012PacketItemCodeConst.AFN0CHFN129H };
}
}
else if (item.MeterTypeName.Trim().Equals("西恩超声波流量计"))
{
item.ItemCodes = new List<string>() { T37612012PacketItemCodeConst.AFN09HFN01H };
}
else if (item.MeterTypeName.Trim().Equals("江苏华海涡街流量计积算仪"))
{
item.ItemCodes = new List<string>() { T37612012PacketItemCodeConst.AFN09HFN01H };
}
else if (item.MeterTypeName.Trim().Equals("V880BR涡街流量计"))
{
item.ItemCodes = new List<string>() { T37612012PacketItemCodeConst.AFN09HFN01H };
}
else if (item.MeterTypeName.Trim().Equals("拓思特涡街流量计H880BR"))
{
item.ItemCodes = new List<string>() { T37612012PacketItemCodeConst.AFN09HFN01H };
}
}
}
List<string> deviceIds = new List<string>();//用于处理Kafka主题分区数据的分发和处理。
//根据采集频率分组,获得采集频率分组
@ -1316,14 +1317,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
timeDensity = watermeter.TimeDensity;//水表默认为60分钟
typeName = watermeter.LinkType;
if (watermeter.BrandType.Contains("泉高阀门") || watermeter.BrandType.Equals("LXSY-山水翔"))
if (watermeter.MeterBrand.Contains("泉高阀门") || watermeter.MeterBrand.Equals("LXSY-山水翔"))
{
typeName = watermeter.BrandType;
typeName = watermeter.MeterBrand;
}
}
else if (watermeter.MeterType == MeterTypeEnum.WaterMeterFlowmeter)
{
typeName = watermeter.BrandType;
typeName = watermeter.MeterBrand;
}
else
{
@ -1339,65 +1340,68 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(watermeter.BrandType);
if (protocolPlugin == null)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 创建水表待发送的任务数据时{currentTime}没有找到对应的协议组件,-101");
return null;
//_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
//return;
}
if (watermeter.ItemCodes == null || watermeter.ItemCodes.Count <=0)
string itemCode = T37612012PacketItemCodeConst.AFN10HFN01H;
string subItemCode = T1882018PacketItemCodeConst.CTR0190;
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 创建水表待发送的任务数据时{watermeter.Name}没有相应的采集项,-102");
FocusAddress = watermeter.FocusAddress,
Pn = watermeter.MeteringCode,
ItemCode = itemCode,
SubProtocolRequest = new SubProtocolBuildRequest()
{
MeterAddress = watermeter.MeterAddress,
Password = watermeter.Password,
ItemCode = subItemCode,
}
});
if (builderResponse == null || builderResponse.Data.Length <= 0)
{
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。");
return null;
}
foreach (var item in watermeter.ItemCodes)
{
var tempRequest = new ProtocolBuildRequest()
{
FocusAddress = watermeter.FocusAddress,
Pn = watermeter.MeteringCode,
ItemCode = item,
};
if (item == T37612012PacketItemCodeConst.AFN09HFN01H)
{
//var itemCodeInfo = T37612012PacketItemCodeConst.MappingItemCodeTo188SubCodeRelationship(T37612012PacketItemCodeConst.AFN10HFN99H, true);//阀控
tempRequest.SubProtocolRequest = new SubProtocolBuildRequest()
{
MeterAddress = watermeter.MeterAddress,
Password = watermeter.Password,
ItemCode = T1882018PacketItemCodeConst.CTR01901F00,
MeteringPort = watermeter.MeteringPort,
Baudrate = watermeter.Baudrate,
};
}
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(tempRequest);
if (builderResponse == null || builderResponse.Data.Length <= 0)
{
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。");
return null;
}
if (builderResponse == null || builderResponse.Data.Length <= 0)
{
_logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的水表{watermeter.Name} 水表采抄读采集项{T1882018PacketItemCodeConst.CTR01901F00}未能正确获取报文。");
return null;
}
var meterReadingRecords = CreateAmmeterPacketInfo(
ammeterInfo: watermeter,
timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
builderResponse: builderResponse,
itemCode: T37612012PacketItemCodeConst.AFN10HFN01H,
subItemCode: T1882018PacketItemCodeConst.CTR01901F00,
pendingCopyReadTime: currentTime,
creationTime: currentTime,
packetType: TelemetryPacketTypeEnum.WatermeterAutoReadding,
_guidGenerator);
taskList.Add(meterReadingRecords);
if (builderResponse == null || builderResponse.Data.Length <= 0)
{
_logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的水表{watermeter.Name}采集项{itemCode}未能正确获取报文。");
return null;
}
string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, watermeter.MeteringCode, builderResponse.MSA, builderResponse.Seq);
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
{
SystemName = SystemType,
ProjectId = $"{watermeter.ProjectID}",
DeviceType = $"{MeterTypeEnum.Ammeter}",
DeviceId = $"{watermeter.MeterId}",
Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
DatabaseBusiID = watermeter.DatabaseBusiID,
PacketType = (int)TelemetryPacketTypeEnum.WatermeterAutoReadding,
PendingCopyReadTime = timestamps,
CreationTime = currentTime,
MeterAddress = watermeter.MeterAddress,
AFN = builderResponse.AFn,
Fn = builderResponse.Fn,
Seq = builderResponse.Seq,
MSA = builderResponse.MSA,
ItemCode = itemCode,
SubItemCode = subItemCode,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
Pn = watermeter.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
IsReceived = false,
ScoreValue = $"{watermeter.FocusAddress}.{taskMark}".Md5Fun(),
};
taskList.Add(meterReadingRecords);
return taskList;
@ -1500,15 +1504,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
try
{
#if DEBUG
#else
//判断是否是自动获取版本号时间
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))
{
_logger.LogInformation($"{nameof(ConcentratorScheduledAutomaticGetTelematicsModule)} 自动获取远程通信模块(SIM)版本信息,非自动处理时间");
return null;
}
#endif
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
@ -1556,7 +1557,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
throw;
}
}
#endregion
#endregion
#region
@ -1775,7 +1776,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
Pn = ammeterInfo.MeteringCode,
IssuedMessageId = guidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
FocusDensity = ammeterInfo.TimeDensity.GetFocusDensity(),
IsReceived = false,
ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
};

View File

@ -96,144 +96,125 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//[Route($"ammeter/list")]
public override async Task<List<DeviceInfo>> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890")
{
#if DEBUG
var redisCacheDeviceInfoHashKeyTemp = $"CollectBus:Energy:JiSheCollectBus109:DeviceInfo";
//#if DEBUG
// var redisCacheDeviceInfoHashKeyTemp = $"CollectBus:Energy:JiSheCollectBus109:DeviceInfo";
List<DeviceInfo> ammeterInfos = FreeRedisProvider.Instance.Get<List<DeviceInfo>>(redisCacheDeviceInfoHashKeyTemp);//542400504
// List<DeviceInfo> ammeterInfos = FreeRedisProvider.Instance.Get<List<DeviceInfo>>(redisCacheDeviceInfoHashKeyTemp);//542400504
if (ammeterInfos == null || ammeterInfos.Count <= 0)
{
ammeterInfos = new List<DeviceInfo>();
//ammeterInfos.Add(new DeviceInfo()
//{
// Baudrate = 2400,
// FocusAddress = "442400040",
// Name = "保利单箱电表1",
// FocusId = 95780,
// DatabaseBusiID = 1,
// MeteringCode = 0,
// MeterAddress = "442405000040",
// MeterId = 127035,
// TypeName = 1,
// DataTypes = "581,589,592,597,601",
// TimeDensity = 15,
// BrandType = "DTS1980",
// MeterType = MeterTypeEnum.Ammeter,
// ProjectID = 1,
// MeteringPort = MeteringPortConst.MeteringPortTwo,
// Password = "000000",
//});
// if (ammeterInfos == null || ammeterInfos.Count <= 0)
// {
// ammeterInfos = new List<DeviceInfo>();
// //ammeterInfos.Add(new DeviceInfo()
// //{
// // Baudrate = 2400,
// // FocusAddress = "442400040",
// // Name = "保利单箱电表1",
// // FocusId = 95780,
// // DatabaseBusiID = 1,
// // MeteringCode = 0,
// // MeterAddress = "442405000040",
// // MeterId = 127035,
// // TypeName = 1,
// // DataTypes = "581,589,592,597,601",
// // TimeDensity = 15,
// // BrandType = "DTS1980",
// // MeterType = MeterTypeEnum.Ammeter,
// // ProjectID = 1,
// // MeteringPort = MeteringPortConst.MeteringPortTwo,
// // Password = "000000",
// //});
//ammeterInfos.Add(new DeviceInfo()
//{
// Baudrate = 2400,
// FocusAddress = "442400039",
// Name = "保利单箱电表2",
// FocusId = 69280,
// DatabaseBusiID = 1,
// MeteringCode = 0,
// MeterAddress = "442405000039",
// MeterId = 95594,
// TypeName = 1,
// DataTypes = "581,589,592,597,601",
// TimeDensity = 15,
// BrandType = "DTS1980",
// MeterType = MeterTypeEnum.Ammeter,
// ProjectID = 1,
// MeteringPort = MeteringPortConst.MeteringPortTwo,
// Password = "000000",
//});
// //ammeterInfos.Add(new DeviceInfo()
// //{
// // Baudrate = 2400,
// // FocusAddress = "442400039",
// // Name = "保利单箱电表2",
// // FocusId = 69280,
// // DatabaseBusiID = 1,
// // MeteringCode = 0,
// // MeterAddress = "442405000039",
// // MeterId = 95594,
// // TypeName = 1,
// // DataTypes = "581,589,592,597,601",
// // TimeDensity = 15,
// // BrandType = "DTS1980",
// // MeterType = MeterTypeEnum.Ammeter,
// // ProjectID = 1,
// // MeteringPort = MeteringPortConst.MeteringPortTwo,
// // Password = "000000",
// //});
//ammeterInfos.Add(new DeviceInfo()
//{
// Baudrate = 2400,
// FocusAddress = "402440506",
// Name = "中环半导体9#冷却泵-220KW(三相电表)",
// FocusId = 106857,
// DatabaseBusiID = 1,
// MeteringCode = 0,
// MeterAddress = "402410040506",
// MeterId = 139059,
// TypeName = 3,
// DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
// TimeDensity = 15,
// BrandType = "DTS1980",
// Password = "000000",
// ProjectID = 1,
// MeterType = MeterTypeEnum.Ammeter,
// MeteringPort = MeteringPortConst.MeteringPortTwo,
//});
// //ammeterInfos.Add(new DeviceInfo()
// //{
// // Baudrate = 2400,
// // FocusAddress = "402440506",
// // Name = "中环半导体9#冷却泵-220KW(三相电表)",
// // FocusId = 106857,
// // DatabaseBusiID = 1,
// // MeteringCode = 0,
// // MeterAddress = "402410040506",
// // MeterId = 139059,
// // TypeName = 3,
// // DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
// // TimeDensity = 15,
// // BrandType = "DTS1980",
// // Password = "000000",
// // ProjectID = 1,
// // MeterType = MeterTypeEnum.Ammeter,
// // MeteringPort = MeteringPortConst.MeteringPortTwo,
// //});
//ammeterInfos.Add(new DeviceInfo()
//{
// Baudrate = 2400,
// FocusAddress = "942411321",
// Name = "DDS1980-T4(5-60) ML307A 长稳 942408011321",
// FocusId = 57682,
// DatabaseBusiID = 1,
// MeteringCode = 0,
// MeterAddress = "942408011321",
// MeterId = 78970,
// TypeName = 3,
// DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
// TimeDensity = 15,
// BrandType = "DTS1980",
// Password = "000000",
// ProjectID = 1,
// MeterType = MeterTypeEnum.Ammeter,
// MeteringPort = MeteringPortConst.MeteringPortTwo,
//});
// ammeterInfos.Add(new DeviceInfo()
// {
// Baudrate = 2400,
// FocusAddress = "942411321",
// Name = "DDS1980-T4(5-60) ML307A 长稳 942408011321",
// FocusId = 57682,
// DatabaseBusiID = 1,
// MeteringCode = 0,
// MeterAddress = "942408011321",
// MeterId = 78970,
// TypeName = 3,
// DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
// TimeDensity = 15,
// BrandType = "DTS1980",
// Password = "000000",
// ProjectID = 1,
// MeterType = MeterTypeEnum.Ammeter,
// MeteringPort = MeteringPortConst.MeteringPortTwo,
// });
//ammeterInfos.Add(new DeviceInfo()
//{
// Baudrate = 2400,
// FocusAddress = "942411319",
// Name = "DDS1980-T4(5-60) ML307A 长稳 942408011319",
// FocusId = 57685,
// DatabaseBusiID = 1,
// MeteringCode = 0,
// MeterAddress = "942408011319",
// MeterId = 78973,
// TypeName = 3,
// DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
// TimeDensity = 15,
// BrandType = "DTS1980",
// Password = "000000",
// ProjectID = 1,
// MeterType = MeterTypeEnum.Ammeter,
// MeteringPort = MeteringPortConst.MeteringPortTwo,
//});
ammeterInfos.Add(new DeviceInfo()
{
Baudrate = 2400,
FocusAddress = "322011149",
Name = "DDS1980-T4(5-60) ML307A 长稳 322011149",
FocusId = 57685,
DatabaseBusiID = 1,
MeteringCode = 0,
MeterAddress = "31240010270",
MeterId = 78973,
TypeName = 3,
DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
TimeDensity = 15,
BrandType = "DTS1980",
Password = "000000",
ProjectID = 1,
MeterType = MeterTypeEnum.Ammeter,
MeteringPort = MeteringPortConst.MeteringPortTwo,
});
FreeRedisProvider.Instance.Set(redisCacheDeviceInfoHashKeyTemp, ammeterInfos);
}
return ammeterInfos;
#else
// ammeterInfos.Add(new DeviceInfo()
// {
// Baudrate = 2400,
// FocusAddress = "942411319",
// Name = "DDS1980-T4(5-60) ML307A 长稳 942408011319",
// FocusId = 57685,
// DatabaseBusiID = 1,
// MeteringCode = 0,
// MeterAddress = "942408011319",
// MeterId = 78973,
// TypeName = 3,
// DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
// TimeDensity = 15,
// BrandType = "DTS1980",
// Password = "000000",
// ProjectID = 1,
// MeterType = MeterTypeEnum.Ammeter,
// MeteringPort = MeteringPortConst.MeteringPortTwo,
// });
#endif
// FreeRedisProvider.Instance.Set(redisCacheDeviceInfoHashKeyTemp, ammeterInfos);
// }
// return ammeterInfos;
//#else
//#endif
try
@ -274,8 +255,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//TODO 记得移除特殊表过滤
#if DEBUG
// sql = $@"{sql} and c.Address in('542410000504','442405000040','442405000039','402410040506')";
#if DEBUG
//// sql = $@"{sql} and c.Address in('542410000504','442405000040','442405000039','402410040506')";
sql = $@"{sql} and c.Address in('402410040506')";
#endif
@ -527,7 +508,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = 0,//ammeterInfo.MeteringCode,现有协议里面阀控必须传0不能根据档案的MeteringCode值走。
Pn = ammeterInfo.MeteringCode,
ItemCode = itemCode,
SubProtocolRequest = new SubProtocolBuildRequest()
{
@ -579,41 +560,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
try
{
#if DEBUG
var redisCacheDeviceInfoHashKeyTemp = $"CollectBus:Energy:JiSheCollectBus119:DeviceInfo";
List<DeviceInfo> deviceInfos = FreeRedisProvider.Instance.Get<List<DeviceInfo>>(redisCacheDeviceInfoHashKeyTemp);
if (deviceInfos == null || deviceInfos.Count <= 0)
{
deviceInfos = new List<DeviceInfo>();
deviceInfos.Add(new DeviceInfo()
{
Baudrate = 2400,
FocusAddress = "322011149",
Name = "LXSY-25E 保利",
FocusId = 57675,
DatabaseBusiID = 1,
MeteringCode = 0,
MeterAddress = "341587000473",
MeterId = 1025,
TypeName = 1,
TimeDensity = 60,
BrandType = "云集",
MeterType = MeterTypeEnum.WaterMeter,
ProjectID = 1,
MeteringPort = MeteringPortConst.MeteringPortTwo,
Password = "000000",
LinkType = "RS-485",
TimesRate = 1.0000m,
});
FreeRedisProvider.Instance.Set(redisCacheDeviceInfoHashKeyTemp, deviceInfos);
}
return deviceInfos;
#else
string sql = $@"SELECT
string sql = $@"SELECT
A.ID as MeterId,
A.Name,
A.FocusID as FocusId,
@ -628,7 +575,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
A.LinkType,
A.HaveValve,
A.MeterType AS MeterTypeName,
A.MeterBrand AS BrandType,
A.MeterBrand,
A.TimesRate,
A.TimeDensity,
A.TripState,
@ -655,7 +602,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
.Ado
.QueryAsync<DeviceInfo>(sql);
#endif
}
catch (Exception)
{

View File

@ -46,7 +46,14 @@ namespace JiShe.CollectBus.IotSystems.Devices
/// 设备类型: 水表\气表、流量计
/// </summary>
public string MeterTypeName { get; set; }
/// <summary>
/// 设备品牌;
/// (当 MeterType = 水表, 如 威铭、捷先 等)
/// (当 MeterType = 流量计, 如 西恩超声波流量计、西恩电磁流量计、涡街流量计 等)
/// </summary>
public string MeterBrand { get; set; }
/// <summary>
/// 倍率
/// </summary>
@ -68,7 +75,7 @@ namespace JiShe.CollectBus.IotSystems.Devices
public string AreaCode { get; set; }
/// <summary>
/// 仅当MeterType为电表时电表类别 1单相、2三相三线、3三相四线,
/// 电表类别 1单相、2三相三线、3三相四线,
/// 07协议 开合闸指令(1A开闸断电,1C单相表合闸,1B多相表合闸) 645 2007 表
/// 97协议//true(合闸);false(跳闸) 545 1997 没有单相多相 之分 "true" ? "9966" : "3355"
/// </summary>
@ -113,8 +120,7 @@ namespace JiShe.CollectBus.IotSystems.Devices
/// <summary>
/// 该电表方案下采集项JSON格式["0D_80","0D_80"]
/// </summary>
[Column(IsIgnore = true)]
public List<string> ItemCodes { get; set; }
public string ItemCodes { get; set; }
/// <summary>
/// State表状态:

View File

@ -13,8 +13,7 @@ namespace JiShe.CollectBus.IotSystems.Devices
/// 设备表型数据信息
/// </summary>
[SourceAnalyzers(EntityTypeEnum.TableModel)]
[IgnoreInitTable]
public class DeviceTableModelDataInfo : IoTEntity
public class DeviceTreeModelDataInfo: IoTEntity
{
[FIELDColumn]

View File

@ -12,8 +12,8 @@ namespace JiShe.CollectBus.IotSystems.Devices
/// <summary>
/// 设备树模型数据信息
/// </summary>
[SourceAnalyzers(EntityTypeEnum.TreeModel)]
public class DeviceTreeModelDataInfo : IoTEntity
[SourceAnalyzers(EntityTypeEnum.TableModel)]
public class DeviceTableModelDataInfo : IoTEntity
{
[FIELDColumn]

View File

@ -151,12 +151,6 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
[FIELDColumn]
public string IssuedMessageId { get; set; }
/// <summary>
/// 集中器采集密度
/// </summary>
[FIELDColumn]
public int FocusDensity { get; set; }
/// <summary>
/// 消息上报内容
/// </summary>

View File

@ -14,14 +14,9 @@ namespace JiShe.CollectBus.Common.Consts
#region
/// <summary>
/// 基路径,表示主站发起读数据
/// 基路径
/// </summary>
public const string BasicT1882018Read = "CTR0";
/// <summary>
/// 基路径,表示主站发起读数据
/// </summary>
public const string BasicT1882018Write = "CTR3";
public const string BasicT1882018 = "CTR";
#region
@ -29,12 +24,12 @@ namespace JiShe.CollectBus.Common.Consts
/// <summary>
/// 读取计量数据1
/// </summary>
public const string CTR01901F00 = $"{BasicT1882018Read}_01_90_1F_00";
public const string CTR0190 = $"01_90";
/// <summary>
/// 读取计量数据2
/// </summary>
public const string CTR01911F00 = $"{BasicT1882018Read}_01_91_1F_00";
public const string CTR0191 = $"01_91";
#endregion
@ -43,12 +38,12 @@ namespace JiShe.CollectBus.Common.Consts
/// <summary>
/// 关阀
/// </summary>
public const string CTR304A01755 = $"{BasicT1882018Write}_04_A0_17_55";
public const string CTR30455 = $"_04_55";
/// <summary>
/// 开阀
/// </summary>
public const string CTR304A01799 = $"{BasicT1882018Write}_04_A0_17_99";
public const string CTR30499 = $"_04_99";
#endregion

View File

@ -17,7 +17,7 @@ namespace JiShe.CollectBus.Common.Consts
/// <summary>
/// 透明转发
/// </summary>
public const string AFN10HFN01H = $"10_1";
public const string AFN10HFN01H = $"10_01";
/// <summary>
@ -28,27 +28,12 @@ namespace JiShe.CollectBus.Common.Consts
/// <summary>
/// 读取终端信息
/// </summary>
public const string AFN09HFN01H = $"09_1";
public const string AFN09HFN01H = $"09_01";
/// <summary>
/// 远程通信模块版本信息
/// </summary>
public const string AFN09HFN09H = $"09_9";
/// <summary>
/// 水表阀控
/// </summary>
public const string AFN10HFN99H = $"10_99";
/// <summary>
/// 炬华有线水表抄读
/// </summary>
public const string AFN0CHFN188H = $"0C_188";
/// <summary>
/// 标准188协议水表抄读
/// </summary>
public const string AFN0CHFN129H = $"0C_129";
public const string AFN09HFN09H = $"09_09";
#endregion
@ -185,9 +170,9 @@ namespace JiShe.CollectBus.Common.Consts
/// <summary>
/// IotDB存储字段字段
/// 集中器状态字段
/// </summary>
public class IotDbFieldConst
public class ConcentratorStatusFieldConst
{
/// <summary>
@ -200,45 +185,20 @@ namespace JiShe.CollectBus.Common.Consts
/// </summary>
public const string FrameData = "FrameData";
/// <summary>
/// 是否同步
/// </summary>
public const string IsSync = "IsSync";
/// <summary>
/// 数据库业务ID
/// </summary>
public const string DatabaseBusiID= "DatabaseBusiID";
}
#endregion
/// <summary>
/// 特殊645编码关系映射
/// </summary>
/// <param name="itemCode">特殊3761编码</param>
/// <param name="itemCode"></param>
/// <returns></returns>
public static (string,string) MappingItemCodeTo645SubCodeRelationship(string itemCode)
{
return itemCode switch
{
AFN10HFN97H => (AFN10HFN01H,T6452007PacketItemCodeConst.C1102800002),
_ => (itemCode,""),
};
}
/// <summary>
/// 特殊188编码关系映射
/// </summary>
/// <param name="itemCode">特殊3761编码</param>
/// <param name="tripState">TripState 0 合闸-开阀, 1 关阀);开阀关阀</param>
/// <returns></returns>
public static (string, string) MappingItemCodeTo188SubCodeRelationship(string itemCode,bool tripState)
{
return itemCode switch
{
AFN10HFN99H => (AFN10HFN01H, tripState == true ?T1882018PacketItemCodeConst.CTR304A01799: T1882018PacketItemCodeConst.CTR304A01755),
_ => (itemCode, ""),
_=> (itemCode,""),
};
}
}

View File

@ -265,79 +265,5 @@ namespace JiShe.CollectBus.Common.Extensions
}
return DateTime.TryParseExact(dateLong.ToString(), "yyyyMMdd HHmmssZZ", null, System.Globalization.DateTimeStyles.None, out DateTime date) ? date : throw new ArgumentException("Date must be between 10000101 and 99991231.");
}
/// <summary>
/// 取得某月的第一天
/// </summary>
/// <param name="datetime">要取得月份第一天的时间</param>
/// <returns></returns>
public static DateTime FirstDayOfMonth(this DateTime datetime)
{
return datetime.AddDays(1 - datetime.Day);
}
///<summary>
/// 取得某月的最后一天
/// </summary>
/// <param name="datetime">要取得月份最后一天的时间</param>
/// <returns></returns>
public static DateTime LastDayOfMonth(this DateTime datetime)
{
return datetime.AddDays(1 - datetime.Day).AddMonths(1).AddDays(-1);
}
/// <summary>
/// 取得上个月第一天
/// </summary>
/// <param name="datetime">要取得上个月第一天的当前时间</param>
/// <returns></returns>
public static DateTime FirstDayOfPreviousMonth(this DateTime datetime)
{
return datetime.AddDays(1 - datetime.Day).AddMonths(-1);
}
/// <summary>
/// 取得上个月的最后一天
/// </summary>
/// <param name="datetime">要取得上个月最后一天的当前时间</param>
/// <returns></returns>
public static DateTime LastDayOfPrdviousMonth(this DateTime datetime)
{
return datetime.AddDays(1 - datetime.Day).AddDays(-1);
}
/// <summary>
/// 取得某月第一天0点以及最后一天的23:59:59时间范围
/// </summary>
/// <param name="datetime"></param>
/// <returns></returns>
public static Tuple<DateTime, DateTime> GetMonthDateRange(this DateTime datetime)
{
var lastDayOfMonthDate = LastDayOfMonth(datetime);
return new Tuple<DateTime, DateTime>(datetime.FirstDayOfMonth(), new DateTime(lastDayOfMonthDate.Year, lastDayOfMonthDate.Month, lastDayOfMonthDate.Day, 23, 59, 59));
}
/// <summary>
/// 取得某一天0点到当月最后一天的23:59:59时间范围
/// </summary>
/// <param name="datetime"></param>
/// <returns></returns>
public static Tuple<DateTime, DateTime> GetCurrentDateToLastDayRange(this DateTime datetime)
{
var lastDayOfMonthDate = LastDayOfMonth(datetime);
return new Tuple<DateTime, DateTime>(datetime.Date, new DateTime(lastDayOfMonthDate.Year, lastDayOfMonthDate.Month, lastDayOfMonthDate.Day, 23, 59, 59));
}
/// <summary>
/// 取得某一天0点到23:59:59时间范围
/// </summary>
/// <param name="datetime"></param>
/// <returns></returns>
public static Tuple<DateTime, DateTime> GetCurrentDateRange(this DateTime datetime)
{
return new Tuple<DateTime, DateTime>(datetime.Date, new DateTime(datetime.Year, datetime.Month, datetime.Day, 23, 59, 59));
}
}
}

View File

@ -136,7 +136,57 @@ namespace JiShe.CollectBus.Common.Helpers
return objModel;
}
/// <summary>
/// 取得某月的第一天
/// </summary>
/// <param name="datetime">要取得月份第一天的时间</param>
/// <returns></returns>
public static DateTime FirstDayOfMonth(this DateTime datetime)
{
return datetime.AddDays(1 - datetime.Day);
}
///<summary>
/// 取得某月的最后一天
/// </summary>
/// <param name="datetime">要取得月份最后一天的时间</param>
/// <returns></returns>
public static DateTime LastDayOfMonth(this DateTime datetime)
{
return datetime.AddDays(1 - datetime.Day).AddMonths(1).AddDays(-1);
}
/// <summary>
/// 取得某月第一天0点以及最后一天的23:59:59时间范围
/// </summary>
/// <param name="datetime"></param>
/// <returns></returns>
public static Tuple<DateTime, DateTime> GetMonthDateRange(this DateTime datetime)
{
var lastDayOfMonthDate = LastDayOfMonth(datetime);
return new Tuple<DateTime, DateTime>(datetime.FirstDayOfMonth(), new DateTime(lastDayOfMonthDate.Year, lastDayOfMonthDate.Month, lastDayOfMonthDate.Day, 23, 59, 59));
}
/// <summary>
/// 取得某一天0点到当月最后一天的23:59:59时间范围
/// </summary>
/// <param name="datetime"></param>
/// <returns></returns>
public static Tuple<DateTime, DateTime> GetCurrentDateToLastDayRange(this DateTime datetime)
{
var lastDayOfMonthDate = LastDayOfMonth(datetime);
return new Tuple<DateTime, DateTime>(datetime.Date, new DateTime(lastDayOfMonthDate.Year, lastDayOfMonthDate.Month, lastDayOfMonthDate.Day, 23, 59, 59));
}
/// <summary>
/// 取得某一天0点到23:59:59时间范围
/// </summary>
/// <param name="datetime"></param>
/// <returns></returns>
public static Tuple<DateTime, DateTime> GetCurrentDateRange(this DateTime datetime)
{
return new Tuple<DateTime, DateTime>(datetime.Date, new DateTime(datetime.Year, datetime.Month, datetime.Day, 23, 59, 59));
}
/// <summary>
/// 获取指定枚举的所有 Attribute 说明以及value组成的键值对
@ -865,72 +915,5 @@ namespace JiShe.CollectBus.Common.Helpers
return Convert.ToInt64(scoresStr);
}
/// <summary>
/// 加载指定名称的程序集
/// </summary>
/// <param name="assemblyNames"></param>
/// <returns></returns>
public static List<Assembly> LoadAssemblies(string[] assemblyNames)
{
var assemblies = new List<Assembly>();
// 获取已加载的程序集
foreach (var asm in AppDomain.CurrentDomain.GetAssemblies())
{
if (assemblyNames.Contains(asm.GetName().Name))
assemblies.Add(asm);
}
// 尝试加载未加载的程序集
foreach (var name in assemblyNames)
{
if (!assemblies.Any(a => a.GetName().Name == name))
{
try
{
var assembly = Assembly.Load(name);
assemblies.Add(assembly);
}
catch (FileNotFoundException)
{
// 若Load失败尝试从基目录加载
var path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"{name}.dll");
if (File.Exists(path))
{
try
{
assemblies.Add(Assembly.LoadFrom(path));
}
catch { /* 记录错误 */ }
}
}
}
}
return assemblies;
}
/// <summary>
/// 创建类型实例
/// </summary>
/// <param name="types"></param>
/// <returns></returns>
public static List<object> CreateInstances(List<Type> types)
{
var instances = new List<object>();
foreach (var type in types)
{
try
{
instances.Add(Activator.CreateInstance(type));
}
catch (Exception)
{
throw;
}
}
return instances;
}
}
}

View File

@ -76,8 +76,8 @@ namespace JiShe.CollectBus.Host
app.UseCors(CollectBusHostConst.DefaultCorsPolicyName);
app.UseAuthentication();
app.UseAuthorization();
//if (env.IsDevelopment())
//{
if (env.IsDevelopment())
{
app.UseSwagger();
app.UseAbpSwaggerUI(options =>
{
@ -88,7 +88,7 @@ namespace JiShe.CollectBus.Host
options.DocExpansion(DocExpansion.None);
options.DefaultModelsExpandDepth(-1);
});
//}
}
app.UseAuditing();
app.UseAbpSerilogEnrichers();
app.UseUnitOfWork();

View File

@ -19,10 +19,6 @@
<Content Remove="C:\Users\Dai Zan\.nuget\packages\volo.abp.aspnetcore\8.3.3\contentFiles\any\net8.0\Volo.Abp.AspNetCore.abppkg.analyze.json" />
</ItemGroup>
<ItemGroup>
<None Remove="Plugins\ignore.txt" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.Kafka" Version="9.0.0" />
<PackageReference Include="AspNetCore.HealthChecks.MongoDb" Version="8.0.0" />
@ -56,10 +52,6 @@
<PackageReference Include="Hangfire.Dashboard.BasicAuthorization" Version="1.0.2" />-->
</ItemGroup>
<ItemGroup>
<Page Include="Plugins\ignore.txt" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\modules\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
<ProjectReference Include="..\..\services\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
@ -83,6 +75,9 @@
<None Update="Plugins\JiShe.CollectBus.Protocol.T6452007.dll">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Plugins\JiShe.CollectBus.Protocol.Test.dll">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>

View File

@ -1,8 +1,8 @@
{
"ConnectionStrings": {
"Default": "mongodb://admin:4mFmPTTB8tn6aI@47.110.62.104:27017,47.110.53.196:27017,47.110.60.222:27017/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
"PrepayDB": "server=rm-m5el3d1u1k0wzk70n2o.sqlserver.rds.aliyuncs.com,3433;database=jishe.sysdb;uid=v3sa;pwd=JiShe123;Encrypt=False;Trust Server Certificate=False",
"EnergyDB": "server=rm-wz9hw529i3j1e3b5fbo.sqlserver.rds.aliyuncs.com,3433;database=db_energy;uid=yjdb;pwd=Kdjdhf+9*7ad222LL;Encrypt=False;Trust Server Certificate=False"
"PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
},
"Redis": {
"Configuration": "47.110.60.222:6379,password=3JBGfyhTaD46nS,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
@ -18,16 +18,13 @@
"SaslPassword": "lixiao@1980",
"KafkaReplicationFactor": 3,
"NumPartitions": 30,
"TaskThreadCount": -1,
"ServerTagName": "JiSheCollectBus100",
"FirstCollectionTime": "2025-04-22 16:07:00"
},
"IoTDBOptions": {
"UserName": "root",
"Password": "root",
//"ClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
//"Password": "root",
"ClusterList": [ "121.42.175.177:16667" ],
"Password": "Yp2eU6MVdIjXCL",
"ClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
"PoolSize": 2,
"DataBaseName": "energy",
"OpenDebugMode": true,

View File

@ -1,4 +1,39 @@
{
"Serilog": {
"Using": [
"Serilog.Sinks.Console",
"Serilog.Sinks.File"
],
"MinimumLevel": {
"Default": "Warning",
"Override": {
"Microsoft": "Warning",
"Volo.Abp": "Warning",
"Hangfire": "Warning",
"DotNetCore.CAP": "Warning",
"Serilog.AspNetCore": "Information",
"Microsoft.EntityFrameworkCore": "Warning",
"Microsoft.AspNetCore": "Warning",
"Microsoft.AspNetCore.Diagnostics.HealthChecks": "Warning"
}
},
"WriteTo": [
{
"Name": "Console"
},
{
"Name": "File",
"Args": {
"path": "logs/logs-.txt",
"rollingInterval": "Day"
}
}
]
},
"App": {
"SelfUrl": "http://localhost:44315",
"CorsOrigins": "http://localhost:4200,http://localhost:3100"
},
"ConnectionStrings": {
"Default": "mongodb://mongo_PmEeF3:lixiao1980@192.168.5.9:27017/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
"Kafka": "192.168.5.9:29092,192.168.5.9:39092,192.168.5.9:49092",
@ -11,21 +46,46 @@
"DefaultDB": "14",
"HangfireDB": "13"
},
"Jwt": {
"Audience": "JiShe.CollectBus",
"SecurityKey": "dzehzRz9a8asdfasfdadfasdfasdfafsdadfasbasdf=",
"Issuer": "JiShe.CollectBus",
"ExpirationTime": 2
},
"HealthChecks": {
"IsEnable": true,
"HealthCheckDatabaseName": "HealthChecks",
"EvaluationTimeInSeconds": 10,
"MinimumSecondsBetweenFailureNotifications": 60
},
"SwaggerConfig": [
{
"GroupName": "Basic",
"Title": "【后台管理】基础模块",
"Version": "V1"
},
{
"GroupName": "Business",
"Title": "【后台管理】业务模块",
"Version": "V1"
}
],
"Kafka": {
"BootstrapServers": "192.168.5.9:29092,192.168.5.9:39092,192.168.5.9:49092",
"EnableFilter": true,
"EnableAuthorization": false,
"SecurityProtocol": "SaslPlaintext",
"SaslMechanism": "Plain",
"SaslUserName": "lixiao",
"SaslPassword": "lixiao1980",
"KafkaReplicationFactor": 3,
"NumPartitions": 30,
"TaskThreadCount": -1,
"FirstCollectionTime": "2025-04-22 16:07:00"
},
"IoTDBOptions": {
"UserName": "root",
"Password": "root",
"ClusterList": [ "121.42.175.177:16667" ],
"ClusterList": [ "192.168.5.9:6667" ],
"PoolSize": 32,
"DataBaseName": "energy",
"OpenDebugMode": true,
@ -80,65 +140,6 @@
"DefaultIdempotence": true
}
},
"Serilog": {
"Using": [
"Serilog.Sinks.Console",
"Serilog.Sinks.File"
],
"MinimumLevel": {
"Default": "Warning",
"Override": {
"Microsoft": "Warning",
"Volo.Abp": "Warning",
"Hangfire": "Warning",
"DotNetCore.CAP": "Warning",
"Serilog.AspNetCore": "Information",
"Microsoft.EntityFrameworkCore": "Warning",
"Microsoft.AspNetCore": "Warning",
"Microsoft.AspNetCore.Diagnostics.HealthChecks": "Warning"
}
},
"WriteTo": [
{
"Name": "Console"
},
{
"Name": "File",
"Args": {
"path": "logs/logs-.txt",
"rollingInterval": "Hour"
}
}
]
},
"App": {
"SelfUrl": "http://localhost:44315",
"CorsOrigins": "http://localhost:4200,http://localhost:3100"
},
"Jwt": {
"Audience": "JiShe.CollectBus",
"SecurityKey": "dzehzRz9a8asdfasfdadfasdfasdfafsdadfasbasdf=",
"Issuer": "JiShe.CollectBus",
"ExpirationTime": 2
},
"HealthChecks": {
"IsEnable": false,
"HealthCheckDatabaseName": "HealthChecks",
"EvaluationTimeInSeconds": 10,
"MinimumSecondsBetweenFailureNotifications": 60
},
"SwaggerConfig": [
{
"GroupName": "Basic",
"Title": "【后台管理】基础模块",
"Version": "V1"
},
{
"GroupName": "Business",
"Title": "【后台管理】业务模块",
"Version": "V1"
}
],
"ServerApplicationOptions": {
"ServerTagName": "JiSheCollectBus99",
"SystemType": "Energy",

View File

@ -1,4 +1,7 @@
using JiShe.CollectBus.Migration.Host.HealthChecks;
//using Hangfire;
//using Hangfire.Redis.StackExchange;
using JiShe.CollectBus.Migration.Host.Hangfire;
using JiShe.CollectBus.Migration.Host.HealthChecks;
using JiShe.CollectBus.Migration.Host.Swaggers;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.AspNetCore.DataProtection;
@ -9,6 +12,7 @@ using StackExchange.Redis;
using System.Text;
using Volo.Abp.AspNetCore.Auditing;
using Volo.Abp.Auditing;
using Volo.Abp.BackgroundJobs;
using Volo.Abp.Caching;
using Volo.Abp.Modularity;
@ -17,6 +21,33 @@ namespace JiShe.CollectBus.Migration.Host
{
public partial class CollectBusMigrationHostModule
{
/// <summary>
/// Configures the hangfire.
/// </summary>
/// <param name="context">The context.</param>
//private void ConfigureHangfire(ServiceConfigurationContext context)
//{
// var redisStorageOptions = new RedisStorageOptions()
// {
// Db = context.Services.GetConfiguration().GetValue<int>("Redis:HangfireDB")
// };
// Configure<AbpBackgroundJobOptions>(options => { options.IsJobExecutionEnabled = false; });
// context.Services.AddHangfire(config =>
// {
// config.UseRedisStorage(
// context.Services.GetConfiguration().GetValue<string>("Redis:Configuration"), redisStorageOptions)
// .WithJobExpirationTimeout(TimeSpan.FromDays(7));
// var delaysInSeconds = new[] { 10, 60, 60 * 3 }; // 重试时间间隔
// const int Attempts = 3; // 重试次数
// config.UseFilter(new AutomaticRetryAttribute() { Attempts = Attempts, DelaysInSeconds = delaysInSeconds });
// //config.UseFilter(new AutoDeleteAfterSuccessAttribute(TimeSpan.FromDays(7)));
// config.UseFilter(new JobRetryLastFilter(Attempts));
// });
// context.Services.AddHangfireServer();
//}
/// <summary>
/// Configures the JWT authentication.
/// </summary>

View File

@ -2,7 +2,6 @@
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Migration.Host.HealthChecks;
using JiShe.CollectBus.Migration.Host.Swaggers;
using JiShe.CollectBus.MongoDB;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Swashbuckle.AspNetCore.SwaggerUI;
using Volo.Abp;
@ -27,7 +26,6 @@ namespace JiShe.CollectBus.Migration.Host
typeof(AbpAspNetCoreSerilogModule),
typeof(AbpSwashbuckleModule),
typeof(AbpTimingModule),
typeof(CollectBusMongoDbModule),
typeof(CollectBusMigrationApplicationModule),
typeof(AbpCachingStackExchangeRedisModule)
)]
@ -44,6 +42,7 @@ namespace JiShe.CollectBus.Migration.Host
ConfigureSwaggerServices(context, configuration);
//ConfigureNetwork(context, configuration);
ConfigureJwtAuthentication(context, configuration);
//ConfigureHangfire(context);
ConfigureAuditLog(context);
ConfigureCustom(context, configuration);
ConfigureHealthChecks(context, configuration);
@ -85,7 +84,7 @@ namespace JiShe.CollectBus.Migration.Host
options.DefaultModelsExpandDepth(-1);
});
}
app.UseAuditing();
//app.UseAuditing();
app.UseAbpSerilogEnrichers();
app.UseUnitOfWork();
//app.UseHangfireDashboard("/hangfire", new DashboardOptions

View File

@ -0,0 +1,29 @@
using Hangfire.Common;
using Hangfire.States;
using Serilog;
namespace JiShe.CollectBus.Migration.Host.Hangfire
{
/// <summary>
/// 重试最后一次
/// </summary>
public class JobRetryLastFilter : JobFilterAttribute, IElectStateFilter
{
private int RetryCount { get; }
public JobRetryLastFilter(int retryCount)
{
RetryCount = retryCount;
}
public void OnStateElection(ElectStateContext context)
{
var retryAttempt = context.GetJobParameter<int>("RetryCount");
if (RetryCount == retryAttempt)
{
Log.Error("最后一次重试");
}
}
}
}

View File

@ -46,8 +46,6 @@
<!--<PackageReference Include="Volo.Abp.BackgroundWorkers.Hangfire" Version="8.3.3" />-->
<PackageReference Include="Volo.Abp.Caching.StackExchangeRedis" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Swashbuckle" Version="8.3.3" />
<!--<PackageReference Include="Hangfire.HttpJob" Version="3.8.5" />
<PackageReference Include="Hangfire.MySqlStorage" Version="2.0.3" />
@ -55,7 +53,6 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\modules\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
<ProjectReference Include="..\..\services\JiShe.CollectBus.Migration.Application.Contracts\JiShe.CollectBus.Migration.Application.Contracts.csproj" />
<ProjectReference Include="..\..\services\JiShe.CollectBus.Migration.Application\JiShe.CollectBus.Migration.Application.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Migration.HttpApi\JiShe.CollectBus.Migration.HttpApi.csproj" />