Compare commits

...

37 Commits

Author SHA1 Message Date
179b9a2e91 调整代码 2025-05-23 14:01:33 +08:00
87bf7feff2 调整代码 2025-05-23 10:12:09 +08:00
8b33ec5400 Merge branch 'dev' of https://310.jisheyun.com/daizan/JiShe.CollectBus into dev 2025-05-23 10:08:37 +08:00
2d2245fa85 优化kafka订阅消费一个分区一个线程,解决影响主线程问题 2025-05-23 10:07:47 +08:00
ChenYi
4a609b6cf8 Merge branch 'feature_定时抄读_21_CY' into dev 2025-05-22 17:30:43 +08:00
ChenYi
72c50311b6 优化水表处理 2025-05-22 17:29:54 +08:00
7dd833257a 调整日志已小时为单位 2025-05-22 13:53:38 +08:00
c999b0a0a9 Merge branch 'dev' of https://310.jisheyun.com/daizan/JiShe.CollectBus into dev 2025-05-22 12:08:29 +08:00
679086b174 优化kafka服务订阅服务线程未启动完,立即重启容器导致资源错误问题 2025-05-22 12:08:07 +08:00
db8b0e0b23 切换为本地开发库 2025-05-22 10:53:01 +08:00
dbb31d1c60 修改密码 2025-05-22 10:38:38 +08:00
f4b7afae48 修改正式环境数据库信息 2025-05-22 10:20:59 +08:00
ChenYi
6393db4dc6 修改日志等级 2025-05-22 10:01:47 +08:00
86da556e98 Merge branch 'dev' of https://310.jisheyun.com/daizan/JiShe.CollectBus into dev 2025-05-22 09:34:48 +08:00
95712c4c0e 解决docker 重启导致消费者被释放 2025-05-22 09:33:46 +08:00
ChenYi
54c1643015 Merge branch 'dev' of https://310.jisheyun.com/daizan/JiShe.CollectBus into dev 2025-05-21 21:55:18 +08:00
f746d84225 保留审计修复审计报错问题 2025-05-21 17:32:43 +08:00
ChenYi
71a5f7b89b Merge branch 'dev' of https://310.jisheyun.com/daizan/JiShe.CollectBus into dev 2025-05-21 17:11:32 +08:00
ChenYi
91602a95c6 实现IoTDB驱动表模型建表初始化处理。 2025-05-21 17:11:27 +08:00
2d2bb0dcc0 增加是否迁移标识和分库标识 2025-05-21 14:08:26 +08:00
2ead6e8242 暂时去掉审计 2025-05-21 10:56:49 +08:00
6adbfe7883 Merge branch 'dev' of https://310.jisheyun.com/daizan/JiShe.CollectBus into dev 2025-05-21 10:45:24 +08:00
76019141f1 优化迁移Host,移除一些不必要的组件 2025-05-21 10:45:18 +08:00
ChenYi
27b7452cd1 Merge branch 'dev' of https://310.jisheyun.com/daizan/JiShe.CollectBus into dev 2025-05-21 10:14:58 +08:00
ChenYi
6bc3b31f81 暂存代码 2025-05-21 10:14:33 +08:00
ee1f3e3ca9 修改 2025-05-21 10:01:19 +08:00
5d7e7bd7ed 修改配置文件为本地 2025-05-21 09:16:01 +08:00
ChenYi
7a7a68b326 Merge branch 'dev' of https://310.jisheyun.com/daizan/JiShe.CollectBus into dev 2025-05-20 21:56:06 +08:00
ChenYi
4ce1741f7e 优化代码 2025-05-20 21:55:48 +08:00
469ea285f2 显示Swagger UI 2025-05-20 17:32:20 +08:00
3d83cf0ccb 修改docker file 2025-05-20 17:10:33 +08:00
6ff97c1c0f Merge branch 'dev' of https://310.jisheyun.com/daizan/JiShe.CollectBus into dev 2025-05-20 16:53:48 +08:00
77eabfec5d update 2025-05-20 16:53:36 +08:00
ChenYi
e806a127bf Merge branch 'dev' of https://310.jisheyun.com/daizan/JiShe.CollectBus into dev 2025-05-20 16:42:49 +08:00
ChenYi
a78b819233 完善编码映射 2025-05-20 16:41:58 +08:00
d9491b9b33 update 2025-05-20 15:59:31 +08:00
ead03c6361 修改数据库为测试环境 2025-05-20 15:31:53 +08:00
44 changed files with 1569 additions and 862 deletions

View File

@ -1,7 +1,7 @@
FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
WORKDIR /app
EXPOSE 80
EXPOSE 443
EXPOSE 8080
EXPOSE 10500
ENV TZ=Asia/Shanghai
ENV ASPNETCORE_ENVIRONMENT=Production
@ -38,15 +38,7 @@ RUN mkdir -p /app/Plugins
# 复制发布内容
COPY --from=publish /app/publish .
# 健康检查
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD curl -f http://localhost:80/health || exit 1
# 设置入口点
ENTRYPOINT ["dotnet", "JiShe.CollectBus.Host.dll"]
# 启动命令
# 可选:添加命令行参数
# CMD ["--urls", "http://+:80"]

View File

@ -385,6 +385,20 @@ namespace JiShe.CollectBus.IncrementalGenerator
return Activator.CreateInstance(accessorType)!;
});
}
public static object GetAccessor(Type type)
{
MethodInfo getAccessorMethod = typeof(SourceEntityAccessorFactory)
.GetMethod(
name: nameof(GetAccessor),
bindingAttr: BindingFlags.Public | BindingFlags.Static,
types: Type.EmptyTypes
);
MethodInfo genericMethod = getAccessorMethod.MakeGenericMethod(type);
return genericMethod.Invoke(null, null);
}
}
""";
}

View File

@ -0,0 +1,14 @@
namespace JiShe.CollectBus.IoTDB.Attributes
{
/// <summary>
/// 需要忽略表模型初始化,有此特性无需初始化
/// </summary>
[AttributeUsage(AttributeTargets.Class)]
public class IgnoreInitTableAttribute : System.Attribute
{
public IgnoreInitTableAttribute()
{
}
}
}

View File

@ -10,12 +10,11 @@ namespace JiShe.CollectBus.IoTDB.Interface
/// </summary>
public interface IIoTDbProvider
{
///// <summary>
///// 切换 SessionPool
///// </summary>
///// <param name="useTableSession">是否使用表模型</param>
//void SwitchSessionPool(bool useTableSession);
/// <summary>
/// 切换 SessionPool
/// </summary>
/// <param name="sessionpolType">是否使用表模型</param>
/// <returns></returns>
IIoTDbProvider GetSessionPool(bool sessionpolType);
/// <summary>
@ -66,5 +65,11 @@ namespace JiShe.CollectBus.IoTDB.Interface
/// <param name="options"></param>
/// <returns></returns>
Task<BusPagedResult<T>> QueryAsync<T>(IoTDBQueryOptions options) where T : IoTEntity, new();
/// <summary>
/// 初始化表模型
/// </summary>
/// <returns></returns>
Task InitTableSessionModelAsync();
}
}

View File

@ -32,5 +32,12 @@ namespace JiShe.CollectBus.IoTDB.Interface
/// <param name="sql"></param>
/// <returns></returns>
Task<SessionDataSet> ExecuteQueryStatementAsync(string sql);
/// <summary>
/// 执行无返回结果SQL
/// </summary>
/// <param name="sql"></param>
/// <returns></returns>
Task<int> ExecuteNonQueryStatementAsync(string sql);
}
}

View File

@ -50,7 +50,7 @@ namespace JiShe.CollectBus.IoTDB.Model
/// </summary>
private string _devicePath;
/// <summary>
/// 设备路径
/// 设备路径,树模型使用,表模型会在数据插入的时候直接获取继承类的名称作为表明
/// </summary>
public virtual string DevicePath
{

View File

@ -7,6 +7,7 @@ namespace JiShe.CollectBus.IoTDB.Model
/// Table模型单项数据实体
/// </summary>
[SourceAnalyzers(EntityTypeEnum.TableModel)]
[IgnoreInitTable]
public class TableModelSingleMeasuringEntity<T> : IoTEntity
{
/// <summary>

View File

@ -26,6 +26,8 @@ using System.Diagnostics.Metrics;
using Newtonsoft.Json.Linq;
using static System.Runtime.InteropServices.JavaScript.JSType;
using System.Text.RegularExpressions;
using System.Xml.Linq;
using System.Linq;
namespace JiShe.CollectBus.IoTDB.Provider
{
@ -86,7 +88,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
_logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null");
return;
}
_logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 路径为 {tablet.First().InsertTargetName}");
_logger.LogWarning($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 路径为 {tablet.First().InsertTargetName}");
await CurrentSession.InsertAsync(tablet.First());
}
@ -127,7 +129,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
foreach (var item in tablet)
{
_logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 路径为 {item.InsertTargetName}");
_logger.LogWarning($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 路径为 {item.InsertTargetName}");
await CurrentSession.InsertAsync(item);
}
@ -366,6 +368,10 @@ namespace JiShe.CollectBus.IoTDB.Provider
{
tableNameOrTreePath = metadata.TableNameOrTreePath;
}
else if(metadata.IsSingleMeasuring && string.IsNullOrWhiteSpace(metadata.TableNameOrTreePath) == true)//单侧点时,且路径没有指定,取默认实体名称和第一个列名组合。
{
tableNameOrTreePath = $"{metadata.EntityName}_{metadata.ColumnNames.First()}";
}
else
{
tableNameOrTreePath = DevicePathBuilder.GetTableName<T>();
@ -631,9 +637,6 @@ namespace JiShe.CollectBus.IoTDB.Provider
// 过滤元组子项
if (member.NameOrPath.Contains(".Item")) continue;
// 类型名称处理
string declaredTypeName = member.DeclaredTypeName;
// 特性查询优化
var attributes = member.CustomAttributes ?? Enumerable.Empty<Attribute>();
var tagAttr = attributes.OfType<TAGColumnAttribute>().FirstOrDefault();
@ -842,6 +845,25 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
}
/// <summary>
/// 处理不同列类型的逻辑
/// </summary>
/// <param name="groupedColumns"></param>
/// <param name="category"></param>
private string ProcessCategory(IReadOnlyDictionary<ColumnCategory, List<ColumnInfo>> groupedColumns, ColumnCategory category)
{
if (groupedColumns.TryGetValue(category, out var cols))
{
List<string> tempColumnInfos = new List<string>();
foreach (var item in cols)
{
tempColumnInfos.Add($" {item.Name} {item.DataType} {item.Category}");
}
return string.Join(",", tempColumnInfos);
}
return string.Empty;
}
/// <summary>
/// 数据列结构
/// </summary>
@ -976,6 +998,136 @@ namespace JiShe.CollectBus.IoTDB.Provider
return cache;
}
private static readonly Regex _asciiAlphanumericRegex = new Regex(@"^[a-zA-Z0-9]*$", RegexOptions.Compiled);
/// <summary>
/// 初始化表模型
/// </summary>
/// <returns></returns>
public async Task InitTableSessionModelAsync()
{
//获取JiShe.CollectBus.IoTDB程序集和JiShe.CollectBus.Domain程序集中的所有 [SourceAnalyzers(EntityTypeEnum.TableModel)] 的实体
var assemblyNames = new[] { "JiShe.CollectBus.IoTDB", "JiShe.CollectBus.Domain" };
var assemblies = CommonHelper.LoadAssemblies(assemblyNames);
var targetTypes = CollectTargetTypes(assemblies);
if (targetTypes == null || targetTypes.Count <= 0)
{
_logger.LogError($"{nameof(InitTableSessionModelAsync)} 初始化表模型时没有找到对应的实体类信息。");
return;
}
// @"CREATE TABLE table1(
// time TIMESTAMP TIME,
// region STRING TAG,
// plant_id STRING TAG,
// device_id STRING TAG,
// model_id STRING ATTRIBUTE,
// maintenance STRING ATTRIBUTE,
// temperature FLOAT FIELD,
// humidity FLOAT FIELD,
// status Boolean FIELD,
// arrival_time TIMESTAMP FIELD
//) COMMENT 'table1' WITH(TTL = 31536000000);";
foreach (var item in targetTypes)
{
var accessor = SourceEntityAccessorFactory.GetAccessor(item);
//通过 dynamic 简化操作
dynamic dynamicAccessor = accessor;
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.Append("CREATE TABLE IF NOT EXISTS ");
stringBuilder.Append(dynamicAccessor.EntityName);
stringBuilder.Append("( time TIMESTAMP TIME, ");
var columns = new List<ColumnInfo>();
foreach (var member in dynamicAccessor.MemberList)
{
// 过滤元组子项
if (member.NameOrPath.Contains(".Item")) continue;
// 特性查询优化
var attributes = (IEnumerable<Attribute>)(member.CustomAttributes ?? Enumerable.Empty<Attribute>());
var tagAttr = attributes.OfType<TAGColumnAttribute>().FirstOrDefault();
var attrColumn = attributes.OfType<ATTRIBUTEColumnAttribute>().FirstOrDefault();
var fieldColumn = attributes.OfType<FIELDColumnAttribute>().FirstOrDefault();
// 构建ColumnInfo
ColumnInfo? column = null;
if (tagAttr != null)
{
column = new ColumnInfo(member.NameOrPath, ColumnCategory.TAG, GetDataTypeFromTypeName(member.DeclaredTypeName), false, member.DeclaredTypeName);
}
else if (attrColumn != null)
{
column = new ColumnInfo(member.NameOrPath, ColumnCategory.ATTRIBUTE, GetDataTypeFromTypeName(member.DeclaredTypeName), false, member.DeclaredTypeName);
}
else if (fieldColumn != null)
{
column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(member.DeclaredTypeName), false, member.DeclaredTypeName);
}
if (!column.HasValue)
{
_logger.LogError($"{nameof(InitTableSessionModelAsync)} 初始化表模型时实体类{dynamicAccessor.EntityName}的{member.NameOrPath}列的ColumnInfo构建失败。");
continue;
}
columns.Add(column.Value);
}
//按业务逻辑顺序处理TAG -> ATTRIBUTE -> FIELD
var groupedColumns = columns
.GroupBy(c => c.Category)
.ToDictionary(g => g.Key, g => g.ToList());
List<string> tempColumInfos = new List<string>();
tempColumInfos.Add( ProcessCategory(groupedColumns, ColumnCategory.TAG));
tempColumInfos.Add(ProcessCategory(groupedColumns, ColumnCategory.ATTRIBUTE));
tempColumInfos.Add(ProcessCategory(groupedColumns, ColumnCategory.FIELD));
stringBuilder.Append(string.Join(",", tempColumInfos.Where(d => !string.IsNullOrWhiteSpace(d))));
stringBuilder.Append($" ) COMMENT '{item.Name}' ");
_logger.LogWarning($"{dynamicAccessor.EntityName} 初始化语句:{stringBuilder.ToString()}");
await CurrentSession.ExecuteNonQueryStatementAsync($"{stringBuilder.ToString()}");
}
}
/// <summary>
/// 获取程序集中的所有 [SourceAnalyzers(EntityTypeEnum.TableModel)] 的实体
/// </summary>
/// <param name="assemblies"></param>
/// <returns></returns>
private List<Type> CollectTargetTypes(List<Assembly> assemblies)
{
var targetTypes = new List<Type>();
foreach (var assembly in assemblies)
{
try
{
foreach (var type in assembly.GetExportedTypes())
{
//获取表模型特性的类
var sourceAnalyzersAttribute = type.GetCustomAttribute<SourceAnalyzersAttribute>();
//需要忽略表模型初始化,有此特性无需初始化
var ignoreInitTableAttribute = type.GetCustomAttribute<IgnoreInitTableAttribute>();
if (sourceAnalyzersAttribute?.EntityType == EntityTypeEnum.TableModel && ignoreInitTableAttribute == null)
{
if (type.GetConstructor(Type.EmptyTypes) != null)
targetTypes.Add(type);
}
}
}
catch (ReflectionTypeLoadException ex)
{
_logger.LogError($"加载 {assembly} 失败: {string.Join(", ", ex.LoaderExceptions.Select(e => e.Message))}");
}
}
return targetTypes;
}
}
}

View File

@ -72,7 +72,6 @@ namespace JiShe.CollectBus.IoTDB.Provider
{
throw new Exception($"{nameof(SessionPoolAdapter)} Tree模型数据入库没有成功返回结果为{result}请检查IoTEntity继承子类属性索引是否有变动。");
}
//await CloseAsync();
return result;
}
@ -84,8 +83,17 @@ namespace JiShe.CollectBus.IoTDB.Provider
public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql)
{
var result = await _sessionPool.ExecuteQueryStatementAsync(sql, _options.Timeout);
//await result.Close();
//await CloseAsync();
return result;
}
/// <summary>
/// 执行无返回结果SQL
/// </summary>
/// <param name="sql"></param>
/// <returns></returns>
public async Task<int> ExecuteNonQueryStatementAsync(string sql)
{
var result = await _sessionPool.ExecuteNonQueryStatementAsync(sql);
return result;
}

View File

@ -71,7 +71,6 @@ namespace JiShe.CollectBus.IoTDB.Provider
throw new Exception($"{nameof(TableSessionPoolAdapter)} table模型数据入库没有成功返回结果为{result}请检查IoTEntity继承子类属性索引是否有变动。");
}
//await CloseAsync();
return result;
}
@ -83,8 +82,17 @@ namespace JiShe.CollectBus.IoTDB.Provider
public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql)
{
var result = await _sessionPool.ExecuteQueryStatementAsync(sql,_options.Timeout);
//await result.Close();
//await CloseAsync();
return result;
}
/// <summary>
/// 执行无返回结果SQL
/// </summary>
/// <param name="sql"></param>
/// <returns></returns>
public async Task<int> ExecuteNonQueryStatementAsync(string sql)
{
var result = await _sessionPool.ExecuteNonQueryStatementAsync(sql);
return result;
}

View File

@ -8,10 +8,11 @@ using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.Kafka.AdminClient;
public class AdminClientService : IAdminClientService, IDisposable, ISingletonDependency
public class AdminClientService : IAdminClientService, ISingletonDependency
{
private readonly ILogger<AdminClientService> _logger;
private readonly KafkaOptionConfig _kafkaOptionConfig;
private readonly Lazy<IAdminClient> _lazyAdminClient;
/// <summary>
/// Initializes a new instance of the <see cref="AdminClientService" /> class.
/// </summary>
@ -21,7 +22,8 @@ public class AdminClientService : IAdminClientService, IDisposable, ISingletonDe
{
_logger = logger;
_kafkaOptionConfig = kafkaOptionConfig.Value;
Instance = GetInstance();
//Instance = GetInstance();
_lazyAdminClient = new Lazy<IAdminClient>(() => GetInstance());
}
/// <summary>
@ -30,7 +32,7 @@ public class AdminClientService : IAdminClientService, IDisposable, ISingletonDe
/// <value>
/// The instance.
/// </value>
public IAdminClient Instance { get; set; }
public IAdminClient Instance => _lazyAdminClient.Value;
/// <summary>
/// 创建Kafka主题
@ -79,7 +81,7 @@ public class AdminClientService : IAdminClientService, IDisposable, ISingletonDe
public async Task<List<string>> ListTopicsAsync()
{
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
return new List<string>(metadata.Topics.Select(t => t.Topic));
return await Task.FromResult(new List<string>(metadata.Topics.Select(t => t.Topic)));
}
/// <summary>
@ -90,7 +92,7 @@ public class AdminClientService : IAdminClientService, IDisposable, ISingletonDe
public async Task<bool> TopicExistsAsync(string topic)
{
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
return metadata.Topics.Any(t => t.Topic == topic);
return await Task.FromResult(metadata.Topics.Any(t => t.Topic == topic));
}
/// <summary>
@ -137,11 +139,6 @@ public class AdminClientService : IAdminClientService, IDisposable, ISingletonDe
return metadata.Topics[0].Partitions.Count;
}
public void Dispose()
{
Instance?.Dispose();
}
/// <summary>
/// Gets the instance.
/// </summary>

View File

@ -1,11 +1,13 @@
using Confluent.Kafka;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using System.Reflection;
using Volo.Abp;
using Volo.Abp.DependencyInjection;
@ -22,20 +24,12 @@ namespace JiShe.CollectBus.Kafka
//var kafkaSection = configuration.GetSection(CommonConst.Kafka);
//KafkaOptionConfig kafkaOptionConfig = new KafkaOptionConfig();
//kafkaSection.Bind(kafkaOptionConfig);
//if (configuration[CommonConst.ServerTagName] != null)
//{
// kafkaOptionConfig.ServerTagName = configuration[CommonConst.ServerTagName]!;
//}
//context.Services.AddSingleton(kafkaOptionConfig);
//context.Services.Configure<KafkaOptionConfig>(context.Services.GetConfiguration().GetSection(CommonConst.Kafka));
//Configure<KafkaOptionConfig>(kafkaSection);
Configure<KafkaOptionConfig>(options =>
{
configuration.GetSection(CommonConst.Kafka).Bind(options);
});
// 注册Producer
context.Services.AddSingleton<IProducerService, ProducerService>();
// 注册Consumer
@ -44,6 +38,12 @@ namespace JiShe.CollectBus.Kafka
// 注册Polly
context.Services.AddSingleton<KafkaPollyPipeline>();
var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
// 注册任务调度
context.Services.AddSingleton<KafkaTaskScheduler>();
//context.Services.AddHostedService<HostedService>();
}

View File

@ -12,6 +12,7 @@ using System.Collections.Concurrent;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using YamlDotNet.Core.Tokens;
namespace JiShe.CollectBus.Kafka.Consumer
{
@ -36,17 +37,21 @@ namespace JiShe.CollectBus.Kafka.Consumer
private readonly KafkaPollyPipeline _kafkaPollyPipeline;
private readonly KafkaTaskScheduler _kafkaTaskScheduler;
/// <summary>
/// ConsumerService
/// </summary>
/// <param name="logger"></param>
/// <param name="kafkaOptionConfig"></param>
public ConsumerService(ILogger<ConsumerService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline, IOptions<ServerApplicationOptions> applicationOptions)
public ConsumerService(ILogger<ConsumerService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline, IOptions<ServerApplicationOptions> applicationOptions, KafkaTaskScheduler kafkaTaskScheduler)
{
_logger = logger;
_kafkaOptionConfig = kafkaOptionConfig.Value;
_applicationOptions = applicationOptions.Value;
_kafkaPollyPipeline = kafkaPollyPipeline;
_kafkaTaskScheduler = kafkaTaskScheduler;
}
#region private
@ -130,21 +135,26 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
try
{
// 扩展独立线程,避免阻塞
_kafkaTaskScheduler.WorkerThreadExpansion();
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<TKey, TValue>(groupId),
cts
)).Consumer as IConsumer<TKey, TValue>;
new CancellationTokenSource()
));
if (consumerStore.Consumer == null)
{
_logger.LogWarning($"{string.Join("", topics)}创建消息消费失败或消费组已被释放");
return;
}
var consumer = consumerStore.Consumer as IConsumer<TKey, TValue>;
var cts = consumerStore.CTS;
consumer!.Subscribe(topics);
_ = Task.Run(async () =>
_= Task.Factory.StartNew(async () =>
{
while (!cts.IsCancellationRequested)
{
@ -195,12 +205,17 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
//ignore
}
catch (ObjectDisposedException)
{
_logger.LogError($"{string.Join("", topics)}消费者被释放");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "处理消息时发生未知错误");
}
}
}, cts.Token);
}, cts.Token, TaskCreationOptions.None, _kafkaTaskScheduler);
await Task.CompletedTask;
});
@ -226,19 +241,28 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
try
{
// 扩展独立线程,避免阻塞
_kafkaTaskScheduler.WorkerThreadExpansion();
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<Ignore, TValue>(groupId),
cts
)).Consumer as IConsumer<Ignore, TValue>;
new CancellationTokenSource()
));
if (consumerStore.Consumer == null)
{
_logger.LogWarning($"{string.Join("", topics)}创建消息消费失败或消费组已被释放");
return;
}
var consumer = consumerStore.Consumer as IConsumer<Ignore, TValue>;
var cts = consumerStore.CTS;
consumer!.Subscribe(topics);
_ = Task.Run(async () =>
_ = Task.Factory.StartNew(async () =>
{
int count = 0;
while (!cts.IsCancellationRequested)
@ -288,12 +312,17 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
//ignore
}
catch (ObjectDisposedException)
{
_logger.LogError($"{string.Join("", topics)}消费者被释放");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "处理消息时发生未知错误");
}
}
}, cts.Token);
}, cts.Token, TaskCreationOptions.None, _kafkaTaskScheduler);
await Task.CompletedTask;
});
}
@ -342,22 +371,31 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
try
{
// 扩展独立线程,避免阻塞
_kafkaTaskScheduler.WorkerThreadExpansion();
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<TKey, TValue>(groupId),
cts
)).Consumer as IConsumer<TKey, TValue>;
new CancellationTokenSource()
));
if (consumerStore.Consumer == null)
{
_logger.LogWarning($"{string.Join("", topics)}创建消息消费失败或消费组已被释放");
return;
}
var consumer = consumerStore.Consumer as IConsumer<TKey, TValue>;
var cts = consumerStore.CTS;
consumer!.Subscribe(topics);
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
_ = Task.Run(async () =>
_ = Task.Factory.StartNew(async () =>
{
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
var startTime = DateTime.UtcNow;
@ -444,12 +482,17 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
//ignore
}
catch (ObjectDisposedException)
{
_logger.LogError($"{string.Join("", topics)}消费者被释放");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "处理批量消息时发生未知错误");
}
}
}, cts.Token);
}, cts.Token, TaskCreationOptions.None, _kafkaTaskScheduler);
await Task.CompletedTask;
});
@ -501,23 +544,30 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
try
{
// 扩展独立线程,避免阻塞
_kafkaTaskScheduler.WorkerThreadExpansion();
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<Ignore, TValue>(groupId),
cts
)).Consumer as IConsumer<Ignore, TValue>;
new CancellationTokenSource()
));
if (consumerStore.Consumer == null)
{
_logger.LogWarning($"{string.Join("", topics)}创建消息消费失败或消费组已被释放");
return;
}
var consumer = consumerStore.Consumer as IConsumer<Ignore, TValue>;
var cts = consumerStore.CTS;
consumer!.Subscribe(topics);
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
_ = Task.Run(async () =>
_ = Task.Factory.StartNew(async () =>
{
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
var startTime = DateTime.UtcNow;
@ -602,12 +652,17 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
//ignore
}
catch (ObjectDisposedException)
{
_logger.LogError($"{string.Join("", topics)}消费者被释放");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "处理批量消息时发生未知错误");
}
}
}, cts.Token);
}, cts.Token, TaskCreationOptions.None, _kafkaTaskScheduler);
await Task.CompletedTask;
});
@ -630,11 +685,13 @@ namespace JiShe.CollectBus.Kafka.Consumer
try
{
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
if (_consumerStore.TryRemove(consumerKey, out var entry))
if (_consumerStore.TryGetValue(consumerKey, out var entry))
{
entry.CTS.Cancel();
(entry.Consumer as IDisposable)?.Dispose();
entry.CTS.Dispose();
// 从字典中移除
_consumerStore.TryRemove(consumerKey, out entry);
}
}
catch (Exception ex)

View File

@ -49,4 +49,11 @@ public class KafkaOptionConfig
/// </summary>
public string? SaslPassword { get; set; }
/// <summary>
/// 订阅任务线程数量
/// 当主题未指定时,订阅任务线程数量默认为:-1
/// 优先级低于订阅任务特性TaskCount值
/// </summary>
public int TaskThreadCount { get; set; } = -1;
}

View File

@ -0,0 +1,169 @@
using System.Collections.Concurrent;
namespace JiShe.CollectBus.Kafka.Internal
{
public class KafkaTaskScheduler : TaskScheduler, IDisposable
{
private readonly BlockingCollection<Task> _tasksCollection=new BlockingCollection<Task> ();
private readonly List<Thread> _workerThreads;
private readonly object _disposeLock = new object();
private bool _isDisposed;
/// <summary>
/// 当前队列中的任务数
/// </summary>
public int QueuedTasks => _tasksCollection.Count;
/// <summary>
/// 当前工作线程数
/// </summary>
public int WorkerThreads => _workerThreads.Count;
/// <summary>
/// 初始化任务调度器
/// </summary>
public KafkaTaskScheduler()
{
// 默认最大并发线程数为CPU核心数
int MaxConcurrencyLevel = Environment.ProcessorCount;
_workerThreads = new List<Thread>(MaxConcurrencyLevel);
for (int i = 0; i < MaxConcurrencyLevel; i++)
{
var thread = new Thread(ExecuteScheduledTasks)
{
IsBackground = true,
Name = $"KafkaWorkerTask-{i + 1}"
};
thread.Start();
_workerThreads.Add(thread);
}
}
/// <summary>
/// 扩容工作线程调度
/// 可以启动多个工作线程来处理任务
/// </summary>
/// <param name="taskNum">扩展独立线程数(默认为1)</param>
public void WorkerThreadExpansion(int taskNum = 1)
{
int currCount = WorkerThreads+1;
Parallel.For(0, taskNum, (index) =>
{
var thread = new Thread(ExecuteScheduledTasks)
{
IsBackground = true,
Name = $"KafkaWorkerTask-{index+ currCount}"
};
thread.Start();
_workerThreads.Add(thread);
});
}
/// <summary>
/// 工作线程执行循环
/// </summary>
private void ExecuteScheduledTasks()
{
try
{
foreach (var task in _tasksCollection.GetConsumingEnumerable())
{
TryExecuteTaskSafely(task);
}
}
catch (OperationCanceledException) { }
catch (ObjectDisposedException) { }
}
/// <summary>
/// 安全执行任务并处理异常
/// </summary>
private void TryExecuteTaskSafely(Task task)
{
try
{
TryExecuteTask(task);
}
catch (OperationCanceledException){}
catch (Exception ex)
{
OnExceptionOccurred(ex);
}
}
#region TaskScheduler
protected override IEnumerable<Task> GetScheduledTasks()
{
ThrowIfDisposed();
return _tasksCollection.ToList();
}
protected override void QueueTask(Task task)
{
ThrowIfDisposed();
_tasksCollection.Add(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// 禁止内联执行以强制所有任务在专用线程执行
return false;
}
#endregion
#region
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
lock (_disposeLock)
{
if (_isDisposed) return;
if (disposing)
{
// 停止接收新任务
_tasksCollection.CompleteAdding();
// 等待所有工作线程退出
foreach (var thread in _workerThreads)
{
if (thread.IsAlive)
{
thread.Join(TimeSpan.FromSeconds(5));
}
}
// 释放资源
_tasksCollection.Dispose();
}
_isDisposed = true;
}
}
private void ThrowIfDisposed()
{
if (_isDisposed)
{
throw new ObjectDisposedException(GetType().Name);
}
}
#endregion
#region
/// <summary>
/// 任务执行异常时触发
/// </summary>
public event Action<Exception>? ExceptionEvent;
private void OnExceptionOccurred(Exception ex)
{
ExceptionEvent?.Invoke(ex);
}
#endregion
}
}

View File

@ -33,11 +33,11 @@ namespace JiShe.CollectBus.Kafka
var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
foreach (var item in topics)
{
kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
}
}
/// <summary>
@ -102,7 +102,26 @@ namespace JiShe.CollectBus.Kafka
//}
}
logger.LogWarning($"kafka订阅主题:{_topicSubscribeCount}数,共启动:{_threadCount}线程");
var kafkaTaskScheduler = provider.GetRequiredService<KafkaTaskScheduler>();
kafkaTaskScheduler.ExceptionEvent += (ex) =>
{
logger.LogError(ex, "Kafka任务调度异常");
};
//logger.LogWarning($"kafka订阅工作线程数{kafkaTaskScheduler.WorkerThreads},队列任务:{kafkaTaskScheduler.QueuedTasks}数");
//
// 订阅调度监控测试可打开
//_ = Task.Factory.StartNew(async () =>
// {
// while (true)
// {
// logger.LogWarning($"kafka订阅工作线程数{kafkaTaskScheduler.WorkerThreads},队列任务:{kafkaTaskScheduler.QueuedTasks}数");
// await Task.Delay(TimeSpan.FromSeconds(5));
// }
// });
});
}
/// <summary>
@ -123,20 +142,35 @@ namespace JiShe.CollectBus.Kafka
.ToList();
if (subscribeTypes.Count == 0) return;
foreach (var subscribeType in subscribeTypes)
Parallel.ForEach(subscribeTypes, subscribeType =>
{
var subscribes = provider.GetServices(subscribeType).ToList();
subscribes.ForEach(subscribe =>
Parallel.ForEach(subscribes, subscribe =>
{
if (subscribe != null)
{
Tuple<int, int> tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
threadCount += tuple.Item1;
topicCount += tuple.Item2;
//threadCount += tuple.Item1;
//topicCount += tuple.Item2;
}
});
}
});
//foreach (var subscribeType in subscribeTypes)
//{
// var subscribes = provider.GetServices(subscribeType).ToList();
// subscribes.ForEach(subscribe =>
// {
// if (subscribe != null)
// {
// Tuple<int, int> tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
// threadCount += tuple.Item1;
// topicCount += tuple.Item2;
// }
// });
//}
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
});
}
@ -156,13 +190,10 @@ namespace JiShe.CollectBus.Kafka
Parallel.ForEach(subscribedMethods, sub =>
{
Interlocked.Increment(ref _topicSubscribeCount);
int partitionCount = sub.Attribute!.TaskCount == -1 ? 3 : sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions;
var adminClientService = provider.GetRequiredService<IAdminClientService>();
int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic);
//int partitionCount = sub.Attribute!.TaskCount == -1 ? topicCount : sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions;
// 可以根据配置文件TaskThreadCount来配置线程数
int partitionCount = sub.Attribute!.TaskCount == -1 ? (kafkaOptionConfig.TaskThreadCount==-1? topicCount: kafkaOptionConfig.TaskThreadCount) : sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions;
partitionCount = partitionCount > topicCount ? topicCount : partitionCount;
//partitionCount = sub.Attribute!.TaskCount == -1 ? adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
if (partitionCount <= 0)
@ -179,7 +210,7 @@ namespace JiShe.CollectBus.Kafka
//foreach (var sub in subscribedMethods)
//{
// //int partitionCount = sub.Attribute!.TaskCount==-1?3: sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions;
//// //int partitionCount = sub.Attribute!.TaskCount==-1?3: sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions;
// var adminClientService = provider.GetRequiredService<IAdminClientService>();
// int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic);

View File

@ -38,16 +38,19 @@ namespace JiShe.CollectBus.Protocol.T1882018.SendData
#region
/// <summary>
/// 读取计量数据
/// 读取计量数据CTR0_控制码(01H/09H)_DI1_DI0_SER
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public static Telemetry1882018PacketResponse CTR_01_Send(Telemetry1882018PacketRequest request)
public static Telemetry1882018PacketResponse CTR0_01_Send(Telemetry1882018PacketRequest request)
{
var itemCodeArr = request.ItemCode.Split('_');
var c_data = itemCodeArr[0];//01
var d_data = itemCodeArr[2];//91 或者 90
var dataUnit = new List<string>() { "1F", d_data, "00" };
var ctr = itemCodeArr[0];//CTR0
var c_data = itemCodeArr[1];//01
var DI1 = itemCodeArr[2];//91 或者 90
var DI0 = itemCodeArr[3];//1F
var SER = itemCodeArr[4];//00
var dataUnit = new List<string>() { DI1, DI0, SER };
var dataList = Build188SendData.Build188SendCommand(request.MeterAddress, c_data, dataUnit);
return new Telemetry1882018PacketResponse() { Data = dataList };
@ -56,16 +59,19 @@ namespace JiShe.CollectBus.Protocol.T1882018.SendData
#region
/// <summary>
/// 阀控
/// 写数据CTR3_控制码(04H/0CH)_DI1_DI0_SER
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public static Telemetry1882018PacketResponse CTR_04_Send(Telemetry1882018PacketRequest request)
public static Telemetry1882018PacketResponse CTR3_04_Send(Telemetry1882018PacketRequest request)
{
var itemCodeArr = request.ItemCode.Split('_');
var c_data = itemCodeArr[0];//01
var d_data = itemCodeArr[2];//55 或者 99
var dataUnit = new List<string>() { "A0", "17", "00", d_data };
var ctr = itemCodeArr[0];//CTR3
var c_data = itemCodeArr[1];//04
var DI1 = itemCodeArr[2];//A0
var DI0 = itemCodeArr[3];//17
var SER = itemCodeArr[4];//55 或者 99
var dataUnit = new List<string>() { DI1, DI0, SER };
var dataList = Build188SendData.Build188SendCommand(request.MeterAddress, c_data, dataUnit);
return new Telemetry1882018PacketResponse() { Data = dataList };

View File

@ -31,7 +31,7 @@ namespace JiShe.CollectBus.Protocol.T1882018
T188ControlHandlers = Telemetry1882018PacketBuilder.T1882018ControlHandlers;
}
public sealed override ProtocolInfo Info => new(nameof(T1882018ProtocolPlugin), "376.1/188-2018", "TCP", "376.1/188-2018协议", "HJ-LXS-15 DN15");
public sealed override ProtocolInfo Info => new(nameof(T1882018ProtocolPlugin), "376.1/188-2018", "TCP", "376.1/188-2018协议", "云集");
public override async Task<T> AnalyzeAsync<T>(ITcpSessionClient client, string messageReceived, Action<T>? sendAction = null)
{
@ -93,9 +93,9 @@ namespace JiShe.CollectBus.Protocol.T1882018
//数据转发场景 10H_F1
if (request.ItemCode == T37612012PacketItemCodeConst.AFN10HFN01H && request.SubProtocolRequest != null && string.IsNullOrWhiteSpace(request.SubProtocolRequest.ItemCode) == false)
{
//var subItemCodeArr = request.SubProtocolRequest.ItemCode.Split("_");
var subItemCodeArr = request.SubProtocolRequest.ItemCode.Split("_");
var t188PacketHandlerName = $"{T1882018PacketItemCodeConst.BasicT1882018}_{request.SubProtocolRequest.ItemCode}_Send";
var t188PacketHandlerName = $"{subItemCodeArr[0]}_{subItemCodeArr[1]}_Send";
Telemetry1882018PacketResponse t645PacketResponse = null;
if (T188ControlHandlers != null && T188ControlHandlers.TryGetValue(t188PacketHandlerName
@ -124,6 +124,7 @@ namespace JiShe.CollectBus.Protocol.T1882018
FocusAddress = request.FocusAddress,
Fn = fn,
Pn = request.Pn,
SubRequest = request.SubProtocolRequest,
DataUnit = dataUnit,
});
}

View File

@ -101,7 +101,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H
meter.DataValue = value;
}
meter.ItemType = "10_97";
meter.ItemType = T37612012PacketItemCodeConst.AFN10HFN97H;
meter.ValidData = data[2].Equals("91") || data[2].Equals("B1");
meter.FiledDesc = "电网频率";//"电网频率";
meter.FiledName = meter.ItemType.GetDataFieldByGatherDataType() ?? string.Empty;

View File

@ -1,4 +1,5 @@
using FreeRedis;
using Confluent.Kafka;
using FreeRedis;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Encrypt;
@ -29,6 +30,7 @@ using Volo.Abp.DependencyInjection;
using Volo.Abp.Guids;
using YamlDotNet.Core.Tokens;
using static FreeSql.Internal.GlobalFilter;
using static IClientRPCService;
using static JiShe.CollectBus.Common.Consts.T37612012PacketItemCodeConst;
namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
@ -172,6 +174,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
{
// 新建
var currentTime = DateTime.Now;
//特殊编码映射
var itemCodeInfo = T37612012PacketItemCodeConst.MappingItemCodeTo645SubCodeRelationship(data.ItemType);
taskData = new MeterReadingTelemetryPacketInfo()
{
SystemName = _applicationOptions.SystemType,
@ -190,7 +194,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
Fn = analysisBaseDto.Fn,
Seq = analysisBaseDto.PSEQ,
MSA = analysisBaseDto.MSA,
ItemCode = data.ItemType,
ItemCode = itemCodeInfo.Item1,
SubItemCode = itemCodeInfo.Item2,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
@ -208,6 +213,31 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
if (!string.IsNullOrWhiteSpace(data.FiledName))
{
await _dbProvider.GetSessionPool(false).InsertAsync(meter);
// 增加标识字段
var meterIsSync = new TreeModelSingleMeasuringEntity<bool>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}",
ProjectId = $"{data.ProjectId}",
DataType = analysisBaseDto.DataType,
Timestamps = meter.Timestamps,
SingleMeasuring = (IotDbFieldConst.IsSync, false)
};
await _dbProvider.GetSessionPool(false).InsertAsync(meterIsSync);
// 数据库业务ID
var meterIsDatabaseBusiID = new TreeModelSingleMeasuringEntity<int>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}",
ProjectId = $"{data.ProjectId}",
DataType = analysisBaseDto.DataType,
Timestamps = meter.Timestamps,
SingleMeasuring = (IotDbFieldConst.DatabaseBusiID, data.DatabaseBusiID)
};
await _dbProvider.GetSessionPool(false).InsertAsync(meterIsDatabaseBusiID);
}
return await Task.FromResult(true);
}
@ -225,6 +255,9 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
var data = analysisBaseDto.Data!;
List<MeterReadingTelemetryPacketInfo> meterReadingTelemetryPacketInfos = new List<MeterReadingTelemetryPacketInfo>();
List<TreeModelSingleMeasuringEntity<T>> treeModelSingleMeasuringEntities = new List<TreeModelSingleMeasuringEntity<T>>();
List<TreeModelSingleMeasuringEntity<bool>> meterIsSyncs = new List<TreeModelSingleMeasuringEntity<bool>>();
List<TreeModelSingleMeasuringEntity<int>> meterDataBaseIDs = new List<TreeModelSingleMeasuringEntity<int>>();
foreach (var item in data)
{
if(!item.TimeSpan.HasValue)
@ -278,6 +311,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
{
// 新建
var currentTime = DateTime.Now;
//特殊编码映射
var itemCodeInfo = T37612012PacketItemCodeConst.MappingItemCodeTo645SubCodeRelationship(item.ItemType);
taskData = new MeterReadingTelemetryPacketInfo()
{
SystemName = _applicationOptions.SystemType,
@ -296,7 +331,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
Fn = analysisBaseDto.Fn,
Seq = analysisBaseDto.PSEQ,
MSA = analysisBaseDto.MSA,
ItemCode = item.ItemType,
ItemCode = itemCodeInfo.Item1,
SubItemCode = itemCodeInfo.Item2,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
@ -314,6 +350,35 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
if (!string.IsNullOrWhiteSpace(item.FiledName) && item.ProjectId>0)
{
treeModelSingleMeasuringEntities.Add(meter);
// 增加标识字段
var meterIsSync = new TreeModelSingleMeasuringEntity<bool>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{item.DeviceId}",
DeviceType = $"{item.DeviceType}",
ProjectId = $"{item.ProjectId}",
DataType = analysisBaseDto.DataType,
Timestamps = meter.Timestamps,
SingleMeasuring = (IotDbFieldConst.IsSync, false)
};
if(!meterIsSyncs.Any(a=> a.DataType == meterIsSync.DataType && a.ProjectId == meterIsSync.ProjectId && a.SystemName== meterIsSync.SystemName && a.DeviceId== meterIsSync.DeviceId && a.Timestamps== meterIsSync.Timestamps))
meterIsSyncs.Add(meterIsSync);
// 数据库业务ID
var meterIsDatabaseBusiID = new TreeModelSingleMeasuringEntity<int>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{item.DeviceId}",
DeviceType = $"{item.DeviceType}",
ProjectId = $"{item.ProjectId}",
DataType = analysisBaseDto.DataType,
Timestamps = meter.Timestamps,
SingleMeasuring = (IotDbFieldConst.DatabaseBusiID, item.DatabaseBusiID)
};
if (!meterDataBaseIDs.Any(a => a.DataType == meterIsSync.DataType && a.ProjectId == meterIsSync.ProjectId && a.SystemName == meterIsSync.SystemName && a.DeviceId == meterIsSync.DeviceId && a.Timestamps == meterIsSync.Timestamps))
meterDataBaseIDs.Add(meterIsDatabaseBusiID);
}
}
// 批量保存数据
@ -321,6 +386,11 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
if (treeModelSingleMeasuringEntities.Count > 0)
{
await _dbProvider.GetSessionPool(false).BatchInsertAsync(treeModelSingleMeasuringEntities);
// 报存标识字段
await _dbProvider.GetSessionPool(false).BatchInsertAsync(meterIsSyncs);
await _dbProvider.GetSessionPool(false).BatchInsertAsync(meterDataBaseIDs);
}
return await Task.FromResult(true);
}
@ -365,7 +435,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
ProjectId = $"{data.ProjectId}",
DataType = IOTDBDataTypeConst.Status,
Timestamps = timestamps,
SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
SingleMeasuring = (IotDbFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
};
await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData);
@ -378,7 +448,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
ProjectId = $"{data.ProjectId}",
Timestamps = timestamps,
DataType = IOTDBDataTypeConst.Status,
SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now)
SingleMeasuring = (IotDbFieldConst.RecordingTime, data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now)
};
await _dbProvider.GetSessionPool(false).InsertAsync(treeRecordingTimeData);
}
@ -386,6 +456,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
// 新建
string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
var currentTime = DateTime.Now;
//特殊编码映射
var itemCodeInfo = T37612012PacketItemCodeConst.MappingItemCodeTo645SubCodeRelationship(data.ItemType);
var taskData = new MeterReadingTelemetryPacketInfo()
{
SystemName = _applicationOptions.SystemType,
@ -404,7 +476,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
Fn = analysisBaseDto.Fn,
Seq = analysisBaseDto.PSEQ,
MSA = analysisBaseDto.MSA,
ItemCode = data.ItemType,
ItemCode = itemCodeInfo.Item1,
SubItemCode = itemCodeInfo.Item2,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
@ -463,7 +536,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
ProjectId = $"{item.ProjectId}",
DataType = IOTDBDataTypeConst.Status,
Timestamps = timestamps,
SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
SingleMeasuring = (IotDbFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
};
await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData);
@ -476,7 +549,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
ProjectId = $"{item.ProjectId}",
DataType = IOTDBDataTypeConst.Status,
Timestamps = timestamps,
SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, item.TimeSpan.HasValue ? item.TimeSpan.Value : DateTime.Now)
SingleMeasuring = (IotDbFieldConst.RecordingTime, item.TimeSpan.HasValue ? item.TimeSpan.Value : DateTime.Now)
};
await _dbProvider.GetSessionPool(false).InsertAsync(treeRecordingTimeData);
@ -485,6 +558,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
// 新建
string taskMark = CommonHelper.GetTaskMark(analysisBaseDto.AFN, analysisBaseDto.Fn, analysisBaseDto.Pn, analysisBaseDto.MSA, analysisBaseDto.PSEQ);
var currentTime = DateTime.Now;
//特殊编码映射
var itemCodeInfo = T37612012PacketItemCodeConst.MappingItemCodeTo645SubCodeRelationship(item.ItemType);
var taskData = new MeterReadingTelemetryPacketInfo()
{
SystemName = _applicationOptions.SystemType,
@ -503,7 +578,8 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
Fn = analysisBaseDto.Fn,
Seq = analysisBaseDto.PSEQ,
MSA = analysisBaseDto.MSA,
ItemCode = item.ItemType,
ItemCode = itemCodeInfo.Item1,
SubItemCode = itemCodeInfo.Item2,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,

View File

@ -1,4 +1,5 @@
using System.Reflection;
using System.Linq;
using System.Reflection;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Extensions;
@ -100,7 +101,7 @@ namespace JiShe.CollectBus.Protocol.T6452007.SendData
}
#endregion
#region
#region C控制码_DI3
/// <summary>
/// 变量数据标识编码处理
@ -109,7 +110,7 @@ namespace JiShe.CollectBus.Protocol.T6452007.SendData
/// <returns></returns>
public static Telemetry6452007PacketResponse C11_02_Send(Telemetry6452007PacketRequest request)
{
var itemCodeArr = request.ItemCode.Split('_');//11_02_80_00_02
var itemCodeArr = request.ItemCode.Split('_');//11_02_80_00_02控制码_DI3_DI2_DI1_DI0
var c_data = itemCodeArr[0];
var DI3 = itemCodeArr[1];
var DI2 = itemCodeArr[2];
@ -118,6 +119,8 @@ namespace JiShe.CollectBus.Protocol.T6452007.SendData
var dataUnit = new List<string>() { DI3, DI2, DI1, DI0 };
dataUnit.Reverse();
var dataList = Build645SendData.Build645SendCommand(request.MeterAddress, c_data, dataUnit);
return new Telemetry6452007PacketResponse() { Data = dataList };
}

View File

@ -36,38 +36,6 @@ namespace JiShe.CollectBus.Protocol.T6452007
public override async Task<T> AnalyzeAsync<T>(ITcpSessionClient client, string messageReceived, Action<T>? sendAction = null)
{
//TODO:645解析报文
//TB3761? tB3761 = Analysis3761(messageReceived);
//if (tB3761 != null)
//{
// if (tB3761.AFN_FC?.AFN == (int)AFN.链路接口检测)
// {
// if (tB3761.A == null || tB3761.A.Code.IsNullOrWhiteSpace() || tB3761.A.A3?.D1_D7 == null || tB3761.SEQ?.PSEQ == null)
// {
// _logger.LogError($"解析AFN.链路接口检测报文失败,报文:{messageReceived},TB3761:{tB3761.Serialize()}");
// }
// else
// {
// if (tB3761.DT?.Fn == (int)FN.登录)
// {
// // 登录回复
// if (tB3761.SEQ.CON == (int)CON.需要对该帧进行确认)
// await LoginAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ);
// }
// else if (tB3761.DT?.Fn == (int)FN.心跳)
// {
// // 心跳回复
// //心跳帧有两种情况:
// //1. 集中器先有登录帧,再有心跳帧
// //2. 集中器没有登录帧,只有心跳帧
// await HeartbeatAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ);
// }
// }
// }
// await OnTcpNormalReceived(client, tB3761);
//}
//return (tB3761 as T)!;
return null;
}

View File

@ -17,15 +17,13 @@ namespace JiShe.CollectBus.Application.Contracts
/// 单个添加数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="redisDeviceInfoHashCacheKey">hash缓存Key</param>
/// <param name="data">待缓存数据</param>
/// <returns></returns>
Task InsertDataAsync<T>(
string redisHashCacheKey,
string redisSetIndexCacheKey,
string redisZSetScoresIndexCacheKey,
string redisDeviceInfoHashCacheKey,
T data) where T : DeviceCacheBasicModel;
@ -47,15 +45,13 @@ namespace JiShe.CollectBus.Application.Contracts
/// 删除缓存信息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="redisDeviceInfoHashCacheKey">hash缓存Key</param>
/// <param name="data">已缓存数据</param>
/// <returns></returns>
Task RemoveCacheDataAsync<T>(
string redisHashCacheKey,
string redisSetIndexCacheKey,
string redisZSetScoresIndexCacheKey,
string redisDeviceInfoHashCacheKey,
T data) where T : DeviceCacheBasicModel;
/// <summary>

View File

@ -94,6 +94,7 @@ public class CollectBusApplicationModule : AbpModule
//默认初始化表计信息
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
await dbContext.InitAmmeterCacheData();
await dbContext.InitAmmeterCacheData("V4-Gather-8890");
await dbContext.InitWatermeterCacheData("V4-Gather-8890");
}
}

View File

@ -17,6 +17,7 @@ using static FreeSql.Internal.GlobalFilter;
using static System.Runtime.InteropServices.JavaScript.JSType;
using static Volo.Abp.UI.Navigation.DefaultMenuNames.Application;
using JiShe.CollectBus.IotSystems.Ammeters;
using System.IO.Pipelines;
namespace JiShe.CollectBus.RedisDataCache
{
@ -47,21 +48,18 @@ namespace JiShe.CollectBus.RedisDataCache
/// 单个添加数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="redisDeviceInfoHashCacheKey">hash缓存Key</param>
/// <param name="data">待缓存数据</param>
/// <returns></returns>
public async Task InsertDataAsync<T>(
string redisHashCacheKey,
string redisSetIndexCacheKey,
string redisZSetScoresIndexCacheKey,
string redisDeviceInfoHashCacheKey,
T data) where T : DeviceCacheBasicModel
{
// 参数校验增强
if (data == null || string.IsNullOrWhiteSpace(redisHashCacheKey)
|| string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
|| string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
if (data == null || string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
|| string.IsNullOrWhiteSpace(redisDeviceInfoHashCacheKey))
{
_logger.LogError($"{nameof(InsertDataAsync)} 参数异常,-101");
return;
@ -70,14 +68,27 @@ namespace JiShe.CollectBus.RedisDataCache
// 使用事务保证原子性
using (var trans = Instance.Multi())
{
// 主数据存储Hash
trans.HSet(redisHashCacheKey, data.MemberId, data.Serialize());
// Set索引缓存
trans.SAdd(redisSetIndexCacheKey, $"{data.TimeDensity.ToString().PadLeft(2, '0')}:{data.FocusAddress}");
// 集中器号分组索引Set缓存
trans.SAdd(redisSetIndexCacheKey, data.MemberId);
//检查HSet是否存在对应的信息如果存在需要进一步检查value是否已经存在如果存在则更新不存在则添加
var oldValue = Instance.HGet<List<T>>(redisDeviceInfoHashCacheKey, data.FocusAddress);
if (oldValue == null || oldValue.Count <= 0)//直接添加
{
//设备信息缓存
trans.HSet(redisDeviceInfoHashCacheKey, data.FocusAddress, data);
}
else
{
// 移除缓存中同类型旧数据
oldValue.RemoveAll(device => device.MeterType == data.MeterType);
// 集中器与表计信息排序索引ZSET缓存Key
trans.ZAdd(redisZSetScoresIndexCacheKey, data.ScoreValue, data.MemberId);
//添加新数据
oldValue.Add(data);
//设备信息缓存
trans.HSet(redisDeviceInfoHashCacheKey, data.FocusAddress, oldValue);
}
var results = trans.Exec();
@ -129,9 +140,24 @@ namespace JiShe.CollectBus.RedisDataCache
// Set索引缓存
pipe.SAdd(redisSetIndexCacheKey, $"{item.Value.First().TimeDensity.ToString().PadLeft(2, '0')}:{item.Value.First().FocusAddress}");
//检查HSet是否存在对应的信息如果存在需要进一步检查value是否已经存在如果存在则更新不存在则添加
var oldValue = Instance.HGet<List<T>>(redisDeviceInfoHashCacheKey, item.Key);
if (oldValue == null || oldValue.Count <= 0)//直接添加
{
//设备信息缓存
pipe.HSet(redisDeviceInfoHashCacheKey, item.Key, item.Value.Serialize());
pipe.HSet(redisDeviceInfoHashCacheKey, item.Key, item.Value);
}
else
{
// 移除缓存中同类型旧数据
oldValue.RemoveAll(device => device.MeterType == item.Value[0].MeterType);
//添加新数据
oldValue.AddRange(item.Value);
//设备信息缓存
pipe.HSet(redisDeviceInfoHashCacheKey, item.Key, oldValue);
}
}
pipe.EndPipe();
}
@ -146,21 +172,18 @@ namespace JiShe.CollectBus.RedisDataCache
/// 删除缓存信息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="redisDeviceInfoHashCacheKey">hash缓存Key</param>
/// <param name="data">已缓存数据</param>
/// <returns></returns>
public async Task RemoveCacheDataAsync<T>(
string redisHashCacheKey,
string redisSetIndexCacheKey,
string redisZSetScoresIndexCacheKey,
string redisDeviceInfoHashCacheKey,
T data) where T : DeviceCacheBasicModel
{
if (data == null
|| string.IsNullOrWhiteSpace(redisHashCacheKey)
|| string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
|| string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
|| string.IsNullOrWhiteSpace(redisDeviceInfoHashCacheKey)
|| string.IsNullOrWhiteSpace(redisSetIndexCacheKey) )
{
_logger.LogError($"{nameof(RemoveCacheDataAsync)} 参数异常,-101");
return;
@ -169,31 +192,29 @@ namespace JiShe.CollectBus.RedisDataCache
const string luaScript = @"
local hashCacheKey = KEYS[1]
local setIndexCacheKey = KEYS[2]
local zsetScoresIndexCacheKey = KEYS[3]
local member = ARGV[1]
local focusAddress = ARGV[1]
local scoreValue = ARGV[2]
local deleted = 0
if redis.call('HDEL', hashCacheKey, member) > 0 then
if redis.call('HDEL', hashCacheKey, focusAddress) > 0 then
deleted = 1
end
redis.call('SREM', setIndexCacheKey, member)
redis.call('ZREM', zsetScoresIndexCacheKey, member)
redis.call('SREM', setIndexCacheKey, scoreValue)
return deleted
";
var keys = new[]
{
redisHashCacheKey,
redisSetIndexCacheKey,
redisZSetScoresIndexCacheKey
redisDeviceInfoHashCacheKey,
redisSetIndexCacheKey
};
var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberId });
var result = await Instance.EvalAsync(luaScript, keys, new object[] { data.FocusAddress , data.ScoreValue});
if ((int)result == 0)
{
_logger.LogError($"{nameof(RemoveCacheDataAsync)} 删除指定Key{redisHashCacheKey}的{data.MemberId}数据失败,-102");
_logger.LogError($"{nameof(RemoveCacheDataAsync)} 删除指定Key{redisDeviceInfoHashCacheKey}的{data.MemberId}数据失败,-102");
}
}

View File

@ -30,6 +30,7 @@ using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading.Tasks;
using TouchSocket.Core;
using TouchSocket.Sockets;
@ -585,5 +586,35 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
await Task.CompletedTask;
}
/// <summary>
/// IoTDB空表查询情况
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task TestIoTDBEmptyTableQuery()
{
var meter = new MeterReadingTelemetryPacketInfo() { DevicePath = "MeterReadingTelemetryPacketInfo", DeviceId = "1111" };
QueryCondition conditions = new QueryCondition()
{
Field = "DeviceId",
Operator = "=",
Value = meter.DeviceId
};
var query = new IoTDBQueryOptions()
{
TableNameOrTreePath = meter.DevicePath,
PageIndex = 1,
PageSize = 1,
Conditions = new List<QueryCondition>() { conditions },
};
await _iotDBProvider.GetSessionPool(true).InitTableSessionModelAsync();
var pageResult = await _iotDBProvider.GetSessionPool(true).QueryAsync<MeterReadingTelemetryPacketInfo>(query);
await Task.CompletedTask;
}
}

View File

@ -8,6 +8,7 @@ using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.DataChannels;
using JiShe.CollectBus.EnergySystems.Entities;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Model;
@ -28,6 +29,9 @@ using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Volo.Abp.Guids;
using static FreeSql.Internal.GlobalFilter;
using static Microsoft.AspNetCore.Razor.Language.TagHelperMetadata;
using static Thrift.Protocol.Utilities.TJSONProtocolConstants;
namespace JiShe.CollectBus.ScheduledMeterReading
{
@ -129,125 +133,32 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var tempArryay = item.Split(":");
string meteryType = tempArryay[4];//表计类别
int timeDensity = Convert.ToInt32(tempArryay[5]);//采集频率
if (timeDensity > 15)
{
timeDensity = 15;
}
//电表定时广播校时,一天一次。
string currentTimeStr = $"{currentTime:HH:mm:00}";
//if (string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))//自动校时
//{
// //_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
// //return;
// _ = CreateMeterPublishTask<DeviceInfo>(
// timeDensity: timeDensity,
// nextTaskTime: currentTime,
// meterType: MeterTypeEnum.Ammeter,
// taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
// {
// var tempTask = await AmmeterScheduledAutomaticVerificationTime(timeDensity, data, groupIndex, timestamps);
// if (tempTask == null || tempTask.Count <= 0)
// {
// _logger.LogWarning($"电表自动校时 {data.Name} 任务数据构建失败:{data.Serialize()}");
if (string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))//自动校时
{
//_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
//return;
// }
// _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
// });
//}
//else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取
//{
// _ = CreateMeterPublishTask<DeviceInfo>(
// timeDensity: timeDensity,
// nextTaskTime: currentTime,
// meterType: MeterTypeEnum.Ammeter,
// taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
// {
// var tempTask = await ConcentratorScheduledAutomaticGetTerminalVersion(timeDensity, data, groupIndex, timestamps);
// if (tempTask == null || tempTask.Count <= 0)
// {
// _logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
// return;
// }
// _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
// });
//}
//else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取
//{
// _ = CreateMeterPublishTask<DeviceInfo>(
// timeDensity: timeDensity,
// nextTaskTime: currentTime,
// meterType: MeterTypeEnum.Ammeter,
// taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
// {
// var tempTask = await ConcentratorScheduledAutomaticGetTelematicsModule(timeDensity, data, groupIndex, timestamps);
// if (tempTask == null || tempTask.Count <= 0)
// {
// _logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
// return;
// }
// _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
// });
//}
//else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//月冻结
//{
// _ = CreateMeterPublishTask<DeviceInfo>(
// timeDensity: timeDensity,
// nextTaskTime: currentTime,
// meterType: MeterTypeEnum.Ammeter,
// taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
// {
// var tempTask = await AmmeterScheduledGetAutomaticDayFreezeData(timeDensity, data, groupIndex, timestamps);
// if (tempTask == null || tempTask.Count <= 0)
// {
// _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
// return;
// }
// _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
// });
//}
//else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结
//{
// _ = CreateMeterPublishTask<DeviceInfo>(
// timeDensity: timeDensity,
// nextTaskTime: currentTime,
// meterType: MeterTypeEnum.Ammeter,
// taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
// {
// var tempTask = await AmmeterScheduledGetAutomaticMonthFreezeData(timeDensity, data, groupIndex, timestamps);
// if (tempTask == null || tempTask.Count <= 0)
// {
// _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
// return;
// }
// _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
// });
//}
//else
//{
// _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 不是自动校时、采集终端信息等时间,继续处理其他");
//}
_ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
var tempTask = await AmmeterScheduledAutomaticVerificationTime(timeDensity, data, groupIndex, timestamps);
//_ = CreateMeterPublishTask<DeviceInfo>(
// timeDensity: timeDensity,
// nextTaskTime: currentTime,
// meterType: MeterTypeEnum.Ammeter,
// taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
// {
// var tempTask = await AmmeterScheduledAutomaticVerificationTime(timeDensity, data, groupIndex, timestamps);
// if (tempTask == null || tempTask.Count <= 0)
// {
// _logger.LogWarning($"电表自动校时 {data.Name} 任务数据构建失败:{data.Serialize()}");
// return;
// }
// _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
// });
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"电表自动校时 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取
{
_ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTime,
@ -257,58 +168,67 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var tempTask = await ConcentratorScheduledAutomaticGetTerminalVersion(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
_logger.LogWarning($"集中器终端版本信息 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
//_ = CreateMeterPublishTask<DeviceInfo>(
// timeDensity: timeDensity,
// nextTaskTime: currentTime,
// meterType: MeterTypeEnum.Ammeter,
// taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
// {
// var tempTask = await ConcentratorScheduledAutomaticGetTelematicsModule(timeDensity, data, groupIndex, timestamps);
// if (tempTask == null || tempTask.Count <= 0)
// {
// _logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
// return;
// }
// _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
// });
//_ = CreateMeterPublishTask<DeviceInfo>(
// timeDensity: timeDensity,
// nextTaskTime: currentTime,
// meterType: MeterTypeEnum.Ammeter,
// taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
// {
// var tempTask = await AmmeterScheduledGetAutomaticDayFreezeData(timeDensity, data, groupIndex, timestamps);
// if (tempTask == null || tempTask.Count <= 0)
// {
// _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
// return;
// }
// _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
// });
//_ = CreateMeterPublishTask<DeviceInfo>(
// timeDensity: timeDensity,
// nextTaskTime: currentTime,
// meterType: MeterTypeEnum.Ammeter,
// taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
// {
// var tempTask = await AmmeterScheduledGetAutomaticMonthFreezeData(timeDensity, data, groupIndex, timestamps);
// if (tempTask == null || tempTask.Count <= 0)
// {
// _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
// return;
// }
// _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
// });
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取
{
_ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
var tempTask = await ConcentratorScheduledAutomaticGetTelematicsModule(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"集中器SIM卡读取 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticMonthFreezeTime, StringComparison.CurrentCultureIgnoreCase))//月冻结
{
_ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
var tempTask = await AmmeterScheduledGetAutomaticMonthFreezeData(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"电表月冻结 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结
{
_ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
var tempTask = await AmmeterScheduledGetAutomaticDayFreezeData(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"电表日冻结 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else
{
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 不是自动校时、采集终端信息等时间,继续处理其他");
}
//检查任务时间节点由于定时任务10秒钟运行一次需要判定当前时间是否在任务时间节点内不在则跳过
var currentTaskTime = tasksToBeIssueModel.LastTaskTime.CalculateNextCollectionTime(timeDensity);//程序启动缓存电表的时候NextTaskTime需要格式化到下一个采集点时间。
@ -320,10 +240,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var meterTypes = EnumExtensions.ToEnumDictionary<MeterTypeEnum>();
//tasksToBeIssueModel.NextTaskTime;
if (meteryType == MeterTypeEnum.Ammeter.ToString())
{
//电表最大采集频率为15分钟
if (timeDensity > 15)
{
timeDensity = 15;
}
_ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTaskTime,
@ -333,7 +257,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var tempTask = await AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
//_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempTask));
@ -410,6 +334,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// 创建取消令牌源
//var cts = new CancellationTokenSource();
await _dbProvider.GetSessionPool(true).InitTableSessionModelAsync();
_ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader);
// //此处代码不要删除
@ -497,8 +423,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
deviceIds.Add(ammeter.MeterId.ToString());
//处理ItemCode
if (string.IsNullOrWhiteSpace(ammeter.ItemCodes) && !string.IsNullOrWhiteSpace(ammeter.DataTypes))
if (ammeter.ItemCodes == null && !string.IsNullOrWhiteSpace(ammeter.DataTypes))
{
ammeter.ItemCodes = new List<string>();
var itemArr = ammeter.DataTypes.Split(',').ToList();
#region
@ -511,7 +439,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
if (!excludeItemCode.Contains(gatherItem.ItemCode))
{
itemCodeList.Add(gatherItem.ItemCode);
itemCodeList.Add(gatherItem.ItemCode.Replace("WAVE_109", "10_109"));
}
}
@ -530,15 +458,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
ammeter.ItemCodes = itemCodeList.Serialize();//转换成JSON字符串
if (!string.IsNullOrWhiteSpace(ammeter.ItemCodes))
{
ammeter.ItemCodes = ammeter.ItemCodes.Replace("WAVE_109", "10_109");
}
ammeter.ItemCodes = itemCodeList;
}
ammeter.ItemCodes = "10_97";
//var tempItemCodeList = new List<string>() { "10_97" };
//ammeter.ItemCodes = tempItemCodeList.Serialize();
if (!keyValuePairs.ContainsKey(ammeter.FocusAddress))
{
@ -706,7 +630,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return null;
}
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
if (ammeterInfo.ItemCodes == null || ammeterInfo.ItemCodes.Count <=0)
{
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
return null;
@ -753,7 +677,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return null;
}
List<string> tempCodes = ammeterInfo.ItemCodes.Deserialize<List<string>>()!;
List<string> tempCodes = ammeterInfo.ItemCodes!;
//TODO:自动上报数据只主动采集1类数据。
if (ammeterInfo.AutomaticReport.Equals(1))
@ -805,13 +729,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//var aFN = (AFN)aFNStr.HexToDec();
//var fn = int.Parse(itemCodeArr[1]);
//特殊编码映射
var itemCodeInfo = T37612012PacketItemCodeConst.MappingItemCodeTo645SubCodeRelationship(tempItem);
//TODO:特殊表
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = ammeterInfo.MeteringCode,
Pn = itemCodeInfo.Item1 == T37612012PacketItemCodeConst.AFN10HFN01H ? 0 : ammeterInfo.MeteringCode,
ItemCode = itemCodeInfo.Item1,
DataTimeMark = new Protocol.DataTimeMark()
{
@ -838,8 +763,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
ammeterInfo: ammeterInfo,
timestamps: DateTimeOffset.Now.ToUnixTimeNanoseconds(),
builderResponse: builderResponse,
itemCode: tempItem,
subItemCode: null,
itemCode: itemCodeInfo.Item1,
subItemCode: itemCodeInfo.Item2,
pendingCopyReadTime: timestamps,
creationTime: currentTime,
packetType: (TelemetryPacketTypeEnum)timeDensity,
@ -884,12 +809,16 @@ namespace JiShe.CollectBus.ScheduledMeterReading
try
{
#if DEBUG
#else
//判断是否是自动校时时间
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))
{
_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
return null;
}
#endif
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
@ -961,12 +890,16 @@ namespace JiShe.CollectBus.ScheduledMeterReading
try
{
//判断是否是自动校时时间
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))
#if DEBUG
#else
//判断是否是日冻结抄读时间
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))
{
_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
_logger.LogInformation($"{nameof(AmmeterScheduledGetAutomaticDayFreezeData)} 非电表日冻结抄读时间,暂不处理");
return null;
}
#endif
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
@ -974,7 +907,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表日冻结抄读运行时间{currentTime}没有找到对应的协议组件,-105");
return null;
}
@ -984,7 +917,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = ammeterInfo.MeteringCode,
ItemCode = item
ItemCode = item,
DataTimeMark = new Protocol.DataTimeMark()
{
Density = ammeterInfo.TimeDensity.GetFocusDensity(),//转换成协议的值
Point = 1,
DataTime = currentTime.AddDays(-1),//日冻结抄读时间为昨天
},
});
var meterReadingRecords = CreateAmmeterPacketInfo(
@ -1032,12 +971,26 @@ namespace JiShe.CollectBus.ScheduledMeterReading
try
{
//判断是否是自动校时时间
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))
#if DEBUG
#else
//需要检查是不是每月1号抄读上个月的数据
if (currentTime.Date != currentTime.FirstDayOfMonth().Date)
{
_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
_logger.LogInformation($"{nameof(AmmeterScheduledGetAutomaticMonthFreezeData)} 非月冻结数据抄读时间,暂不处理");
return null;
}
else
{
timestamps = currentTime.LastDayOfPrdviousMonth();
}
//判断是否是月冻结数据抄读
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticMonthFreezeTime, StringComparison.CurrentCultureIgnoreCase))
{
_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 非电表月冻结抄读时间,暂不处理");
return null;
}
#endif
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
@ -1045,17 +998,27 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表月冻结抄读时间{currentTime}没有找到对应的协议组件,-105");
return null;
}
foreach (var item in DayFreezeCodes)
foreach (var item in MonthFreezeCodes)
{
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = ammeterInfo.MeteringCode,
ItemCode = item
ItemCode = item,
DataTimeMark = new Protocol.DataTimeMark()
{
Density = ammeterInfo.TimeDensity.GetFocusDensity(),//转换成协议的值
Point = 1,
#if DEBUG
DataTime = currentTime.AddMonths(-1),//月冻结抄读时间为上个月
#else
DataTime = timestamps,//月冻结抄读时间为上个月
#endif
},
});
var meterReadingRecords = CreateAmmeterPacketInfo(
@ -1190,6 +1153,42 @@ namespace JiShe.CollectBus.ScheduledMeterReading
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据时,采集项类型数据为空");
}
if (meterInfos != null && meterInfos.Count > 0)
{
foreach (var item in meterInfos)
{
if (item.MeterTypeName.Equals("水表") && (item.Protocol.Equals((int)MeterLinkProtocol.CJT_188_2018) || item.Protocol.Equals((int)MeterLinkProtocol.DLT_645_1997) || item.Protocol.Equals((int)MeterLinkProtocol.DLT_645_2007)))
{
if (item.BrandType.Contains("炬华有线"))
{
item.ItemCodes = new List<string>() { T37612012PacketItemCodeConst.AFN0CHFN188H };
}
else
{
item.ItemCodes = new List<string>() { T37612012PacketItemCodeConst.AFN0CHFN129H };
}
}
else if (item.MeterTypeName.Trim().Equals("西恩超声波流量计"))
{
item.ItemCodes = new List<string>() { T37612012PacketItemCodeConst.AFN09HFN01H };
}
else if (item.MeterTypeName.Trim().Equals("江苏华海涡街流量计积算仪"))
{
item.ItemCodes = new List<string>() { T37612012PacketItemCodeConst.AFN09HFN01H };
}
else if (item.MeterTypeName.Trim().Equals("V880BR涡街流量计"))
{
item.ItemCodes = new List<string>() { T37612012PacketItemCodeConst.AFN09HFN01H };
}
else if (item.MeterTypeName.Trim().Equals("拓思特涡街流量计H880BR"))
{
item.ItemCodes = new List<string>() { T37612012PacketItemCodeConst.AFN09HFN01H };
}
}
}
List<string> deviceIds = new List<string>();//用于处理Kafka主题分区数据的分发和处理。
//根据采集频率分组,获得采集频率分组
@ -1317,14 +1316,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
timeDensity = watermeter.TimeDensity;//水表默认为60分钟
typeName = watermeter.LinkType;
if (watermeter.MeterBrand.Contains("泉高阀门") || watermeter.MeterBrand.Equals("LXSY-山水翔"))
if (watermeter.BrandType.Contains("泉高阀门") || watermeter.BrandType.Equals("LXSY-山水翔"))
{
typeName = watermeter.MeterBrand;
typeName = watermeter.BrandType;
}
}
else if (watermeter.MeterType == MeterTypeEnum.WaterMeterFlowmeter)
{
typeName = watermeter.MeterBrand;
typeName = watermeter.BrandType;
}
else
{
@ -1340,25 +1339,40 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(watermeter.BrandType);
if (protocolPlugin == null)
{
//_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
//return;
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 创建水表待发送的任务数据时{currentTime}没有找到对应的协议组件,-101");
return null;
}
string itemCode = T37612012PacketItemCodeConst.AFN10HFN01H;
string subItemCode = T1882018PacketItemCodeConst.CTR0190;
if (watermeter.ItemCodes == null || watermeter.ItemCodes.Count <=0)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 创建水表待发送的任务数据时{watermeter.Name}没有相应的采集项,-102");
return null;
}
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
foreach (var item in watermeter.ItemCodes)
{
var tempRequest = new ProtocolBuildRequest()
{
FocusAddress = watermeter.FocusAddress,
Pn = watermeter.MeteringCode,
ItemCode = itemCode,
SubProtocolRequest = new SubProtocolBuildRequest()
ItemCode = item,
};
if (item == T37612012PacketItemCodeConst.AFN09HFN01H)
{
//var itemCodeInfo = T37612012PacketItemCodeConst.MappingItemCodeTo188SubCodeRelationship(T37612012PacketItemCodeConst.AFN10HFN99H, true);//阀控
tempRequest.SubProtocolRequest = new SubProtocolBuildRequest()
{
MeterAddress = watermeter.MeterAddress,
Password = watermeter.Password,
ItemCode = subItemCode,
ItemCode = T1882018PacketItemCodeConst.CTR01901F00,
MeteringPort = watermeter.MeteringPort,
Baudrate = watermeter.Baudrate,
};
}
});
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(tempRequest);
if (builderResponse == null || builderResponse.Data.Length <= 0)
{
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。");
@ -1367,41 +1381,23 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (builderResponse == null || builderResponse.Data.Length <= 0)
{
_logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的水表{watermeter.Name}采集项{itemCode}未能正确获取报文。");
_logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的水表{watermeter.Name} 水表采抄读采集项{T1882018PacketItemCodeConst.CTR01901F00}未能正确获取报文。");
return null;
}
string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, watermeter.MeteringCode, builderResponse.MSA, builderResponse.Seq);
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
{
SystemName = SystemType,
ProjectId = $"{watermeter.ProjectID}",
DeviceType = $"{MeterTypeEnum.Ammeter}",
DeviceId = $"{watermeter.MeterId}",
Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
DatabaseBusiID = watermeter.DatabaseBusiID,
PacketType = (int)TelemetryPacketTypeEnum.WatermeterAutoReadding,
PendingCopyReadTime = timestamps,
CreationTime = currentTime,
MeterAddress = watermeter.MeterAddress,
AFN = builderResponse.AFn,
Fn = builderResponse.Fn,
Seq = builderResponse.Seq,
MSA = builderResponse.MSA,
ItemCode = itemCode,
SubItemCode = subItemCode,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
Pn = watermeter.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
IsReceived = false,
ScoreValue = $"{watermeter.FocusAddress}.{taskMark}".Md5Fun(),
};
var meterReadingRecords = CreateAmmeterPacketInfo(
ammeterInfo: watermeter,
timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
builderResponse: builderResponse,
itemCode: T37612012PacketItemCodeConst.AFN10HFN01H,
subItemCode: T1882018PacketItemCodeConst.CTR01901F00,
pendingCopyReadTime: currentTime,
creationTime: currentTime,
packetType: TelemetryPacketTypeEnum.WatermeterAutoReadding,
_guidGenerator);
taskList.Add(meterReadingRecords);
}
return taskList;
@ -1504,12 +1500,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading
try
{
#if DEBUG
#else
//判断是否是自动获取版本号时间
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))
{
_logger.LogInformation($"{nameof(ConcentratorScheduledAutomaticGetTelematicsModule)} 自动获取远程通信模块(SIM)版本信息,非自动处理时间");
return null;
}
#endif
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
@ -1776,6 +1775,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
Pn = ammeterInfo.MeteringCode,
IssuedMessageId = guidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
FocusDensity = ammeterInfo.TimeDensity.GetFocusDensity(),
IsReceived = false,
ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
};

View File

@ -96,73 +96,73 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//[Route($"ammeter/list")]
public override async Task<List<DeviceInfo>> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890")
{
//#if DEBUG
// var redisCacheDeviceInfoHashKeyTemp = $"CollectBus:Energy:JiSheCollectBus109:DeviceInfo";
#if DEBUG
var redisCacheDeviceInfoHashKeyTemp = $"CollectBus:Energy:JiSheCollectBus109:DeviceInfo";
// List<DeviceInfo> ammeterInfos = FreeRedisProvider.Instance.Get<List<DeviceInfo>>(redisCacheDeviceInfoHashKeyTemp);//542400504
List<DeviceInfo> ammeterInfos = FreeRedisProvider.Instance.Get<List<DeviceInfo>>(redisCacheDeviceInfoHashKeyTemp);//542400504
// if (ammeterInfos == null || ammeterInfos.Count <= 0)
if (ammeterInfos == null || ammeterInfos.Count <= 0)
{
ammeterInfos = new List<DeviceInfo>();
//ammeterInfos.Add(new DeviceInfo()
//{
// ammeterInfos = new List<DeviceInfo>();
// //ammeterInfos.Add(new DeviceInfo()
// //{
// // Baudrate = 2400,
// // FocusAddress = "442400040",
// // Name = "保利单箱电表1",
// // FocusId = 95780,
// // DatabaseBusiID = 1,
// // MeteringCode = 0,
// // MeterAddress = "442405000040",
// // MeterId = 127035,
// // TypeName = 1,
// // DataTypes = "581,589,592,597,601",
// // TimeDensity = 15,
// // BrandType = "DTS1980",
// // MeterType = MeterTypeEnum.Ammeter,
// // ProjectID = 1,
// // MeteringPort = MeteringPortConst.MeteringPortTwo,
// // Password = "000000",
// //});
// Baudrate = 2400,
// FocusAddress = "442400040",
// Name = "保利单箱电表1",
// FocusId = 95780,
// DatabaseBusiID = 1,
// MeteringCode = 0,
// MeterAddress = "442405000040",
// MeterId = 127035,
// TypeName = 1,
// DataTypes = "581,589,592,597,601",
// TimeDensity = 15,
// BrandType = "DTS1980",
// MeterType = MeterTypeEnum.Ammeter,
// ProjectID = 1,
// MeteringPort = MeteringPortConst.MeteringPortTwo,
// Password = "000000",
//});
// //ammeterInfos.Add(new DeviceInfo()
// //{
// // Baudrate = 2400,
// // FocusAddress = "442400039",
// // Name = "保利单箱电表2",
// // FocusId = 69280,
// // DatabaseBusiID = 1,
// // MeteringCode = 0,
// // MeterAddress = "442405000039",
// // MeterId = 95594,
// // TypeName = 1,
// // DataTypes = "581,589,592,597,601",
// // TimeDensity = 15,
// // BrandType = "DTS1980",
// // MeterType = MeterTypeEnum.Ammeter,
// // ProjectID = 1,
// // MeteringPort = MeteringPortConst.MeteringPortTwo,
// // Password = "000000",
// //});
//ammeterInfos.Add(new DeviceInfo()
//{
// Baudrate = 2400,
// FocusAddress = "442400039",
// Name = "保利单箱电表2",
// FocusId = 69280,
// DatabaseBusiID = 1,
// MeteringCode = 0,
// MeterAddress = "442405000039",
// MeterId = 95594,
// TypeName = 1,
// DataTypes = "581,589,592,597,601",
// TimeDensity = 15,
// BrandType = "DTS1980",
// MeterType = MeterTypeEnum.Ammeter,
// ProjectID = 1,
// MeteringPort = MeteringPortConst.MeteringPortTwo,
// Password = "000000",
//});
// //ammeterInfos.Add(new DeviceInfo()
// //{
// // Baudrate = 2400,
// // FocusAddress = "402440506",
// // Name = "中环半导体9#冷却泵-220KW(三相电表)",
// // FocusId = 106857,
// // DatabaseBusiID = 1,
// // MeteringCode = 0,
// // MeterAddress = "402410040506",
// // MeterId = 139059,
// // TypeName = 3,
// // DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
// // TimeDensity = 15,
// // BrandType = "DTS1980",
// // Password = "000000",
// // ProjectID = 1,
// // MeterType = MeterTypeEnum.Ammeter,
// // MeteringPort = MeteringPortConst.MeteringPortTwo,
// //});
//ammeterInfos.Add(new DeviceInfo()
//{
// Baudrate = 2400,
// FocusAddress = "402440506",
// Name = "中环半导体9#冷却泵-220KW(三相电表)",
// FocusId = 106857,
// DatabaseBusiID = 1,
// MeteringCode = 0,
// MeterAddress = "402410040506",
// MeterId = 139059,
// TypeName = 3,
// DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
// TimeDensity = 15,
// BrandType = "DTS1980",
// Password = "000000",
// ProjectID = 1,
// MeterType = MeterTypeEnum.Ammeter,
// MeteringPort = MeteringPortConst.MeteringPortTwo,
//});
//ammeterInfos.Add(new DeviceInfo()
@ -206,15 +206,34 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// MeteringPort = MeteringPortConst.MeteringPortTwo,
//});
ammeterInfos.Add(new DeviceInfo()
{
Baudrate = 2400,
FocusAddress = "322011149",
Name = "DDS1980-T4(5-60) ML307A 长稳 322011149",
FocusId = 57685,
DatabaseBusiID = 1,
MeteringCode = 0,
MeterAddress = "31240010270",
MeterId = 78973,
TypeName = 3,
DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
TimeDensity = 15,
BrandType = "DTS1980",
Password = "000000",
ProjectID = 1,
MeterType = MeterTypeEnum.Ammeter,
MeteringPort = MeteringPortConst.MeteringPortTwo,
});
// FreeRedisProvider.Instance.Set(redisCacheDeviceInfoHashKeyTemp, ammeterInfos);
// }
FreeRedisProvider.Instance.Set(redisCacheDeviceInfoHashKeyTemp, ammeterInfos);
}
// return ammeterInfos;
//#else
return ammeterInfos;
#else
//#endif
#endif
try
@ -256,7 +275,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
#if DEBUG
//// sql = $@"{sql} and c.Address in('542410000504','442405000040','442405000039','402410040506')";
// sql = $@"{sql} and c.Address in('542410000504','442405000040','442405000039','402410040506')";
sql = $@"{sql} and c.Address in('402410040506')";
#endif
@ -508,7 +527,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = ammeterInfo.MeteringCode,
Pn = 0,//ammeterInfo.MeteringCode,现有协议里面阀控必须传0不能根据档案的MeteringCode值走。
ItemCode = itemCode,
SubProtocolRequest = new SubProtocolBuildRequest()
{
@ -560,6 +579,40 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
try
{
#if DEBUG
var redisCacheDeviceInfoHashKeyTemp = $"CollectBus:Energy:JiSheCollectBus119:DeviceInfo";
List<DeviceInfo> deviceInfos = FreeRedisProvider.Instance.Get<List<DeviceInfo>>(redisCacheDeviceInfoHashKeyTemp);
if (deviceInfos == null || deviceInfos.Count <= 0)
{
deviceInfos = new List<DeviceInfo>();
deviceInfos.Add(new DeviceInfo()
{
Baudrate = 2400,
FocusAddress = "322011149",
Name = "LXSY-25E 保利",
FocusId = 57675,
DatabaseBusiID = 1,
MeteringCode = 0,
MeterAddress = "341587000473",
MeterId = 1025,
TypeName = 1,
TimeDensity = 60,
BrandType = "云集",
MeterType = MeterTypeEnum.WaterMeter,
ProjectID = 1,
MeteringPort = MeteringPortConst.MeteringPortTwo,
Password = "000000",
LinkType = "RS-485",
TimesRate = 1.0000m,
});
FreeRedisProvider.Instance.Set(redisCacheDeviceInfoHashKeyTemp, deviceInfos);
}
return deviceInfos;
#else
string sql = $@"SELECT
A.ID as MeterId,
A.Name,
@ -575,7 +628,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
A.LinkType,
A.HaveValve,
A.MeterType AS MeterTypeName,
A.MeterBrand,
A.MeterBrand AS BrandType,
A.TimesRate,
A.TimeDensity,
A.TripState,
@ -602,6 +655,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
.Ado
.QueryAsync<DeviceInfo>(sql);
#endif
}
catch (Exception)
{

View File

@ -47,13 +47,6 @@ namespace JiShe.CollectBus.IotSystems.Devices
/// </summary>
public string MeterTypeName { get; set; }
/// <summary>
/// 设备品牌;
/// (当 MeterType = 水表, 如 威铭、捷先 等)
/// (当 MeterType = 流量计, 如 西恩超声波流量计、西恩电磁流量计、涡街流量计 等)
/// </summary>
public string MeterBrand { get; set; }
/// <summary>
/// 倍率
/// </summary>
@ -75,7 +68,7 @@ namespace JiShe.CollectBus.IotSystems.Devices
public string AreaCode { get; set; }
/// <summary>
/// 电表类别 1单相、2三相三线、3三相四线,
/// 仅当MeterType为电表时电表类别 1单相、2三相三线、3三相四线,
/// 07协议 开合闸指令(1A开闸断电,1C单相表合闸,1B多相表合闸) 645 2007 表
/// 97协议//true(合闸);false(跳闸) 545 1997 没有单相多相 之分 "true" ? "9966" : "3355"
/// </summary>
@ -120,7 +113,8 @@ namespace JiShe.CollectBus.IotSystems.Devices
/// <summary>
/// 该电表方案下采集项JSON格式["0D_80","0D_80"]
/// </summary>
public string ItemCodes { get; set; }
[Column(IsIgnore = true)]
public List<string> ItemCodes { get; set; }
/// <summary>
/// State表状态:

View File

@ -13,7 +13,8 @@ namespace JiShe.CollectBus.IotSystems.Devices
/// 设备表型数据信息
/// </summary>
[SourceAnalyzers(EntityTypeEnum.TableModel)]
public class DeviceTreeModelDataInfo: IoTEntity
[IgnoreInitTable]
public class DeviceTableModelDataInfo : IoTEntity
{
[FIELDColumn]

View File

@ -12,8 +12,8 @@ namespace JiShe.CollectBus.IotSystems.Devices
/// <summary>
/// 设备树模型数据信息
/// </summary>
[SourceAnalyzers(EntityTypeEnum.TableModel)]
public class DeviceTableModelDataInfo : IoTEntity
[SourceAnalyzers(EntityTypeEnum.TreeModel)]
public class DeviceTreeModelDataInfo : IoTEntity
{
[FIELDColumn]

View File

@ -151,6 +151,12 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
[FIELDColumn]
public string IssuedMessageId { get; set; }
/// <summary>
/// 集中器采集密度
/// </summary>
[FIELDColumn]
public int FocusDensity { get; set; }
/// <summary>
/// 消息上报内容
/// </summary>

View File

@ -14,9 +14,14 @@ namespace JiShe.CollectBus.Common.Consts
#region
/// <summary>
/// 基路径
/// 基路径,表示主站发起读数据
/// </summary>
public const string BasicT1882018 = "CTR";
public const string BasicT1882018Read = "CTR0";
/// <summary>
/// 基路径,表示主站发起读数据
/// </summary>
public const string BasicT1882018Write = "CTR3";
#region
@ -24,12 +29,12 @@ namespace JiShe.CollectBus.Common.Consts
/// <summary>
/// 读取计量数据1
/// </summary>
public const string CTR0190 = $"01_90";
public const string CTR01901F00 = $"{BasicT1882018Read}_01_90_1F_00";
/// <summary>
/// 读取计量数据2
/// </summary>
public const string CTR0191 = $"01_91";
public const string CTR01911F00 = $"{BasicT1882018Read}_01_91_1F_00";
#endregion
@ -38,12 +43,12 @@ namespace JiShe.CollectBus.Common.Consts
/// <summary>
/// 关阀
/// </summary>
public const string CTR30455 = $"_04_55";
public const string CTR304A01755 = $"{BasicT1882018Write}_04_A0_17_55";
/// <summary>
/// 开阀
/// </summary>
public const string CTR30499 = $"_04_99";
public const string CTR304A01799 = $"{BasicT1882018Write}_04_A0_17_99";
#endregion

View File

@ -17,7 +17,7 @@ namespace JiShe.CollectBus.Common.Consts
/// <summary>
/// 透明转发
/// </summary>
public const string AFN10HFN01H = $"10_01";
public const string AFN10HFN01H = $"10_1";
/// <summary>
@ -28,12 +28,27 @@ namespace JiShe.CollectBus.Common.Consts
/// <summary>
/// 读取终端信息
/// </summary>
public const string AFN09HFN01H = $"09_01";
public const string AFN09HFN01H = $"09_1";
/// <summary>
/// 远程通信模块版本信息
/// </summary>
public const string AFN09HFN09H = $"09_09";
public const string AFN09HFN09H = $"09_9";
/// <summary>
/// 水表阀控
/// </summary>
public const string AFN10HFN99H = $"10_99";
/// <summary>
/// 炬华有线水表抄读
/// </summary>
public const string AFN0CHFN188H = $"0C_188";
/// <summary>
/// 标准188协议水表抄读
/// </summary>
public const string AFN0CHFN129H = $"0C_129";
#endregion
@ -170,9 +185,9 @@ namespace JiShe.CollectBus.Common.Consts
/// <summary>
/// 集中器状态字段
/// IotDB存储字段字段
/// </summary>
public class ConcentratorStatusFieldConst
public class IotDbFieldConst
{
/// <summary>
@ -185,13 +200,23 @@ namespace JiShe.CollectBus.Common.Consts
/// </summary>
public const string FrameData = "FrameData";
/// <summary>
/// 是否同步
/// </summary>
public const string IsSync = "IsSync";
/// <summary>
/// 数据库业务ID
/// </summary>
public const string DatabaseBusiID= "DatabaseBusiID";
}
#endregion
/// <summary>
/// 特殊645编码关系映射
/// </summary>
/// <param name="itemCode"></param>
/// <param name="itemCode">特殊3761编码</param>
/// <returns></returns>
public static (string,string) MappingItemCodeTo645SubCodeRelationship(string itemCode)
{
@ -201,5 +226,20 @@ namespace JiShe.CollectBus.Common.Consts
_ => (itemCode,""),
};
}
/// <summary>
/// 特殊188编码关系映射
/// </summary>
/// <param name="itemCode">特殊3761编码</param>
/// <param name="tripState">TripState 0 合闸-开阀, 1 关阀);开阀关阀</param>
/// <returns></returns>
public static (string, string) MappingItemCodeTo188SubCodeRelationship(string itemCode,bool tripState)
{
return itemCode switch
{
AFN10HFN99H => (AFN10HFN01H, tripState == true ?T1882018PacketItemCodeConst.CTR304A01799: T1882018PacketItemCodeConst.CTR304A01755),
_ => (itemCode, ""),
};
}
}
}

View File

@ -265,5 +265,79 @@ namespace JiShe.CollectBus.Common.Extensions
}
return DateTime.TryParseExact(dateLong.ToString(), "yyyyMMdd HHmmssZZ", null, System.Globalization.DateTimeStyles.None, out DateTime date) ? date : throw new ArgumentException("Date must be between 10000101 and 99991231.");
}
/// <summary>
/// 取得某月的第一天
/// </summary>
/// <param name="datetime">要取得月份第一天的时间</param>
/// <returns></returns>
public static DateTime FirstDayOfMonth(this DateTime datetime)
{
return datetime.AddDays(1 - datetime.Day);
}
///<summary>
/// 取得某月的最后一天
/// </summary>
/// <param name="datetime">要取得月份最后一天的时间</param>
/// <returns></returns>
public static DateTime LastDayOfMonth(this DateTime datetime)
{
return datetime.AddDays(1 - datetime.Day).AddMonths(1).AddDays(-1);
}
/// <summary>
/// 取得上个月第一天
/// </summary>
/// <param name="datetime">要取得上个月第一天的当前时间</param>
/// <returns></returns>
public static DateTime FirstDayOfPreviousMonth(this DateTime datetime)
{
return datetime.AddDays(1 - datetime.Day).AddMonths(-1);
}
/// <summary>
/// 取得上个月的最后一天
/// </summary>
/// <param name="datetime">要取得上个月最后一天的当前时间</param>
/// <returns></returns>
public static DateTime LastDayOfPrdviousMonth(this DateTime datetime)
{
return datetime.AddDays(1 - datetime.Day).AddDays(-1);
}
/// <summary>
/// 取得某月第一天0点以及最后一天的23:59:59时间范围
/// </summary>
/// <param name="datetime"></param>
/// <returns></returns>
public static Tuple<DateTime, DateTime> GetMonthDateRange(this DateTime datetime)
{
var lastDayOfMonthDate = LastDayOfMonth(datetime);
return new Tuple<DateTime, DateTime>(datetime.FirstDayOfMonth(), new DateTime(lastDayOfMonthDate.Year, lastDayOfMonthDate.Month, lastDayOfMonthDate.Day, 23, 59, 59));
}
/// <summary>
/// 取得某一天0点到当月最后一天的23:59:59时间范围
/// </summary>
/// <param name="datetime"></param>
/// <returns></returns>
public static Tuple<DateTime, DateTime> GetCurrentDateToLastDayRange(this DateTime datetime)
{
var lastDayOfMonthDate = LastDayOfMonth(datetime);
return new Tuple<DateTime, DateTime>(datetime.Date, new DateTime(lastDayOfMonthDate.Year, lastDayOfMonthDate.Month, lastDayOfMonthDate.Day, 23, 59, 59));
}
/// <summary>
/// 取得某一天0点到23:59:59时间范围
/// </summary>
/// <param name="datetime"></param>
/// <returns></returns>
public static Tuple<DateTime, DateTime> GetCurrentDateRange(this DateTime datetime)
{
return new Tuple<DateTime, DateTime>(datetime.Date, new DateTime(datetime.Year, datetime.Month, datetime.Day, 23, 59, 59));
}
}
}

View File

@ -136,57 +136,7 @@ namespace JiShe.CollectBus.Common.Helpers
return objModel;
}
/// <summary>
/// 取得某月的第一天
/// </summary>
/// <param name="datetime">要取得月份第一天的时间</param>
/// <returns></returns>
public static DateTime FirstDayOfMonth(this DateTime datetime)
{
return datetime.AddDays(1 - datetime.Day);
}
///<summary>
/// 取得某月的最后一天
/// </summary>
/// <param name="datetime">要取得月份最后一天的时间</param>
/// <returns></returns>
public static DateTime LastDayOfMonth(this DateTime datetime)
{
return datetime.AddDays(1 - datetime.Day).AddMonths(1).AddDays(-1);
}
/// <summary>
/// 取得某月第一天0点以及最后一天的23:59:59时间范围
/// </summary>
/// <param name="datetime"></param>
/// <returns></returns>
public static Tuple<DateTime, DateTime> GetMonthDateRange(this DateTime datetime)
{
var lastDayOfMonthDate = LastDayOfMonth(datetime);
return new Tuple<DateTime, DateTime>(datetime.FirstDayOfMonth(), new DateTime(lastDayOfMonthDate.Year, lastDayOfMonthDate.Month, lastDayOfMonthDate.Day, 23, 59, 59));
}
/// <summary>
/// 取得某一天0点到当月最后一天的23:59:59时间范围
/// </summary>
/// <param name="datetime"></param>
/// <returns></returns>
public static Tuple<DateTime, DateTime> GetCurrentDateToLastDayRange(this DateTime datetime)
{
var lastDayOfMonthDate = LastDayOfMonth(datetime);
return new Tuple<DateTime, DateTime>(datetime.Date, new DateTime(lastDayOfMonthDate.Year, lastDayOfMonthDate.Month, lastDayOfMonthDate.Day, 23, 59, 59));
}
/// <summary>
/// 取得某一天0点到23:59:59时间范围
/// </summary>
/// <param name="datetime"></param>
/// <returns></returns>
public static Tuple<DateTime, DateTime> GetCurrentDateRange(this DateTime datetime)
{
return new Tuple<DateTime, DateTime>(datetime.Date, new DateTime(datetime.Year, datetime.Month, datetime.Day, 23, 59, 59));
}
/// <summary>
/// 获取指定枚举的所有 Attribute 说明以及value组成的键值对
@ -915,5 +865,72 @@ namespace JiShe.CollectBus.Common.Helpers
return Convert.ToInt64(scoresStr);
}
/// <summary>
/// 加载指定名称的程序集
/// </summary>
/// <param name="assemblyNames"></param>
/// <returns></returns>
public static List<Assembly> LoadAssemblies(string[] assemblyNames)
{
var assemblies = new List<Assembly>();
// 获取已加载的程序集
foreach (var asm in AppDomain.CurrentDomain.GetAssemblies())
{
if (assemblyNames.Contains(asm.GetName().Name))
assemblies.Add(asm);
}
// 尝试加载未加载的程序集
foreach (var name in assemblyNames)
{
if (!assemblies.Any(a => a.GetName().Name == name))
{
try
{
var assembly = Assembly.Load(name);
assemblies.Add(assembly);
}
catch (FileNotFoundException)
{
// 若Load失败尝试从基目录加载
var path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"{name}.dll");
if (File.Exists(path))
{
try
{
assemblies.Add(Assembly.LoadFrom(path));
}
catch { /* 记录错误 */ }
}
}
}
}
return assemblies;
}
/// <summary>
/// 创建类型实例
/// </summary>
/// <param name="types"></param>
/// <returns></returns>
public static List<object> CreateInstances(List<Type> types)
{
var instances = new List<object>();
foreach (var type in types)
{
try
{
instances.Add(Activator.CreateInstance(type));
}
catch (Exception)
{
throw;
}
}
return instances;
}
}
}

View File

@ -76,8 +76,8 @@ namespace JiShe.CollectBus.Host
app.UseCors(CollectBusHostConst.DefaultCorsPolicyName);
app.UseAuthentication();
app.UseAuthorization();
if (env.IsDevelopment())
{
//if (env.IsDevelopment())
//{
app.UseSwagger();
app.UseAbpSwaggerUI(options =>
{
@ -88,7 +88,7 @@ namespace JiShe.CollectBus.Host
options.DocExpansion(DocExpansion.None);
options.DefaultModelsExpandDepth(-1);
});
}
//}
app.UseAuditing();
app.UseAbpSerilogEnrichers();
app.UseUnitOfWork();

View File

@ -19,6 +19,10 @@
<Content Remove="C:\Users\Dai Zan\.nuget\packages\volo.abp.aspnetcore\8.3.3\contentFiles\any\net8.0\Volo.Abp.AspNetCore.abppkg.analyze.json" />
</ItemGroup>
<ItemGroup>
<None Remove="Plugins\ignore.txt" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.Kafka" Version="9.0.0" />
<PackageReference Include="AspNetCore.HealthChecks.MongoDb" Version="8.0.0" />
@ -52,6 +56,10 @@
<PackageReference Include="Hangfire.Dashboard.BasicAuthorization" Version="1.0.2" />-->
</ItemGroup>
<ItemGroup>
<Page Include="Plugins\ignore.txt" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\modules\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
<ProjectReference Include="..\..\services\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
@ -75,9 +83,6 @@
<None Update="Plugins\JiShe.CollectBus.Protocol.T6452007.dll">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Plugins\JiShe.CollectBus.Protocol.Test.dll">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>

View File

@ -1,8 +1,8 @@
{
"ConnectionStrings": {
"Default": "mongodb://admin:4mFmPTTB8tn6aI@47.110.62.104:27017,47.110.53.196:27017,47.110.60.222:27017/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
"PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
"PrepayDB": "server=rm-m5el3d1u1k0wzk70n2o.sqlserver.rds.aliyuncs.com,3433;database=jishe.sysdb;uid=v3sa;pwd=JiShe123;Encrypt=False;Trust Server Certificate=False",
"EnergyDB": "server=rm-wz9hw529i3j1e3b5fbo.sqlserver.rds.aliyuncs.com,3433;database=db_energy;uid=yjdb;pwd=Kdjdhf+9*7ad222LL;Encrypt=False;Trust Server Certificate=False"
},
"Redis": {
"Configuration": "47.110.60.222:6379,password=3JBGfyhTaD46nS,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
@ -18,13 +18,16 @@
"SaslPassword": "lixiao@1980",
"KafkaReplicationFactor": 3,
"NumPartitions": 30,
"TaskThreadCount": -1,
"ServerTagName": "JiSheCollectBus100",
"FirstCollectionTime": "2025-04-22 16:07:00"
},
"IoTDBOptions": {
"UserName": "root",
"Password": "Yp2eU6MVdIjXCL",
"ClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
"Password": "root",
//"ClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
//"Password": "root",
"ClusterList": [ "121.42.175.177:16667" ],
"PoolSize": 2,
"DataBaseName": "energy",
"OpenDebugMode": true,

View File

@ -1,39 +1,4 @@
{
"Serilog": {
"Using": [
"Serilog.Sinks.Console",
"Serilog.Sinks.File"
],
"MinimumLevel": {
"Default": "Warning",
"Override": {
"Microsoft": "Warning",
"Volo.Abp": "Warning",
"Hangfire": "Warning",
"DotNetCore.CAP": "Warning",
"Serilog.AspNetCore": "Information",
"Microsoft.EntityFrameworkCore": "Warning",
"Microsoft.AspNetCore": "Warning",
"Microsoft.AspNetCore.Diagnostics.HealthChecks": "Warning"
}
},
"WriteTo": [
{
"Name": "Console"
},
{
"Name": "File",
"Args": {
"path": "logs/logs-.txt",
"rollingInterval": "Day"
}
}
]
},
"App": {
"SelfUrl": "http://localhost:44315",
"CorsOrigins": "http://localhost:4200,http://localhost:3100"
},
"ConnectionStrings": {
"Default": "mongodb://mongo_PmEeF3:lixiao1980@192.168.5.9:27017/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
"Kafka": "192.168.5.9:29092,192.168.5.9:39092,192.168.5.9:49092",
@ -46,46 +11,21 @@
"DefaultDB": "14",
"HangfireDB": "13"
},
"Jwt": {
"Audience": "JiShe.CollectBus",
"SecurityKey": "dzehzRz9a8asdfasfdadfasdfasdfafsdadfasbasdf=",
"Issuer": "JiShe.CollectBus",
"ExpirationTime": 2
},
"HealthChecks": {
"IsEnable": true,
"HealthCheckDatabaseName": "HealthChecks",
"EvaluationTimeInSeconds": 10,
"MinimumSecondsBetweenFailureNotifications": 60
},
"SwaggerConfig": [
{
"GroupName": "Basic",
"Title": "【后台管理】基础模块",
"Version": "V1"
},
{
"GroupName": "Business",
"Title": "【后台管理】业务模块",
"Version": "V1"
}
],
"Kafka": {
"BootstrapServers": "192.168.5.9:29092,192.168.5.9:39092,192.168.5.9:49092",
"EnableFilter": true,
"EnableAuthorization": false,
"SecurityProtocol": "SaslPlaintext",
"SaslMechanism": "Plain",
"SaslUserName": "lixiao",
"SaslPassword": "lixiao1980",
"KafkaReplicationFactor": 3,
"NumPartitions": 30,
"TaskThreadCount": -1,
"FirstCollectionTime": "2025-04-22 16:07:00"
},
"IoTDBOptions": {
"UserName": "root",
"Password": "root",
"ClusterList": [ "192.168.5.9:6667" ],
"ClusterList": [ "121.42.175.177:16667" ],
"PoolSize": 32,
"DataBaseName": "energy",
"OpenDebugMode": true,
@ -140,6 +80,65 @@
"DefaultIdempotence": true
}
},
"Serilog": {
"Using": [
"Serilog.Sinks.Console",
"Serilog.Sinks.File"
],
"MinimumLevel": {
"Default": "Warning",
"Override": {
"Microsoft": "Warning",
"Volo.Abp": "Warning",
"Hangfire": "Warning",
"DotNetCore.CAP": "Warning",
"Serilog.AspNetCore": "Information",
"Microsoft.EntityFrameworkCore": "Warning",
"Microsoft.AspNetCore": "Warning",
"Microsoft.AspNetCore.Diagnostics.HealthChecks": "Warning"
}
},
"WriteTo": [
{
"Name": "Console"
},
{
"Name": "File",
"Args": {
"path": "logs/logs-.txt",
"rollingInterval": "Hour"
}
}
]
},
"App": {
"SelfUrl": "http://localhost:44315",
"CorsOrigins": "http://localhost:4200,http://localhost:3100"
},
"Jwt": {
"Audience": "JiShe.CollectBus",
"SecurityKey": "dzehzRz9a8asdfasfdadfasdfasdfafsdadfasbasdf=",
"Issuer": "JiShe.CollectBus",
"ExpirationTime": 2
},
"HealthChecks": {
"IsEnable": false,
"HealthCheckDatabaseName": "HealthChecks",
"EvaluationTimeInSeconds": 10,
"MinimumSecondsBetweenFailureNotifications": 60
},
"SwaggerConfig": [
{
"GroupName": "Basic",
"Title": "【后台管理】基础模块",
"Version": "V1"
},
{
"GroupName": "Business",
"Title": "【后台管理】业务模块",
"Version": "V1"
}
],
"ServerApplicationOptions": {
"ServerTagName": "JiSheCollectBus99",
"SystemType": "Energy",

View File

@ -1,7 +1,4 @@
//using Hangfire;
//using Hangfire.Redis.StackExchange;
using JiShe.CollectBus.Migration.Host.Hangfire;
using JiShe.CollectBus.Migration.Host.HealthChecks;
using JiShe.CollectBus.Migration.Host.HealthChecks;
using JiShe.CollectBus.Migration.Host.Swaggers;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.AspNetCore.DataProtection;
@ -12,7 +9,6 @@ using StackExchange.Redis;
using System.Text;
using Volo.Abp.AspNetCore.Auditing;
using Volo.Abp.Auditing;
using Volo.Abp.BackgroundJobs;
using Volo.Abp.Caching;
using Volo.Abp.Modularity;
@ -21,33 +17,6 @@ namespace JiShe.CollectBus.Migration.Host
{
public partial class CollectBusMigrationHostModule
{
/// <summary>
/// Configures the hangfire.
/// </summary>
/// <param name="context">The context.</param>
//private void ConfigureHangfire(ServiceConfigurationContext context)
//{
// var redisStorageOptions = new RedisStorageOptions()
// {
// Db = context.Services.GetConfiguration().GetValue<int>("Redis:HangfireDB")
// };
// Configure<AbpBackgroundJobOptions>(options => { options.IsJobExecutionEnabled = false; });
// context.Services.AddHangfire(config =>
// {
// config.UseRedisStorage(
// context.Services.GetConfiguration().GetValue<string>("Redis:Configuration"), redisStorageOptions)
// .WithJobExpirationTimeout(TimeSpan.FromDays(7));
// var delaysInSeconds = new[] { 10, 60, 60 * 3 }; // 重试时间间隔
// const int Attempts = 3; // 重试次数
// config.UseFilter(new AutomaticRetryAttribute() { Attempts = Attempts, DelaysInSeconds = delaysInSeconds });
// //config.UseFilter(new AutoDeleteAfterSuccessAttribute(TimeSpan.FromDays(7)));
// config.UseFilter(new JobRetryLastFilter(Attempts));
// });
// context.Services.AddHangfireServer();
//}
/// <summary>
/// Configures the JWT authentication.
/// </summary>

View File

@ -2,6 +2,7 @@
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Migration.Host.HealthChecks;
using JiShe.CollectBus.Migration.Host.Swaggers;
using JiShe.CollectBus.MongoDB;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Swashbuckle.AspNetCore.SwaggerUI;
using Volo.Abp;
@ -26,6 +27,7 @@ namespace JiShe.CollectBus.Migration.Host
typeof(AbpAspNetCoreSerilogModule),
typeof(AbpSwashbuckleModule),
typeof(AbpTimingModule),
typeof(CollectBusMongoDbModule),
typeof(CollectBusMigrationApplicationModule),
typeof(AbpCachingStackExchangeRedisModule)
)]
@ -42,7 +44,6 @@ namespace JiShe.CollectBus.Migration.Host
ConfigureSwaggerServices(context, configuration);
//ConfigureNetwork(context, configuration);
ConfigureJwtAuthentication(context, configuration);
//ConfigureHangfire(context);
ConfigureAuditLog(context);
ConfigureCustom(context, configuration);
ConfigureHealthChecks(context, configuration);
@ -84,7 +85,7 @@ namespace JiShe.CollectBus.Migration.Host
options.DefaultModelsExpandDepth(-1);
});
}
//app.UseAuditing();
app.UseAuditing();
app.UseAbpSerilogEnrichers();
app.UseUnitOfWork();
//app.UseHangfireDashboard("/hangfire", new DashboardOptions

View File

@ -1,29 +0,0 @@
using Hangfire.Common;
using Hangfire.States;
using Serilog;
namespace JiShe.CollectBus.Migration.Host.Hangfire
{
/// <summary>
/// 重试最后一次
/// </summary>
public class JobRetryLastFilter : JobFilterAttribute, IElectStateFilter
{
private int RetryCount { get; }
public JobRetryLastFilter(int retryCount)
{
RetryCount = retryCount;
}
public void OnStateElection(ElectStateContext context)
{
var retryAttempt = context.GetJobParameter<int>("RetryCount");
if (RetryCount == retryAttempt)
{
Log.Error("最后一次重试");
}
}
}
}

View File

@ -46,6 +46,8 @@
<!--<PackageReference Include="Volo.Abp.BackgroundWorkers.Hangfire" Version="8.3.3" />-->
<PackageReference Include="Volo.Abp.Caching.StackExchangeRedis" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Swashbuckle" Version="8.3.3" />
<!--<PackageReference Include="Hangfire.HttpJob" Version="3.8.5" />
<PackageReference Include="Hangfire.MySqlStorage" Version="2.0.3" />
@ -53,6 +55,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\modules\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
<ProjectReference Include="..\..\services\JiShe.CollectBus.Migration.Application.Contracts\JiShe.CollectBus.Migration.Application.Contracts.csproj" />
<ProjectReference Include="..\..\services\JiShe.CollectBus.Migration.Application\JiShe.CollectBus.Migration.Application.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Migration.HttpApi\JiShe.CollectBus.Migration.HttpApi.csproj" />