diff --git a/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs b/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs index 2cce113..6d26bdc 100644 --- a/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs +++ b/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs @@ -16,7 +16,7 @@ public class CollectBusIoTDbModule : AbpModule var configuration = context.Services.GetConfiguration(); Configure(options => { configuration.GetSection(nameof(IoTDbOptions)).Bind(options); }); - // 注册上下文为Scoped - context.Services.AddScoped(); + //// 注册上下文为Scoped + //context.Services.AddScoped(); } } \ No newline at end of file diff --git a/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs b/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs index afad488..ef68325 100644 --- a/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs +++ b/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs @@ -1,16 +1,17 @@ using JiShe.CollectBus.IoTDB.Options; using Microsoft.Extensions.Options; +using Volo.Abp.DependencyInjection; namespace JiShe.CollectBus.IoTDB.Context { /// /// IoTDB SessionPool 运行时上下文 /// - public class IoTDbRuntimeContext + public class IoTDBRuntimeContext: IScopedDependency { private readonly bool _defaultValue; - public IoTDbRuntimeContext(IOptions options) + public IoTDBRuntimeContext(IOptions options) { _defaultValue = options.Value.UseTableSessionPoolByDefault; UseTableSessionPool = _defaultValue; diff --git a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs index b896bdf..82a0d47 100644 --- a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs @@ -1,6 +1,7 @@ using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IoTDB.Model; using JiShe.CollectBus.IoTDB.Options; +using JiShe.CollectBus.IoTDB.Provider; namespace JiShe.CollectBus.IoTDB.Interface { @@ -31,6 +32,15 @@ namespace JiShe.CollectBus.IoTDB.Interface /// Task BatchInsertAsync(IEnumerable entities) where T : IoTEntity; + /// + /// 批量插入数据 + /// + /// + /// 设备元数据 + /// + /// + Task BatchInsertAsync(DeviceMetadata deviceMetadata,IEnumerable entities) where T : IoTEntity; + /// /// 删除数据 @@ -38,7 +48,14 @@ namespace JiShe.CollectBus.IoTDB.Interface /// /// /// - Task DeleteAsync(QueryOptions options) where T : IoTEntity; + Task DeleteAsync(IoTDBQueryOptions options) where T : IoTEntity; + + /// + /// 获取设备元数据 + /// + /// + /// + Task GetMetadata() where T : IoTEntity; /// /// 查询数据 @@ -46,6 +63,6 @@ namespace JiShe.CollectBus.IoTDB.Interface /// /// /// - Task> QueryAsync(QueryOptions options) where T : IoTEntity, new(); + Task> QueryAsync(IoTDBQueryOptions options) where T : IoTEntity, new(); } } diff --git a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs index 73a90e0..92f98e1 100644 --- a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs +++ b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs @@ -11,29 +11,29 @@ namespace JiShe.CollectBus.IoTDB.Model /// 系统名称 /// [TAGColumn] - public required string SystemName { get; set; } + public string SystemName { get; set; } /// /// 项目编码 /// - [TAGColumn] - public required string ProjectCode { get; set; } + [ATTRIBUTEColumn] + public string ProjectId { get; set; } /// /// 设备类型集中器、电表、水表、流量计、传感器等 /// - [TAGColumn] - public required string DeviceType { get; set; } + [ATTRIBUTEColumn] + public string DeviceType { get; set; } /// - /// 设备ID + /// 设备ID,也就是通信设备的唯一标识符,例如集中器地址,或者其他传感器设备地址 /// [TAGColumn] - public required string DeviceId { get; set; } + public string DeviceId { get; set; } /// - /// 当前时间戳,单位毫秒,必须通过DateTimeOffset获取 + /// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取 /// - public required long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); } } diff --git a/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs index 69463ba..0d01f81 100644 --- a/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs +++ b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs @@ -42,5 +42,10 @@ /// 是否使用表模型存储, 默认false,使用tree模型存储 /// public bool UseTableSessionPoolByDefault { get; set; } = false; + + /// + /// 时区,默认为:"UTC+08:00" + /// + public string ZoneId { get; set; } = "UTC+08:00"; } } diff --git a/modules/JiShe.CollectBus.IoTDB/Options/QueryOptions.cs b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBQueryOptions.cs similarity index 78% rename from modules/JiShe.CollectBus.IoTDB/Options/QueryOptions.cs rename to modules/JiShe.CollectBus.IoTDB/Options/IoTDBQueryOptions.cs index 90e44b2..bff4641 100644 --- a/modules/JiShe.CollectBus.IoTDB/Options/QueryOptions.cs +++ b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBQueryOptions.cs @@ -3,7 +3,7 @@ /// /// 查询条件 /// - public class QueryOptions + public class IoTDBQueryOptions { /// /// 表模型的表名称或者树模型的设备路径 @@ -13,7 +13,7 @@ /// /// 分页 /// - public int Page { get; set; } + public int PageIndex { get; set; } /// /// 分页大小 @@ -23,6 +23,6 @@ /// /// 查询条件 /// - public List Conditions { get; } = new(); + public List Conditions { get; set; } = new(); } } diff --git a/modules/JiShe.CollectBus.IoTDB/Options/QueryCondition.cs b/modules/JiShe.CollectBus.IoTDB/Options/QueryCondition.cs index cf6d3a9..40dd443 100644 --- a/modules/JiShe.CollectBus.IoTDB/Options/QueryCondition.cs +++ b/modules/JiShe.CollectBus.IoTDB/Options/QueryCondition.cs @@ -9,10 +9,17 @@ /// 字段 /// public string Field { get; set; } + /// - /// 操作符 + /// 操作符,>,=,< /// public string Operator { get; set; } + + /// + /// 是否数值,如果是数值,则进行数值比较,否则进行字符串比较 + /// + public bool IsNumber { get; set; } = false; + /// /// 值 /// diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs index f9127db..6922c62 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs @@ -15,7 +15,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public static string GetDevicePath(T entity) where T : IoTEntity { - return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectCode}`.`{entity.DeviceType}`.`{entity.DeviceId}`"; + return $"root.{entity.SystemName.ToLower()}.`{entity.DeviceId}`"; } @@ -39,7 +39,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public static string GetDeviceTableName(T entity) where T : IoTEntity { - return $"{entity.SystemName.ToLower()}.`{entity.ProjectCode}`.`{entity.DeviceType}`.`{entity.DeviceId}`"; + return $"{entity.SystemName.ToLower()}.`{entity.DeviceId}`"; } } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index 9b76a6f..7cfaf32 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -2,8 +2,12 @@ using System.Collections.Concurrent; using System.Reflection; using System.Text; +using System.Threading.Tasks; using Apache.IoTDB; using Apache.IoTDB.DataStructure; +using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Extensions; +using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IoTDB.Attribute; using JiShe.CollectBus.IoTDB.Context; @@ -24,7 +28,7 @@ namespace JiShe.CollectBus.IoTDB.Provider private static readonly ConcurrentDictionary MetadataCache = new(); private readonly ILogger _logger; private readonly IIoTDbSessionFactory _sessionFactory; - private readonly IoTDbRuntimeContext _runtimeContext; + private readonly IoTDBRuntimeContext _runtimeContext; private IIoTDbSessionPool CurrentSession => _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool); @@ -38,7 +42,7 @@ namespace JiShe.CollectBus.IoTDB.Provider public IoTDbProvider( ILogger logger, IIoTDbSessionFactory sessionFactory, - IoTDbRuntimeContext runtimeContext) + IoTDBRuntimeContext runtimeContext) { _logger = logger; _sessionFactory = sessionFactory; @@ -54,11 +58,19 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public async Task InsertAsync(T entity) where T : IoTEntity { - var metadata = GetMetadata(); + try + { + var metadata = await GetMetadata(); - var tablet = BuildTablet(new[] { entity }, metadata); + var tablet = BuildTablet(new[] { entity }, metadata); - await CurrentSession.InsertAsync(tablet); + await CurrentSession.InsertAsync(tablet); + } + catch (Exception ex) + { + _logger.LogError(ex, $"{nameof(InsertAsync)} 插入数据时发生异常"); + throw; + } } /// @@ -68,15 +80,51 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public async Task BatchInsertAsync(IEnumerable entities) where T : IoTEntity { - var metadata = GetMetadata(); - - var batchSize = 1000; - var batches = entities.Chunk(batchSize); - - foreach (var batch in batches) + try { - var tablet = BuildTablet(batch, metadata); - await CurrentSession.InsertAsync(tablet); + var metadata = await GetMetadata(); + + var batchSize = 1000; + var batches = entities.Chunk(batchSize); + + foreach (var batch in batches) + { + var tablet = BuildTablet(batch, metadata); + await CurrentSession.InsertAsync(tablet); + } + } + catch (Exception ex) + { + _logger.LogError(ex, $"{nameof(BatchInsertAsync)} 批量插入数据时发生异常"); + throw; + } + } + + /// + /// 批量插入数据 + /// + /// + /// 设备元数据 + /// + /// + public async Task BatchInsertAsync(DeviceMetadata deviceMetadata, IEnumerable entities) where T : IoTEntity + { + try + { + + var batchSize = 1000; + var batches = entities.Chunk(batchSize); + + foreach (var batch in batches) + { + var tablet = BuildTablet(batch, deviceMetadata); + await CurrentSession.InsertAsync(tablet); + } + } + catch (Exception ex) + { + _logger.LogError(ex, $"{nameof(BatchInsertAsync)} 批量插入数据时发生异常"); + throw; } } @@ -87,20 +135,54 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// /// - public async Task DeleteAsync(QueryOptions options) where T : IoTEntity + public async Task DeleteAsync(IoTDBQueryOptions options) where T : IoTEntity { - var query = BuildDeleteSQL(options); - var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); - - if (!sessionDataSet.HasNext()) + try { - _logger.LogWarning($"{typeof(T).Name} 删除数据时,没有返回受影响记录数量。"); - return 0; - } + var query = await BuildDeleteSQL(options); + var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); - //获取唯一结果行 - var row = sessionDataSet.Next(); - return row.Values[0]; + if (!sessionDataSet.HasNext()) + { + _logger.LogWarning($"{typeof(T).Name} 删除数据时,没有返回受影响记录数量。"); + return 0; + } + + //获取唯一结果行 + var row = sessionDataSet.Next(); + return row.Values[0]; + } + catch (Exception ex) + { + _logger.LogError(ex, $"{nameof(DeleteAsync)} 删除数据时发生异常"); + throw; + } + } + + /// + /// 获取设备元数据 + /// + /// + /// + public async Task GetMetadata() where T : IoTEntity + { + var columns = CollectColumnMetadata(typeof(T)); + var metadata = BuildDeviceMetadata(columns); + var metaData = MetadataCache.AddOrUpdate( + typeof(T), + addValueFactory: t => metadata, // 如果键不存在,用此值添加 + updateValueFactory: (t, existingValue) => + { + var columns = CollectColumnMetadata(t); + var metadata = BuildDeviceMetadata(columns); + + //对现有值 existingValue 进行修改,返回新值 + existingValue.ColumnNames = metadata.ColumnNames; + return existingValue; + } + ); + + return await Task.FromResult(metaData); } /// @@ -109,18 +191,32 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// /// - public async Task> QueryAsync(QueryOptions options) where T : IoTEntity, new() + public async Task> QueryAsync(IoTDBQueryOptions options) where T : IoTEntity, new() { - var query = BuildQuerySQL(options); - var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); - - var result = new BusPagedResult + try { - TotalCount = await GetTotalCount(options), - Items = ParseResults(sessionDataSet, options.PageSize) - }; + var query =await BuildQuerySQL(options); + var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); - return result; + + var result = new BusPagedResult + { + TotalCount = await GetTotalCount(options), + Items = await ParseResults(sessionDataSet, options.PageSize), + PageIndex = options.PageIndex, + PageSize = options.PageSize, + + }; + + result.HasNext = result.TotalCount > 0? result.TotalCount < result.PageSize : false; + + return result; + } + catch (Exception ex) + { + _logger.LogError(ex, $"{nameof(QueryAsync)} IoTDB查询数据时发生异常"); + throw; + } } /// @@ -160,12 +256,12 @@ namespace JiShe.CollectBus.IoTDB.Provider } string tableNameOrTreePath = string.Empty; - var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); + var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); if (tableNameOrTreePathAttribute != null) { - tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; + tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; } - + foreach (var entity in entities) { timestamps.Add(entity.Timestamps); @@ -214,7 +310,23 @@ namespace JiShe.CollectBus.IoTDB.Provider else { - rowValues.Add(value); + //需要根据value的类型,进行相应的值映射转换,例如datetime转换为long的时间戳值 + if (value != null) + { + Type tupleType = value.GetType(); + var tempValue = tupleType.Name.ToUpper() switch + { + "DATETIME" => Convert.ToDateTime(value).GetDateTimeOffset().ToUnixTimeNanoseconds(), + _ => value + }; + + rowValues.Add(tempValue); + } + else + { + rowValues.Add(value); + } + } } @@ -237,7 +349,7 @@ namespace JiShe.CollectBus.IoTDB.Provider devicePaths.Add(DevicePathBuilder.GetTableName()); } } - + } if (devicePaths.Count > 1) @@ -258,8 +370,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// 数据集合 /// 时间戳集合 /// - private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath, - List> values, List timestamps) + private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath, List> values, List timestamps) { //todo 树模型需要去掉TAG类型和ATTRIBUTE类型的字段,只需要保留FIELD类型字段即可 @@ -280,8 +391,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// 数据集合 /// 时间戳集合 /// - private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, - List> values, List timestamps) + private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, List> values, List timestamps) { var tablet = new Tablet( tableName, @@ -301,9 +411,9 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// /// - private string BuildQuerySQL(QueryOptions options) where T : IoTEntity + private async Task BuildQuerySQL(IoTDBQueryOptions options) where T : IoTEntity { - var metadata = GetMetadata(); + var metadata = await GetMetadata(); var sb = new StringBuilder("SELECT "); sb.AppendJoin(", ", metadata.ColumnNames); sb.Append($" FROM {options.TableNameOrTreePath}"); @@ -314,7 +424,7 @@ namespace JiShe.CollectBus.IoTDB.Provider sb.AppendJoin(" AND ", options.Conditions.Select(TranslateCondition)); } - sb.Append($" LIMIT {options.PageSize} OFFSET {options.Page * options.PageSize}"); + sb.Append($" LIMIT {options.PageSize} OFFSET {options.PageIndex * options.PageSize}"); return sb.ToString(); } @@ -324,9 +434,9 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// /// - private string BuildDeleteSQL(QueryOptions options) where T : IoTEntity + private async Task BuildDeleteSQL(IoTDBQueryOptions options) where T : IoTEntity { - var metadata = GetMetadata(); + var metadata = await GetMetadata(); var sb = new StringBuilder(); if (!_runtimeContext.UseTableSessionPool) @@ -361,10 +471,10 @@ namespace JiShe.CollectBus.IoTDB.Provider { return condition.Operator switch { - ">" => $"{condition.Field} > {condition.Value}", - "<" => $"{condition.Field} < {condition.Value}", - "=" => $"{condition.Field} = '{condition.Value}'", - _ => throw new NotSupportedException($"Operator {condition.Operator} not supported") + ">" => condition.IsNumber ? $"{condition.Field} > {condition.Value}": $"{condition.Field} > '{condition.Value}'", + "<" => condition.IsNumber ? $"{condition.Field} < {condition.Value}" : $"{condition.Field} < '{condition.Value}'", + "=" => condition.IsNumber ? $"{condition.Field} = {condition.Value}" : $"{condition.Field} = '{condition.Value}'", + _ => throw new NotSupportedException($"{nameof(TranslateCondition)} 将查询条件转换为SQL语句时操作符 {condition.Operator} 属于异常情况") }; } @@ -374,7 +484,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// /// - private async Task GetTotalCount(QueryOptions options) where T : IoTEntity + private async Task GetTotalCount(IoTDBQueryOptions options) where T : IoTEntity { var countQuery = $"SELECT COUNT(*) FROM {options.TableNameOrTreePath}"; if (options.Conditions.Any()) @@ -393,10 +503,10 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// /// - private IEnumerable ParseResults(SessionDataSet dataSet, int pageSize) where T : IoTEntity, new() + private async Task> ParseResults(SessionDataSet dataSet, int pageSize) where T : IoTEntity, new() { var results = new List(); - var metadata = GetMetadata(); + var metadata = await GetMetadata(); var properties = typeof(T).GetProperties(); @@ -408,16 +518,24 @@ namespace JiShe.CollectBus.IoTDB.Provider Timestamps = record.Timestamps }; - foreach (var measurement in metadata.ColumnNames) { - var value = record.Values; + int indexOf = metadata.ColumnNames.IndexOf(measurement); + var value = record.Values[indexOf]; var prop = properties.FirstOrDefault(p => p.Name.Equals(measurement, StringComparison.OrdinalIgnoreCase)); if (prop != null) { - typeof(T).GetProperty(measurement)?.SetValue(entity, value); + if (measurement.EndsWith("time")) + { + var tempValue = TimestampHelper.ConvertToDateTime(Convert.ToInt64(value), TimestampUnit.Nanoseconds); + typeof(T).GetProperty(measurement)?.SetValue(entity, value); + } + else + { + typeof(T).GetProperty(measurement)?.SetValue(entity, value); + } } } @@ -427,30 +545,6 @@ namespace JiShe.CollectBus.IoTDB.Provider return results; } - /// - /// 获取设备元数据 - /// - /// - /// - private DeviceMetadata GetMetadata() where T : IoTEntity - { - var columns = CollectColumnMetadata(typeof(T)); - var metadata = BuildDeviceMetadata(columns); - return MetadataCache.AddOrUpdate( - typeof(T), - addValueFactory: t => metadata, // 如果键不存在,用此值添加 - updateValueFactory: (t, existingValue) => - { - var columns = CollectColumnMetadata(t); - var metadata = BuildDeviceMetadata(columns); - - //对现有值 existingValue 进行修改,返回新值 - existingValue.ColumnNames = metadata.ColumnNames; - return existingValue; - } - ); - } - /// /// 获取设备元数据的列 /// @@ -639,7 +733,7 @@ namespace JiShe.CollectBus.IoTDB.Provider ["DOUBLE"] = TSDataType.DOUBLE, ["TEXT"] = TSDataType.TEXT, ["NULLTYPE"] = TSDataType.NONE, - ["TIMESTAMP"] = TSDataType.TIMESTAMP, + ["DATETIME"] = TSDataType.TIMESTAMP, ["DATE"] = TSDataType.DATE, ["BLOB"] = TSDataType.BLOB, ["DECIMAL"] = TSDataType.STRING, @@ -659,7 +753,7 @@ namespace JiShe.CollectBus.IoTDB.Provider ["DOUBLE"] = 0.0d, ["TEXT"] = string.Empty, ["NULLTYPE"] = null, - ["TIMESTAMP"] = null, + ["DATETIME"] = null, ["DATE"] = null, ["BLOB"] = null, ["DECIMAL"] = "0.0", diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs index fcb0c02..eacf246 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs @@ -25,6 +25,7 @@ namespace JiShe.CollectBus.IoTDB.Provider .SetNodeUrl(options.ClusterList) .SetUsername(options.UserName) .SetPassword(options.Password) + .SetZoneId(options.ZoneId) .SetFetchSize(options.FetchSize) .SetPoolSize(options.PoolSize) .Build(); diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs index d22f356..137f5a8 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs @@ -25,6 +25,7 @@ namespace JiShe.CollectBus.IoTDB.Provider .SetNodeUrls(options.ClusterList) .SetUsername(options.UserName) .SetPassword(options.Password) + .SetZoneId(options.ZoneId) .SetFetchSize(options.FetchSize) .SetPoolSize(options.PoolSize) .SetDatabase(options.DataBaseName) diff --git a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs index 5f7bdf1..3bea5f1 100644 --- a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs @@ -57,5 +57,5 @@ public class KafkaOptionConfig /// /// 首次采集时间 /// - public DateTime FirstCollectionTime { get; set; } + public DateTime? FirstCollectionTime { get; set; } } \ No newline at end of file diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 7693636..86582af 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -4,6 +4,7 @@ using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; +using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; @@ -30,12 +31,12 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS { private readonly ILogger _logger; private readonly IIoTDbProvider _iotDBProvider; - private readonly IoTDbRuntimeContext _dbContext; + private readonly IoTDBRuntimeContext _dbContext; private readonly IoTDbOptions _options; private readonly IRedisDataCacheService _redisDataCacheService; public SampleAppService(IIoTDbProvider iotDBProvider, IOptions options, - IoTDbRuntimeContext dbContext, ILogger logger, IRedisDataCacheService redisDataCacheService) + IoTDBRuntimeContext dbContext, ILogger logger, IRedisDataCacheService redisDataCacheService) { _iotDBProvider = iotDBProvider; _options = options.Value; @@ -50,20 +51,20 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS /// /// [HttpGet] - public async Task UseSessionPool(DateTime testTime) - { + public async Task UseSessionPool(long testTime) + { ElectricityMeterTreeModel meter = new ElectricityMeterTreeModel() { SystemName = "energy", - DeviceId = "402440506", + DeviceId = "402440506s", DeviceType = "Ammeter", Current = 10, MeterModel = "DDZY-1980", - ProjectCode = "10059", + ProjectId = "10059", Voltage = 10, IssuedMessageHexString = "messageHexString", - Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), + Timestamps = testTime// DateTimeOffset.UtcNow.ToUnixTimeNanoseconds()//testTime.GetDateTimeOffset().ToUnixTimeNanoseconds(), }; await _iotDBProvider.InsertAsync(meter); } @@ -83,9 +84,9 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS DeviceType = "Ammeter", Current = 10, MeterModel = "DDZY-1980", - ProjectCode = "10059", + ProjectId = "10059", Voltage = 10, - Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), + Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), }; await _iotDBProvider.InsertAsync(meter2); @@ -99,9 +100,9 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS DeviceType = "Ammeter", Current = 10, MeterModel = "DDZY-1980", - ProjectCode = "10059", + ProjectId = "10059", Voltage = 10, - Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), + Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), }; await _iotDBProvider.InsertAsync(meter); @@ -122,10 +123,10 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS DeviceType = "Ammeter", Current = 10, MeterModel = "DDZY-1980", - ProjectCode = "10059", + ProjectId = "10059", Voltage = 10, IssuedMessageHexString = "dsdfsfd", - Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), + Timestamps = DateTimeOffset.UtcNow.ToUnixTimeNanoseconds(), }; @@ -141,12 +142,17 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS DeviceType = "Ammeter", Current = 10, MeterModel = "DDZY-1980", - ProjectCode = "10059", + ProjectId = "10059", Voltage = 10, Currentd = 22, IssuedMessageHexString = "dsdfsfd", - Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), + Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), }; + + //var dd = DateTimeOffset.Now.ToUnixTimeMilliseconds(); + //var dd3 = DateTimeOffset.Now.ToUnixTimeMicroseconds(); + //var dd2 = DateTimeOffset.Now.ToUnixTimeNanoseconds(); + await _iotDBProvider.InsertAsync(meter3); } @@ -163,7 +169,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS SystemName = "energy", DeviceId = "402440506", DeviceType = "Ammeter", - ProjectCode = "10059", + ProjectId = "10059", Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), SingleMeasuring = new Tuple(measuring, value) }; @@ -183,7 +189,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS SystemName = "energy", DeviceId = "402440506", DeviceType = "Ammeter", - ProjectCode = "10059", + ProjectId = "10059", Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), SingleMeasuring = new Tuple(measuring, value) }; @@ -203,7 +209,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS SystemName = "energy", DeviceId = "402440506", DeviceType = "Ammeter", - ProjectCode = "10059", + ProjectId = "10059", Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), SingleColumn = new Tuple(measuring, value) }; @@ -224,7 +230,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS SystemName = "energy", DeviceId = "402440506", DeviceType = "Ammeter", - ProjectCode = "10059", + ProjectId = "10059", Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), SingleColumn = new Tuple(measuring, value) }; diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 4ff5dd6..3437f4b 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -1,4 +1,6 @@ -using DnsClient.Protocol; +using Confluent.Kafka; +using DnsClient.Protocol; +using FreeSql; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Application.Contracts; using JiShe.CollectBus.Common.BuildSendDatas; @@ -9,7 +11,11 @@ using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.GatherItem; +using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; +using JiShe.CollectBus.IoTDB.Model; +using JiShe.CollectBus.IoTDB.Options; +using JiShe.CollectBus.IoTDB.Provider; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; @@ -19,6 +25,8 @@ using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.RedisDataCache; using JiShe.CollectBus.Repository.MeterReadingRecord; using Mapster; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System; @@ -38,22 +46,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading { private readonly ILogger _logger; private readonly IIoTDbProvider _dbProvider; - private readonly IMeterReadingRecordRepository _meterReadingRecordRepository; private readonly IProducerService _producerService; private readonly IRedisDataCacheService _redisDataCacheService; private readonly KafkaOptionConfig _kafkaOptions; + private readonly IoTDBRuntimeContext _runtimeContext; public BasicScheduledMeterReadingService( ILogger logger, - IMeterReadingRecordRepository meterReadingRecordRepository, IProducerService producerService, IRedisDataCacheService redisDataCacheService, IIoTDbProvider dbProvider, + IoTDBRuntimeContext runtimeContext, IOptions kafkaOptions) { _logger = logger; _dbProvider = dbProvider; - _meterReadingRecordRepository = meterReadingRecordRepository; + _runtimeContext = runtimeContext; _producerService = producerService; _redisDataCacheService = redisDataCacheService; _kafkaOptions = kafkaOptions.Value; @@ -133,17 +141,23 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (meteryType == MeterTypeEnum.Ammeter.ToString()) { - //_ = AmmerterCreatePublishTask(timeDensity, $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}"); - + //List pushTaskInfos = new(); + _runtimeContext.UseTableSessionPool = true; + var metadata = await _dbProvider.GetMetadata(); _ = CreateMeterPublishTask( timeDensity: timeDensity, - taskBatch: $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}", + nextTaskTime: tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity), meterType: MeterTypeEnum.Ammeter, - taskCreateAction: (timeDensity, data, groupIndex, taskBatch) => + taskCreateAction: (timeDensity, data, groupIndex, timestamps) => { - AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch); + var tempTask = AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps); + if (tempTask == null || tempTask.Count <= 0) + { + _logger.LogWarning($"{data.Name} 任务数据构建失败:{data.Serialize()}"); + return; + } + _dbProvider.BatchInsertAsync(metadata, tempTask); }); - } else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) { @@ -152,7 +166,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _ = CreateMeterPublishTask( timeDensity: timeDensity, - taskBatch: $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}", + nextTaskTime: tasksToBeIssueModel.NextTaskTime, meterType: MeterTypeEnum.Ammeter, taskCreateAction: (timeDensity, data, groupIndex, taskBatch) => { @@ -169,6 +183,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading //根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。 + tasksToBeIssueModel.LastTaskTime = tasksToBeIssueModel.NextTaskTime; tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity); await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel); } @@ -193,11 +208,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task InitAmmeterCacheData(string gatherCode = "") { -#if DEBUG - return; - - - +#if DEBUG var timeDensity = "15"; var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; @@ -236,7 +247,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading timer1.Stop(); - _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); + _logger.LogError($"读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒"); //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); //return; #else @@ -261,13 +272,18 @@ namespace JiShe.CollectBus.ScheduledMeterReading //根据采集频率分组,获得采集频率分组 var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); + if (_kafkaOptions.FirstCollectionTime.HasValue == false) + { + _kafkaOptions.FirstCollectionTime = DateTime.Now; + } //先处理采集频率任务缓存 foreach (var item in meterInfoGroupByTimeDensity) { TasksToBeIssueModel nextTask = new TasksToBeIssueModel() { + LastTaskTime = null, TimeDensity = item.Key, - NextTaskTime = _kafkaOptions.FirstCollectionTime.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间 + NextTaskTime = _kafkaOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间 }; //todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。 @@ -395,13 +411,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading }; var taskBatch = $"{currentTime:yyyyMMddHHmm00}"; - Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => - { - var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + //Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => + //{ + // var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + // var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); - }); + // _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); + //}); await Task.CompletedTask; @@ -426,13 +442,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading }; var taskBatch = $"{currentTime:yyyyMMddHHmm00}"; - Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => - { - var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + //Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => + //{ + // var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + // var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); - }); + // _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); + //}); } /// @@ -443,8 +459,18 @@ namespace JiShe.CollectBus.ScheduledMeterReading { //获取缓存中的电表信息 int timeDensity = 15; - var currentTime = DateTime.Now; + //var currentTime = DateTime.Now.CalculateNextCollectionTime(timeDensity); + var currentTime = Convert.ToDateTime("2025-04-21 17:42:00").CalculateNextCollectionTime(timeDensity); + var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName,MeterTypeEnum.Ammeter,timeDensity); + var taskInfo = await FreeRedisProvider.Instance.GetAsync(redisCacheKey); + + if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false) + { + _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败,请检查Redis中是否有对应的任务下发信息"); + return; + } + // 自动计算最佳并发度 int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount(); @@ -452,73 +478,81 @@ namespace JiShe.CollectBus.ScheduledMeterReading { MaxDegreeOfParallelism = recommendedThreads, }; - var taskBatch = $"{currentTime:yyyyMMddHHmm00}"; + var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds(); - Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => + var conditions = new List(); + conditions.Add(new QueryCondition() { - var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - - _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); + Field = "PendingCopyReadTime", + Operator = "=", + IsNumber = true, + Value = pendingCopyReadTime }); + _ = CreateMeterKafkaTaskMessage(timeDensity, new IoTDBQueryOptions() + { + TableNameOrTreePath = DevicePathBuilder.GetTableName(), + PageIndex = 1, + PageSize = 3000, + Conditions = conditions, + }); } - /// - /// 创建电表待发送的任务数据 - /// - /// 采集频率 - /// 时间格式的任务批次名称 - /// - private async Task AmmerterCreatePublishTask(int timeDensity, string taskBatch) - { - var timer = Stopwatch.StartNew(); + ///// + ///// 创建电表待发送的任务数据 + ///// + ///// 采集频率 + ///// 时间格式的任务批次名称 + ///// + //private async Task AmmerterCreatePublishTask(int timeDensity, string taskBatch) + //{ + // var timer = Stopwatch.StartNew(); - //获取对应频率中的所有电表信息 - var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + // //获取对应频率中的所有电表信息 + // var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + // var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + // var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - List meterInfos = new List(); - decimal? cursor = null; - string member = null; - bool hasNext; - do - { - var page = await _redisDataCacheService.GetAllPagedData( - redisCacheMeterInfoHashKeyTemp, - redisCacheMeterInfoZSetScoresIndexKeyTemp, - pageSize: 1000, - lastScore: cursor, - lastMember: member); + // List meterInfos = new List(); + // decimal? cursor = null; + // string member = null; + // bool hasNext; + // do + // { + // var page = await _redisDataCacheService.GetAllPagedData( + // redisCacheMeterInfoHashKeyTemp, + // redisCacheMeterInfoZSetScoresIndexKeyTemp, + // pageSize: 1000, + // lastScore: cursor, + // lastMember: member); - meterInfos.AddRange(page.Items); - cursor = page.HasNext ? page.NextScore : null; - member = page.HasNext ? page.NextMember : null; - hasNext = page.HasNext; - } while (hasNext); + // meterInfos.AddRange(page.Items); + // cursor = page.HasNext ? page.NextScore : null; + // member = page.HasNext ? page.NextMember : null; + // hasNext = page.HasNext; + // } while (hasNext); - if (meterInfos == null || meterInfos.Count <= 0) - { - timer.Stop(); - _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); - return; - } + // if (meterInfos == null || meterInfos.Count <= 0) + // { + // timer.Stop(); + // _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); + // return; + // } - await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - items: meterInfos, - deviceIdSelector: data => data.FocusAddress, - processor: (data, groupIndex) => - { - AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch); - } - ); + // await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + // items: meterInfos, + // deviceIdSelector: data => data.FocusAddress, + // processor: (data, groupIndex) => + // { + // AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch); + // } + // ); - timer.Stop(); - _logger.LogInformation($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息"); - } + // timer.Stop(); + // _logger.LogInformation($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息"); + //} /// @@ -527,38 +561,33 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 采集频率 /// 电表信息 /// 集中器所在分组 - /// 时间格式的任务批次名称 + /// 采集频率对应的时间戳 /// - private void AmmerterCreatePublishTaskAction(int timeDensity - , AmmeterInfo ammeterInfo, int groupIndex, string taskBatch) + private List AmmerterCreatePublishTaskAction(int timeDensity + , AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) { + var currentTime = DateTime.Now; + var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; //todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿? - var currentTime = DateTime.Now; - var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); - - var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) { // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); - return; + return null; } //载波的不处理 if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave) { //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102"); - return; + return null; } if (ammeterInfo.State.Equals(2)) { //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理"); - return; + return null; } ////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器 @@ -571,22 +600,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode)) { // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空"); - return; + return null; } if (string.IsNullOrWhiteSpace(ammeterInfo.Address)) { //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空"); - return; + return null; } if (Convert.ToInt32(ammeterInfo.Address) > 65535) { //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535"); - return; + return null; } if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33) { //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})"); - return; + return null; } List tempCodes = ammeterInfo.ItemCodes.Deserialize>()!; @@ -613,7 +642,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (tempSubCodes == null || tempSubCodes.Count <= 0) { //_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空"); - return; + return null; } else { @@ -683,18 +712,18 @@ namespace JiShe.CollectBus.ScheduledMeterReading var meterReadingRecords = new MeterReadingTelemetryPacketInfo() { - ProjectID = ammeterInfo.ProjectID, + SystemName = SystemType, + ProjectId = $"{ammeterInfo.ProjectID}", + DeviceType = $"{MeterTypeEnum.Ammeter}", + DeviceId = $"{ammeterInfo.FocusAddress}", + Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), DatabaseBusiID = ammeterInfo.DatabaseBusiID, - PendingCopyReadTime = pendingCopyReadTime, + PendingCopyReadTime = timestamps, CreationTime = currentTime, MeterAddress = ammeterInfo.AmmerterAddress, - MeterId = ammeterInfo.MeterId, - MeterType = MeterTypeEnum.Ammeter, - FocusAddress = ammeterInfo.FocusAddress, - FocusId = ammeterInfo.FocusId, - AFN = aFN, + AFN = (int)aFN, Fn = fn, - Seq = builderResponse.Seq, + //Seq = builderResponse.Seq, MSA = builderResponse.MSA, ItemCode = tempItem, TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA), @@ -709,37 +738,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading taskList.Add(meterReadingRecords); } - if (taskList == null - || taskList.Count() <= 0 - || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) - || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey) - || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey)) - { - _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {ammeterInfo.Name}的写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101"); - return; - } - - using (var pipe = FreeRedisProvider.Instance.StartPipe()) - { - foreach (var item in taskList) - { - // 主数据存储Hash - pipe.HSet(redisCacheTelemetryPacketInfoHashKey, item.MemberId, item.Serialize()); - - // Set索引缓存 - pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, item.MemberId); - - // ZSET索引缓存Key - pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, item.ScoreValue, item.MemberId); - } - pipe.EndPipe(); - } - - //await _redisDataCacheService.BatchInsertDataAsync( - // redisCacheTelemetryPacketInfoHashKey, - // redisCacheTelemetryPacketInfoSetIndexKey, - // redisCacheTelemetryPacketInfoZSetScoresIndexKey, - // taskList); + return taskList; } #endregion @@ -864,7 +863,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) { - await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList); + // await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList); } ////删除任务数据 @@ -877,52 +876,52 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成"); } - /// - /// 创建水表待发送的任务数据 - /// - /// 采集频率 - /// 水表信息 - /// 集中器所在分组 - /// 时间格式的任务批次名称 - /// - private void WatermeterCreatePublishTaskAction(int timeDensity - , WatermeterInfo meterInfo, int groupIndex, string taskBatch) - { - var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; + ///// + ///// 创建水表待发送的任务数据 + ///// + ///// 采集频率 + ///// 水表信息 + ///// 集中器所在分组 + ///// 时间格式的任务批次名称 + ///// + //private void WatermeterCreatePublishTaskAction(int timeDensity + // , WatermeterInfo meterInfo, int groupIndex, string taskBatch) + //{ + // var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; - var currentTime = DateTime.Now; - var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); + // var currentTime = DateTime.Now; + // var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); - var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + // var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + // var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + // var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var taskInfo = new MeterReadingTelemetryPacketInfo() - { - Seq= null, - - }; - // + // var taskInfo = new MeterReadingTelemetryPacketInfo() + // { + // Seq= null, - Build188SendData.Build188WaterMeterReadingSendDataUnit(meterInfo.Address); + // }; + // // - using (var pipe = FreeRedisProvider.Instance.StartPipe()) - { - // 主数据存储Hash - pipe.HSet(redisCacheTelemetryPacketInfoHashKey, taskInfo.MemberId, taskInfo.Serialize()); + // Build188SendData.Build188WaterMeterReadingSendDataUnit(meterInfo.Address); - // Set索引缓存 - pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, taskInfo.MemberId); + // using (var pipe = FreeRedisProvider.Instance.StartPipe()) + // { + // // 主数据存储Hash + // pipe.HSet(redisCacheTelemetryPacketInfoHashKey, taskInfo.MemberId, taskInfo.Serialize()); - // ZSET索引缓存Key - pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, taskInfo.ScoreValue, taskInfo.MemberId); + // // Set索引缓存 + // pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, taskInfo.MemberId); - pipe.EndPipe(); - } + // // ZSET索引缓存Key + // pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, taskInfo.ScoreValue, taskInfo.MemberId); - } + // pipe.EndPipe(); + // } + + //} #endregion @@ -961,11 +960,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 创建表的待发送的任务数据 /// /// 采集频率 - /// 时间格式的任务批次名称 + /// 采集频率对应的任务时间戳 /// 表类型 /// 具体的创建任务的委托 /// - private async Task CreateMeterPublishTask(int timeDensity, string taskBatch, MeterTypeEnum meterType, Action taskCreateAction) where T : DeviceCacheBasicModel + private async Task CreateMeterPublishTask(int timeDensity, DateTime nextTaskTime, MeterTypeEnum meterType, Action taskCreateAction) where T : DeviceCacheBasicModel { var timer = Stopwatch.StartNew(); @@ -978,20 +977,29 @@ namespace JiShe.CollectBus.ScheduledMeterReading decimal? cursor = null; string member = null; bool hasNext; - do - { - var page = await _redisDataCacheService.GetAllPagedData( - redisCacheMeterInfoHashKeyTemp, - redisCacheMeterInfoZSetScoresIndexKeyTemp, - pageSize: 1000, - lastScore: cursor, - lastMember: member); + //do + //{ + // var page = await _redisDataCacheService.GetAllPagedData( + // redisCacheMeterInfoHashKeyTemp, + // redisCacheMeterInfoZSetScoresIndexKeyTemp, + // pageSize: 1000, + // lastScore: cursor, + // lastMember: member); - meterInfos.AddRange(page.Items); - cursor = page.HasNext ? page.NextScore : null; - member = page.HasNext ? page.NextMember : null; - hasNext = page.HasNext; - } while (hasNext); + // meterInfos.AddRange(page.Items); + // cursor = page.HasNext ? page.NextScore : null; + // member = page.HasNext ? page.NextMember : null; + // hasNext = page.HasNext; + //} while (hasNext); + + + var page = await _redisDataCacheService.GetAllPagedData( + redisCacheMeterInfoHashKeyTemp, + redisCacheMeterInfoZSetScoresIndexKeyTemp, + pageSize: 10, + lastScore: cursor, + lastMember: member); + meterInfos.AddRange(page.Items); if (meterInfos == null || meterInfos.Count <= 0) { @@ -1000,56 +1008,40 @@ namespace JiShe.CollectBus.ScheduledMeterReading return; } - await DeviceGroupBalanceControl.ProcessWithThrottleAsync( items: meterInfos, deviceIdSelector: data => data.FocusAddress, processor: (data, groupIndex) => { - taskCreateAction(timeDensity, data, groupIndex, taskBatch); + taskCreateAction(timeDensity, data, groupIndex, nextTaskTime); } ); timer.Stop(); - _logger.LogInformation($"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息"); + _logger.LogInformation($"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,耗时{timer.ElapsedMilliseconds}毫秒,总共{meterInfos.Count}表计信息"); } /// /// 创建Kafka消息 /// - /// - /// /// - private async Task CreateMeterKafkaTaskMessage( - string redisCacheTelemetryPacketInfoHashKey, - string redisCacheTelemetryPacketInfoZSetScoresIndexKey) + private async Task CreateMeterKafkaTaskMessage(int timeDensity, IoTDBQueryOptions options) where T : IoTEntity, new() { - if (string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey)) - { - throw new Exception($"{nameof(CreateMeterKafkaTaskMessage)} 创建Kafka消息失败,参数异常,-101"); - } - - decimal? cursor = null; - string member = null; + int pageNumber = 0; bool hasNext; var stopwatch = Stopwatch.StartNew(); do { - var page = await _redisDataCacheService.GetAllPagedData( - redisCacheTelemetryPacketInfoHashKey, - redisCacheTelemetryPacketInfoZSetScoresIndexKey, - pageSize: 1000, - lastScore: cursor, - lastMember: member); + options.PageIndex = pageNumber++; - cursor = page.HasNext ? page.NextScore : null; - member = page.HasNext ? page.NextMember : null; - hasNext = page.HasNext; + var pageResult = await _dbProvider.QueryAsync(options); - await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - items: page.Items, - deviceIdSelector: data => data.FocusAddress, + hasNext = pageResult.HasNext; + + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + items: pageResult.Items.ToList(), + deviceIdSelector: data => data.DeviceId, processor: (data, groupIndex) => { _ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex); @@ -1059,9 +1051,57 @@ namespace JiShe.CollectBus.ScheduledMeterReading } while (hasNext); stopwatch.Stop(); - _logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {redisCacheTelemetryPacketInfoHashKey}采集推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); + _logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {options.TableNameOrTreePath} {timeDensity}分钟采集任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); } + + ///// + ///// 创建Kafka消息 + ///// + ///// + ///// + ///// + //private async Task CreateMeterKafkaTaskMessage( + //string redisCacheTelemetryPacketInfoHashKey, + //string redisCacheTelemetryPacketInfoZSetScoresIndexKey) + //{ + // if (string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey)) + // { + // throw new Exception($"{nameof(CreateMeterKafkaTaskMessage)} 创建Kafka消息失败,参数异常,-101"); + // } + + // decimal? cursor = null; + // string member = null; + // bool hasNext; + // var stopwatch = Stopwatch.StartNew(); + // do + // { + // var page = await _redisDataCacheService.GetAllPagedData( + // redisCacheTelemetryPacketInfoHashKey, + // redisCacheTelemetryPacketInfoZSetScoresIndexKey, + // pageSize: 1000, + // lastScore: cursor, + // lastMember: member); + + // cursor = page.HasNext ? page.NextScore : null; + // member = page.HasNext ? page.NextMember : null; + // hasNext = page.HasNext; + + // await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + // items: page.Items, + // deviceIdSelector: data => data.FocusAddress, + // processor: (data, groupIndex) => + // { + // _ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex); + // } + // ); + + // } while (hasNext); + + // stopwatch.Stop(); + // _logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {redisCacheTelemetryPacketInfoHashKey}采集推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); + //} + /// /// Kafka 推送消息 /// @@ -1069,15 +1109,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 任务记录 /// 对应分区,也就是集中器号所在的分组序号 /// - private async Task KafkaProducerIssuedMessageAction(string topicName, - MeterReadingTelemetryPacketInfo taskRecord, int partition) + private async Task KafkaProducerIssuedMessageAction(string topicName, + T taskRecord, int partition) where T : class { if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null) { throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101"); } - await _producerService.ProduceAsync(topicName, partition, taskRecord); + await _producerService.ProduceAsync(topicName, taskRecord, partition); } #endregion diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index b8fd08b..fe0746f 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -8,6 +8,7 @@ using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.GatherItem; +using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageIssueds; @@ -35,18 +36,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService { string serverTagName = string.Empty; + public EnergySystemScheduledMeterReadingService( ILogger logger, IIoTDbProvider dbProvider, - IMeterReadingRecordRepository meterReadingRecordRepository, - IOptions kafkaOptions, + IOptions kafkaOptions, + IoTDBRuntimeContext runtimeContext, IProducerService producerService, IRedisDataCacheService redisDataCacheService) : base(logger, - meterReadingRecordRepository, producerService, redisDataCacheService, dbProvider, + runtimeContext, kafkaOptions) { serverTagName = kafkaOptions.Value.ServerTagName; diff --git a/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs b/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs index c07950f..add3131 100644 --- a/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs +++ b/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs @@ -14,7 +14,7 @@ namespace JiShe.CollectBus.Ammeters /// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义 /// [Column(IsIgnore = true)] - public override string MemberId => $"{FocusId}:{MeterId}"; + public override string MemberId => $"{FocusAddress}:{MeteringCode}"; /// /// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳 diff --git a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs index 3aafa41..3ac4202 100644 --- a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs +++ b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs @@ -1,5 +1,9 @@ -using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Encrypt; +using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Models; +using JiShe.CollectBus.IoTDB.Attribute; +using JiShe.CollectBus.IoTDB.Enums; +using JiShe.CollectBus.IoTDB.Model; using System; using System.Collections.Generic; using System.Linq; @@ -13,78 +17,79 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords /// /// 抄读任务Redis缓存数据记录 /// - public class MeterReadingTelemetryPacketInfo : DeviceCacheBasicModel + [EntityType(EntityTypeEnum.TableModel)] + public class MeterReadingTelemetryPacketInfo : IoTEntity { /// - /// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义 + /// 排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳、或者某一个固定的标识1 /// - public override string MemberId => $"{FocusId}:{MeterId}:{ItemCode}"; - - /// - /// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳 - /// - public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks; - - + [FIELDColumn] + public string ScoreValue + { + get + { + return $"{DeviceId}.{TaskMark}".Md5Fun(); + } + } + /// /// 是否手动操作 /// + [FIELDColumn] public bool ManualOrNot { get; set; } /// /// 任务数据唯一标记 /// - public decimal TaskMark { get; set; } - - /// - /// 时间戳标记,IoTDB时间列处理,上报通过构建标记获取唯一标记匹配时间戳。 - /// - public long Timestamps { get; set; } + [FIELDColumn] + public string TaskMark { get; set; } /// /// 是否超时 /// + [FIELDColumn] public bool IsTimeout { get; set; } = false; /// /// 待抄读时间 /// + [FIELDColumn] public DateTime PendingCopyReadTime { get; set; } - - + /// - /// 集中器地址 + /// 集中器Id /// - public string FocusAddress { get; set; } - + [FIELDColumn] + public int FocusId { get; set; } + + /// + /// 表Id + /// + [FIELDColumn] + public int MeterId { get; set; } + /// /// 表地址 /// + [FIELDColumn] public string MeterAddress { get; set; } - /// - /// 表类型 - /// - public MeterTypeEnum MeterType { get; set; } - - /// - /// 项目ID - /// - public int ProjectID { get; set; } - /// /// 数据库业务ID /// + [FIELDColumn] public int DatabaseBusiID { get; set; } /// /// AFN功能码 /// - public AFN AFN { get; set; } + [FIELDColumn] + public int AFN { get; set; } /// /// 抄读功能码 /// + [FIELDColumn] public int Fn { get; set; } /// @@ -95,66 +100,73 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords /// /// 采集项编码 /// - public string ItemCode { get; set;} + [FIELDColumn] + public string ItemCode { get; set; } - /// - /// 帧序列域SEQ - /// - public required Seq Seq { get; set; } + ///// + ///// 帧序列域SEQ + ///// + //public required Seq Seq { get; set; } /// /// 地址域A3的主站地址MSA /// + [FIELDColumn] public int MSA { get; set; } /// /// 是否发送 /// + [FIELDColumn] public bool IsSend { get; set; } /// /// 创建时间 /// + [FIELDColumn] public DateTime CreationTime { get; set; } /// /// 下发消息内容 /// + [FIELDColumn] public string IssuedMessageHexString { get; set; } /// /// 下发消息Id /// + [FIELDColumn] public string IssuedMessageId { get; set; } /// /// 消息上报内容 /// + [FIELDColumn] public string? ReceivedMessageHexString { get; set; } /// /// 消息上报时间 /// + [FIELDColumn] public DateTime? ReceivedTime { get; set; } /// /// 上报消息Id /// - public string ReceivedMessageId { get; set; } + [FIELDColumn] + public string ReceivedMessageId { get; set; } /// /// 上报报文解析备注,异常情况下才有 /// + [FIELDColumn] public string ReceivedRemark { get; set; } /// /// 是否已上报 /// - public bool IsReceived { get; set; } - - //public void CreateDataId(Guid Id) - //{ - // this.Id = Id; - //} + [FIELDColumn] + public bool IsReceived { get; set; } + } } diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/TasksToBeIssueModel.cs b/shared/JiShe.CollectBus.Common/BuildSendDatas/TasksToBeIssueModel.cs index 5184459..6420a23 100644 --- a/shared/JiShe.CollectBus.Common/BuildSendDatas/TasksToBeIssueModel.cs +++ b/shared/JiShe.CollectBus.Common/BuildSendDatas/TasksToBeIssueModel.cs @@ -11,6 +11,11 @@ namespace JiShe.CollectBus.Common.BuildSendDatas /// public class TasksToBeIssueModel { + /// + /// 上次下发任务的时间 + /// + public DateTime? LastTaskTime { get; set; } + /// /// 下个任务时间 /// diff --git a/shared/JiShe.CollectBus.Common/Encrypt/EncryptUtil.cs b/shared/JiShe.CollectBus.Common/Encrypt/EncryptUtil.cs new file mode 100644 index 0000000..72cdf41 --- /dev/null +++ b/shared/JiShe.CollectBus.Common/Encrypt/EncryptUtil.cs @@ -0,0 +1,65 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Security.Cryptography; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Common.Encrypt +{ + /// + /// 各种加密辅助类 + /// + public static class EncryptUtil + { + #region MD5加密 + + /// + /// MD5加密 + /// + public static string Md5Fun(this string value) + { + if (value == null) + { + throw new ArgumentNullException("未将对象引用设置到对象的实例。"); + } + + var encoding = Encoding.UTF8; + MD5 md5 = MD5.Create(); + return HashAlgorithmBase(md5, value, encoding); + } + + /// + /// 加权MD5加密 + /// + public static string Md5Fun(this string value, string salt) + { + return salt == null ? value.Md5Fun() : (value + "『" + salt + "』").Md5Fun(); + } + + #endregion + + /// + /// HashAlgorithm 加密统一方法 + /// + private static string HashAlgorithmBase(HashAlgorithm hashAlgorithmObj, string source, Encoding encoding) + { + byte[] btStr = encoding.GetBytes(source); + byte[] hashStr = hashAlgorithmObj.ComputeHash(btStr); + return hashStr.Bytes2Str(); + } + + /// + /// 转换成字符串 + /// + private static string Bytes2Str(this IEnumerable source, string formatStr = "{0:X2}") + { + StringBuilder pwd = new StringBuilder(); + foreach (byte btStr in source) + { + pwd.AppendFormat(formatStr, btStr); + } + return pwd.ToString(); + } + } +} diff --git a/shared/JiShe.CollectBus.Common/Enums/TimestampUnit.cs b/shared/JiShe.CollectBus.Common/Enums/TimestampUnit.cs new file mode 100644 index 0000000..9bed83f --- /dev/null +++ b/shared/JiShe.CollectBus.Common/Enums/TimestampUnit.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Common.Enums +{ + public enum TimestampUnit + { + Seconds, // 秒级(Unix 时间戳) + Milliseconds, // 毫秒级(默认) + Microseconds, // 微秒级 + Nanoseconds // 纳秒级 + } +} diff --git a/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs b/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs index 2bdcf6c..6936ad3 100644 --- a/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs +++ b/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs @@ -181,25 +181,7 @@ namespace JiShe.CollectBus.Common.Extensions return $"{dateTime:yyyyMMddHH}"; #endif } - - /// - /// 获取当前时间毫秒级时间戳 - /// - /// - public static long GetCurrentTimeMillis() - { - return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); - } - - /// - /// 将Unix时间戳转换为日期时间 - /// - /// - /// - public static DateTime FromUnixMillis(long millis) - { - return DateTimeOffset.FromUnixTimeMilliseconds(millis).DateTime; - } + /// /// 采集时间节点计算 @@ -236,19 +218,40 @@ namespace JiShe.CollectBus.Common.Extensions /// - /// 将 DateTime 时间转换为 DateTimeOffset 时间 + /// 格式化为微秒(μs) /// - /// + /// /// - public static DateTimeOffset GetDateTimeOffset(this DateTime rawDateTime) + public static string ToMicrosecondString(this DateTime dt) { - //确保 Kind 为 Local(如果是 Unspecified) - DateTime localDateTime = rawDateTime.Kind == DateTimeKind.Unspecified - ? DateTime.SpecifyKind(rawDateTime, DateTimeKind.Local) - : rawDateTime; + long microseconds = (dt.Ticks % TimeSpan.TicksPerSecond) / 10; // 1 Tick = 100ns → 0.1μs + return $"{dt:yyyy-MM-dd HH:mm:ss.fffffff}".Remove(23) + $"{microseconds:D6}"; + } - // 转换为 DateTimeOffset(自动应用本地时区偏移) - return new DateTimeOffset(localDateTime); + /// + /// 格式化为纳秒(ns) + /// + /// + /// + public static string ToNanosecondString(this DateTime dt) + { + long nanoseconds = (dt.Ticks % TimeSpan.TicksPerSecond) * 100; // 1 Tick = 100ns + return $"{dt:yyyy-MM-dd HH:mm:ss.fffffff}".Remove(23) + $"{nanoseconds:D9}"; + } + + /// + /// 毫米、微秒、纳秒时间戳转DateTime + /// + /// + /// + /// + public static DateTime ParseIntToDate(long dateLong) + { + if (dateLong < 10000101 || dateLong > 99991231) + { + throw new ArgumentException("Date must be between 10000101 and 99991231."); + } + return DateTime.TryParseExact(dateLong.ToString(), "yyyyMMdd HHmmssZZ", null, System.Globalization.DateTimeStyles.None, out DateTime date) ? date : throw new ArgumentException("Date must be between 10000101 and 99991231."); } } } diff --git a/shared/JiShe.CollectBus.Common/Extensions/DateTimeOffsetExtensions.cs b/shared/JiShe.CollectBus.Common/Extensions/DateTimeOffsetExtensions.cs new file mode 100644 index 0000000..0f07e9c --- /dev/null +++ b/shared/JiShe.CollectBus.Common/Extensions/DateTimeOffsetExtensions.cs @@ -0,0 +1,46 @@ +using System; +using System.Collections.Generic; +using System.ComponentModel; +using System.Linq; + +namespace JiShe.CollectBus.Common.Extensions +{ + public static class DateTimeOffsetExtensions + { + + /// + /// 获取当前时间毫秒级时间戳 + /// + /// + public static long GetCurrentTimeMillis() + { + return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + } + + /// + /// 将Unix时间戳转换为日期时间 + /// + /// + /// + public static DateTime FromUnixMillis(long millis) + { + return DateTimeOffset.FromUnixTimeMilliseconds(millis).DateTime; + } + + /// + /// 将 DateTime 时间转换为 DateTimeOffset 时间 + /// + /// + /// + public static DateTimeOffset GetDateTimeOffset(this DateTime rawDateTime) + { + //确保 Kind 为 Local(如果是 Unspecified) + DateTime localDateTime = rawDateTime.Kind == DateTimeKind.Unspecified + ? DateTime.SpecifyKind(rawDateTime, DateTimeKind.Local) + : rawDateTime; + + // 转换为 DateTimeOffset(自动应用本地时区偏移) + return new DateTimeOffset(localDateTime); + } + } +} diff --git a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs index 3c36d23..34cf37f 100644 --- a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs +++ b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs @@ -769,11 +769,11 @@ namespace JiShe.CollectBus.Common.Helpers /// /// /// - public static decimal GetTaskMark(int afn, int fn, int pn, int msa) + public static string GetTaskMark(int afn, int fn, int pn, int msa) { var makstr = $"{afn.ToString().PadLeft(2, '0')}{fn.ToString().PadLeft(2, '0')}{pn.ToString().PadLeft(2, '0')}"; - return Convert.ToInt32(makstr) << 32 | msa; + return makstr;// Convert.ToInt32(makstr) << 32 | msa; } } } diff --git a/shared/JiShe.CollectBus.Common/Helpers/TimestampHelper.cs b/shared/JiShe.CollectBus.Common/Helpers/TimestampHelper.cs new file mode 100644 index 0000000..9edc3cd --- /dev/null +++ b/shared/JiShe.CollectBus.Common/Helpers/TimestampHelper.cs @@ -0,0 +1,68 @@ +using JiShe.CollectBus.Common.Enums; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Common.Helpers +{ + /// + /// 时间戳帮助类 + /// + public static class TimestampHelper + { + private static readonly long UnixEpochTicks = new DateTimeOffset(1970, 1, 1, 0, 0, 0, TimeSpan.Zero).Ticks; + + /// + /// 获取当前 DateTimeOffset 距离 Unix 纪元(1970-01-01)的微秒数 + /// + public static long ToUnixTimeMicroseconds(this DateTimeOffset dateTimeOffset) + { + // Ticks 单位是 100 纳秒,转换为微秒需除以 10 + long elapsedTicks = dateTimeOffset.Ticks - UnixEpochTicks; + return elapsedTicks / 10; // 1 微秒 = 1000 纳秒 = 10 Ticks + } + + /// + /// 获取当前 DateTimeOffset 距离 Unix 纪元(1970-01-01)的纳秒数 + /// + public static long ToUnixTimeNanoseconds(this DateTimeOffset dateTimeOffset) + { + long nanoseconds = (dateTimeOffset.Ticks - UnixEpochTicks) * 100; + return nanoseconds; + } + + /// + /// 将 long 类型时间戳转换为 DateTime(UTC) + /// + /// 时间戳数值 + /// 时间戳单位 + public static DateTime ConvertToDateTime(long timestamp, TimestampUnit unit = TimestampUnit.Milliseconds) + { + long ticks = unit switch + { + TimestampUnit.Seconds => checked(timestamp * TimeSpan.TicksPerSecond), + TimestampUnit.Milliseconds => checked(timestamp * TimeSpan.TicksPerMillisecond), + TimestampUnit.Microseconds => checked(timestamp * 10), // 1微秒 = 10 Ticks(100纳秒) + TimestampUnit.Nanoseconds => checked(timestamp / 100),// 1 Tick = 100纳秒 + _ => throw new ArgumentException("无效的时间单位", nameof(unit)) + }; + + try + { + DateTime result = new DateTime(UnixEpochTicks + ticks, DateTimeKind.Utc); + // 校验结果是否在 DateTime 合法范围内(0001-01-01 至 9999-12-31) + if (result < DateTime.MinValue || result > DateTime.MaxValue) + { + throw new ArgumentOutOfRangeException(nameof(timestamp), "时间戳超出 DateTime 范围"); + } + return result; + } + catch (ArgumentOutOfRangeException ex) + { + throw new ArgumentOutOfRangeException("时间戳无效", ex); + } + } + } +} diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml index f7bd68e..0193df3 100644 --- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -18,7 +18,7 @@ 后端服务 - +