diff --git a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs index b896bdf..82a0d47 100644 --- a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs @@ -1,6 +1,7 @@ using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IoTDB.Model; using JiShe.CollectBus.IoTDB.Options; +using JiShe.CollectBus.IoTDB.Provider; namespace JiShe.CollectBus.IoTDB.Interface { @@ -31,6 +32,15 @@ namespace JiShe.CollectBus.IoTDB.Interface /// Task BatchInsertAsync(IEnumerable entities) where T : IoTEntity; + /// + /// 批量插入数据 + /// + /// + /// 设备元数据 + /// + /// + Task BatchInsertAsync(DeviceMetadata deviceMetadata,IEnumerable entities) where T : IoTEntity; + /// /// 删除数据 @@ -38,7 +48,14 @@ namespace JiShe.CollectBus.IoTDB.Interface /// /// /// - Task DeleteAsync(QueryOptions options) where T : IoTEntity; + Task DeleteAsync(IoTDBQueryOptions options) where T : IoTEntity; + + /// + /// 获取设备元数据 + /// + /// + /// + Task GetMetadata() where T : IoTEntity; /// /// 查询数据 @@ -46,6 +63,6 @@ namespace JiShe.CollectBus.IoTDB.Interface /// /// /// - Task> QueryAsync(QueryOptions options) where T : IoTEntity, new(); + Task> QueryAsync(IoTDBQueryOptions options) where T : IoTEntity, new(); } } diff --git a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs index f3cdbe9..92f98e1 100644 --- a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs +++ b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs @@ -11,34 +11,29 @@ namespace JiShe.CollectBus.IoTDB.Model /// 系统名称 /// [TAGColumn] - public required string SystemName { get; set; } + public string SystemName { get; set; } /// /// 项目编码 /// - [TAGColumn] - public required string ProjectId { get; set; } + [ATTRIBUTEColumn] + public string ProjectId { get; set; } /// /// 设备类型集中器、电表、水表、流量计、传感器等 /// - [TAGColumn] - public required string DeviceType { get; set; } + [ATTRIBUTEColumn] + public string DeviceType { get; set; } /// - /// 设备ID + /// 设备ID,也就是通信设备的唯一标识符,例如集中器地址,或者其他传感器设备地址 /// [TAGColumn] - public required string DeviceId { get; set; } + public string DeviceId { get; set; } /// /// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取 /// - public required long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); - - /// - /// 数据创建时间戳,单位毫秒,必须通过DateTimeOffset获取 - /// - public virtual long CreationTime { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); } } diff --git a/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs index 69463ba..0d01f81 100644 --- a/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs +++ b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs @@ -42,5 +42,10 @@ /// 是否使用表模型存储, 默认false,使用tree模型存储 /// public bool UseTableSessionPoolByDefault { get; set; } = false; + + /// + /// 时区,默认为:"UTC+08:00" + /// + public string ZoneId { get; set; } = "UTC+08:00"; } } diff --git a/modules/JiShe.CollectBus.IoTDB/Options/QueryOptions.cs b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBQueryOptions.cs similarity index 78% rename from modules/JiShe.CollectBus.IoTDB/Options/QueryOptions.cs rename to modules/JiShe.CollectBus.IoTDB/Options/IoTDBQueryOptions.cs index 90e44b2..bff4641 100644 --- a/modules/JiShe.CollectBus.IoTDB/Options/QueryOptions.cs +++ b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBQueryOptions.cs @@ -3,7 +3,7 @@ /// /// 查询条件 /// - public class QueryOptions + public class IoTDBQueryOptions { /// /// 表模型的表名称或者树模型的设备路径 @@ -13,7 +13,7 @@ /// /// 分页 /// - public int Page { get; set; } + public int PageIndex { get; set; } /// /// 分页大小 @@ -23,6 +23,6 @@ /// /// 查询条件 /// - public List Conditions { get; } = new(); + public List Conditions { get; set; } = new(); } } diff --git a/modules/JiShe.CollectBus.IoTDB/Options/QueryCondition.cs b/modules/JiShe.CollectBus.IoTDB/Options/QueryCondition.cs index cf6d3a9..40dd443 100644 --- a/modules/JiShe.CollectBus.IoTDB/Options/QueryCondition.cs +++ b/modules/JiShe.CollectBus.IoTDB/Options/QueryCondition.cs @@ -9,10 +9,17 @@ /// 字段 /// public string Field { get; set; } + /// - /// 操作符 + /// 操作符,>,=,< /// public string Operator { get; set; } + + /// + /// 是否数值,如果是数值,则进行数值比较,否则进行字符串比较 + /// + public bool IsNumber { get; set; } = false; + /// /// 值 /// diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs index 6a1a596..6922c62 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs @@ -15,7 +15,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public static string GetDevicePath(T entity) where T : IoTEntity { - return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectId}`.`{entity.DeviceType}`.`{entity.DeviceId}`"; + return $"root.{entity.SystemName.ToLower()}.`{entity.DeviceId}`"; } @@ -39,7 +39,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public static string GetDeviceTableName(T entity) where T : IoTEntity { - return $"{entity.SystemName.ToLower()}.`{entity.ProjectId}`.`{entity.DeviceType}`.`{entity.DeviceId}`"; + return $"{entity.SystemName.ToLower()}.`{entity.DeviceId}`"; } } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index b9c2cf0..7cfaf32 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -2,8 +2,12 @@ using System.Collections.Concurrent; using System.Reflection; using System.Text; +using System.Threading.Tasks; using Apache.IoTDB; using Apache.IoTDB.DataStructure; +using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Extensions; +using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IoTDB.Attribute; using JiShe.CollectBus.IoTDB.Context; @@ -56,7 +60,7 @@ namespace JiShe.CollectBus.IoTDB.Provider { try { - var metadata = GetMetadata(); + var metadata = await GetMetadata(); var tablet = BuildTablet(new[] { entity }, metadata); @@ -78,7 +82,7 @@ namespace JiShe.CollectBus.IoTDB.Provider { try { - var metadata = GetMetadata(); + var metadata = await GetMetadata(); var batchSize = 1000; var batches = entities.Chunk(batchSize); @@ -96,6 +100,34 @@ namespace JiShe.CollectBus.IoTDB.Provider } } + /// + /// 批量插入数据 + /// + /// + /// 设备元数据 + /// + /// + public async Task BatchInsertAsync(DeviceMetadata deviceMetadata, IEnumerable entities) where T : IoTEntity + { + try + { + + var batchSize = 1000; + var batches = entities.Chunk(batchSize); + + foreach (var batch in batches) + { + var tablet = BuildTablet(batch, deviceMetadata); + await CurrentSession.InsertAsync(tablet); + } + } + catch (Exception ex) + { + _logger.LogError(ex, $"{nameof(BatchInsertAsync)} 批量插入数据时发生异常"); + throw; + } + } + /// /// 删除数据 @@ -103,11 +135,11 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// /// - public async Task DeleteAsync(QueryOptions options) where T : IoTEntity + public async Task DeleteAsync(IoTDBQueryOptions options) where T : IoTEntity { try { - var query = BuildDeleteSQL(options); + var query = await BuildDeleteSQL(options); var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); if (!sessionDataSet.HasNext()) @@ -127,30 +159,62 @@ namespace JiShe.CollectBus.IoTDB.Provider } } + /// + /// 获取设备元数据 + /// + /// + /// + public async Task GetMetadata() where T : IoTEntity + { + var columns = CollectColumnMetadata(typeof(T)); + var metadata = BuildDeviceMetadata(columns); + var metaData = MetadataCache.AddOrUpdate( + typeof(T), + addValueFactory: t => metadata, // 如果键不存在,用此值添加 + updateValueFactory: (t, existingValue) => + { + var columns = CollectColumnMetadata(t); + var metadata = BuildDeviceMetadata(columns); + + //对现有值 existingValue 进行修改,返回新值 + existingValue.ColumnNames = metadata.ColumnNames; + return existingValue; + } + ); + + return await Task.FromResult(metaData); + } + /// /// 查询数据 /// /// /// /// - public async Task> QueryAsync(QueryOptions options) where T : IoTEntity, new() + public async Task> QueryAsync(IoTDBQueryOptions options) where T : IoTEntity, new() { try { - var query = BuildQuerySQL(options); + var query =await BuildQuerySQL(options); var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); + var result = new BusPagedResult { TotalCount = await GetTotalCount(options), - Items = ParseResults(sessionDataSet, options.PageSize) + Items = await ParseResults(sessionDataSet, options.PageSize), + PageIndex = options.PageIndex, + PageSize = options.PageSize, + }; + result.HasNext = result.TotalCount > 0? result.TotalCount < result.PageSize : false; + return result; } catch (Exception ex) { - _logger.LogError(ex, $"{nameof(QueryAsync)} 查询数据时发生异常"); + _logger.LogError(ex, $"{nameof(QueryAsync)} IoTDB查询数据时发生异常"); throw; } } @@ -246,7 +310,23 @@ namespace JiShe.CollectBus.IoTDB.Provider else { - rowValues.Add(value); + //需要根据value的类型,进行相应的值映射转换,例如datetime转换为long的时间戳值 + if (value != null) + { + Type tupleType = value.GetType(); + var tempValue = tupleType.Name.ToUpper() switch + { + "DATETIME" => Convert.ToDateTime(value).GetDateTimeOffset().ToUnixTimeNanoseconds(), + _ => value + }; + + rowValues.Add(tempValue); + } + else + { + rowValues.Add(value); + } + } } @@ -331,9 +411,9 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// /// - private string BuildQuerySQL(QueryOptions options) where T : IoTEntity + private async Task BuildQuerySQL(IoTDBQueryOptions options) where T : IoTEntity { - var metadata = GetMetadata(); + var metadata = await GetMetadata(); var sb = new StringBuilder("SELECT "); sb.AppendJoin(", ", metadata.ColumnNames); sb.Append($" FROM {options.TableNameOrTreePath}"); @@ -344,7 +424,7 @@ namespace JiShe.CollectBus.IoTDB.Provider sb.AppendJoin(" AND ", options.Conditions.Select(TranslateCondition)); } - sb.Append($" LIMIT {options.PageSize} OFFSET {options.Page * options.PageSize}"); + sb.Append($" LIMIT {options.PageSize} OFFSET {options.PageIndex * options.PageSize}"); return sb.ToString(); } @@ -354,9 +434,9 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// /// - private string BuildDeleteSQL(QueryOptions options) where T : IoTEntity + private async Task BuildDeleteSQL(IoTDBQueryOptions options) where T : IoTEntity { - var metadata = GetMetadata(); + var metadata = await GetMetadata(); var sb = new StringBuilder(); if (!_runtimeContext.UseTableSessionPool) @@ -391,10 +471,10 @@ namespace JiShe.CollectBus.IoTDB.Provider { return condition.Operator switch { - ">" => $"{condition.Field} > {condition.Value}", - "<" => $"{condition.Field} < {condition.Value}", - "=" => $"{condition.Field} = '{condition.Value}'", - _ => throw new NotSupportedException($"Operator {condition.Operator} not supported") + ">" => condition.IsNumber ? $"{condition.Field} > {condition.Value}": $"{condition.Field} > '{condition.Value}'", + "<" => condition.IsNumber ? $"{condition.Field} < {condition.Value}" : $"{condition.Field} < '{condition.Value}'", + "=" => condition.IsNumber ? $"{condition.Field} = {condition.Value}" : $"{condition.Field} = '{condition.Value}'", + _ => throw new NotSupportedException($"{nameof(TranslateCondition)} 将查询条件转换为SQL语句时操作符 {condition.Operator} 属于异常情况") }; } @@ -404,7 +484,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// /// - private async Task GetTotalCount(QueryOptions options) where T : IoTEntity + private async Task GetTotalCount(IoTDBQueryOptions options) where T : IoTEntity { var countQuery = $"SELECT COUNT(*) FROM {options.TableNameOrTreePath}"; if (options.Conditions.Any()) @@ -423,10 +503,10 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// /// - private IEnumerable ParseResults(SessionDataSet dataSet, int pageSize) where T : IoTEntity, new() + private async Task> ParseResults(SessionDataSet dataSet, int pageSize) where T : IoTEntity, new() { var results = new List(); - var metadata = GetMetadata(); + var metadata = await GetMetadata(); var properties = typeof(T).GetProperties(); @@ -438,16 +518,24 @@ namespace JiShe.CollectBus.IoTDB.Provider Timestamps = record.Timestamps }; - foreach (var measurement in metadata.ColumnNames) { - var value = record.Values; + int indexOf = metadata.ColumnNames.IndexOf(measurement); + var value = record.Values[indexOf]; var prop = properties.FirstOrDefault(p => p.Name.Equals(measurement, StringComparison.OrdinalIgnoreCase)); if (prop != null) { - typeof(T).GetProperty(measurement)?.SetValue(entity, value); + if (measurement.EndsWith("time")) + { + var tempValue = TimestampHelper.ConvertToDateTime(Convert.ToInt64(value), TimestampUnit.Nanoseconds); + typeof(T).GetProperty(measurement)?.SetValue(entity, value); + } + else + { + typeof(T).GetProperty(measurement)?.SetValue(entity, value); + } } } @@ -457,30 +545,6 @@ namespace JiShe.CollectBus.IoTDB.Provider return results; } - /// - /// 获取设备元数据 - /// - /// - /// - private DeviceMetadata GetMetadata() where T : IoTEntity - { - var columns = CollectColumnMetadata(typeof(T)); - var metadata = BuildDeviceMetadata(columns); - return MetadataCache.AddOrUpdate( - typeof(T), - addValueFactory: t => metadata, // 如果键不存在,用此值添加 - updateValueFactory: (t, existingValue) => - { - var columns = CollectColumnMetadata(t); - var metadata = BuildDeviceMetadata(columns); - - //对现有值 existingValue 进行修改,返回新值 - existingValue.ColumnNames = metadata.ColumnNames; - return existingValue; - } - ); - } - /// /// 获取设备元数据的列 /// diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs index fcb0c02..eacf246 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs @@ -25,6 +25,7 @@ namespace JiShe.CollectBus.IoTDB.Provider .SetNodeUrl(options.ClusterList) .SetUsername(options.UserName) .SetPassword(options.Password) + .SetZoneId(options.ZoneId) .SetFetchSize(options.FetchSize) .SetPoolSize(options.PoolSize) .Build(); diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs index d22f356..137f5a8 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs @@ -25,6 +25,7 @@ namespace JiShe.CollectBus.IoTDB.Provider .SetNodeUrls(options.ClusterList) .SetUsername(options.UserName) .SetPassword(options.Password) + .SetZoneId(options.ZoneId) .SetFetchSize(options.FetchSize) .SetPoolSize(options.PoolSize) .SetDatabase(options.DataBaseName) diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index b616544..86582af 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -4,6 +4,7 @@ using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; +using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; @@ -50,20 +51,20 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS /// /// [HttpGet] - public async Task UseSessionPool(DateTime testTime) - { + public async Task UseSessionPool(long testTime) + { ElectricityMeterTreeModel meter = new ElectricityMeterTreeModel() { SystemName = "energy", - DeviceId = "402440506", + DeviceId = "402440506s", DeviceType = "Ammeter", Current = 10, MeterModel = "DDZY-1980", ProjectId = "10059", Voltage = 10, IssuedMessageHexString = "messageHexString", - Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), + Timestamps = testTime// DateTimeOffset.UtcNow.ToUnixTimeNanoseconds()//testTime.GetDateTimeOffset().ToUnixTimeNanoseconds(), }; await _iotDBProvider.InsertAsync(meter); } @@ -85,7 +86,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS MeterModel = "DDZY-1980", ProjectId = "10059", Voltage = 10, - Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), + Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), }; await _iotDBProvider.InsertAsync(meter2); @@ -101,7 +102,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS MeterModel = "DDZY-1980", ProjectId = "10059", Voltage = 10, - Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), + Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), }; await _iotDBProvider.InsertAsync(meter); @@ -125,7 +126,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS ProjectId = "10059", Voltage = 10, IssuedMessageHexString = "dsdfsfd", - Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), + Timestamps = DateTimeOffset.UtcNow.ToUnixTimeNanoseconds(), }; @@ -145,8 +146,13 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS Voltage = 10, Currentd = 22, IssuedMessageHexString = "dsdfsfd", - Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), + Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), }; + + //var dd = DateTimeOffset.Now.ToUnixTimeMilliseconds(); + //var dd3 = DateTimeOffset.Now.ToUnixTimeMicroseconds(); + //var dd2 = DateTimeOffset.Now.ToUnixTimeNanoseconds(); + await _iotDBProvider.InsertAsync(meter3); } diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 0fb4c49..3437f4b 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -13,6 +13,9 @@ using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; +using JiShe.CollectBus.IoTDB.Model; +using JiShe.CollectBus.IoTDB.Options; +using JiShe.CollectBus.IoTDB.Provider; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; @@ -138,9 +141,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (meteryType == MeterTypeEnum.Ammeter.ToString()) { - List pushTaskInfos = new(); - - await CreateMeterPublishTask( + //List pushTaskInfos = new(); + _runtimeContext.UseTableSessionPool = true; + var metadata = await _dbProvider.GetMetadata(); + _ = CreateMeterPublishTask( timeDensity: timeDensity, nextTaskTime: tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity), meterType: MeterTypeEnum.Ammeter, @@ -149,38 +153,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading var tempTask = AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps); if (tempTask == null || tempTask.Count <= 0) { + _logger.LogWarning($"{data.Name} 任务数据构建失败:{data.Serialize()}"); return; } - - pushTaskInfos.AddRange(tempTask); - - //using (var score = _serviceProvider.CreateScope()) - //{ - // var _dbContext = score.ServiceProvider.GetRequiredService(); - // _dbContext.UseTableSessionPool = true; - // _dbProvider.BatchInsertAsync(tempTask); - //} - - _runtimeContext.UseTableSessionPool = true; - _dbProvider.BatchInsertAsync(tempTask); + _dbProvider.BatchInsertAsync(metadata, tempTask); }); - - //if (pushTaskInfos.Count <= 0) - //{ - // _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有任务数据信息,-1051"); - // continue; - //} - - //using (var score = _serviceProvider.CreateScope()) - //{ - // var _dbContext = score.ServiceProvider.GetRequiredService(); - // _dbContext.UseTableSessionPool = true; - // _dbProvider.BatchInsertAsync(pushTaskInfos); - //} - - //_dbContext.UseTableSessionPool = true; - //await _dbProvider.BatchInsertAsync(pushTaskInfos); - } else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) { @@ -206,6 +183,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading //根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。 + tasksToBeIssueModel.LastTaskTime = tasksToBeIssueModel.NextTaskTime; tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity); await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel); } @@ -269,9 +247,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading timer1.Stop(); - _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); - DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); - return; + _logger.LogError($"读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒"); + //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); + //return; #else var meterInfos = await GetAmmeterInfoList(gatherCode); #endif @@ -303,6 +281,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { TasksToBeIssueModel nextTask = new TasksToBeIssueModel() { + LastTaskTime = null, TimeDensity = item.Key, NextTaskTime = _kafkaOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间 }; @@ -432,13 +411,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading }; var taskBatch = $"{currentTime:yyyyMMddHHmm00}"; - Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => - { - var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + //Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => + //{ + // var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + // var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); - }); + // _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); + //}); await Task.CompletedTask; @@ -463,13 +442,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading }; var taskBatch = $"{currentTime:yyyyMMddHHmm00}"; - Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => - { - var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + //Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => + //{ + // var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + // var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); - }); + // _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); + //}); } /// @@ -480,8 +459,18 @@ namespace JiShe.CollectBus.ScheduledMeterReading { //获取缓存中的电表信息 int timeDensity = 15; - var currentTime = DateTime.Now; + //var currentTime = DateTime.Now.CalculateNextCollectionTime(timeDensity); + var currentTime = Convert.ToDateTime("2025-04-21 17:42:00").CalculateNextCollectionTime(timeDensity); + var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName,MeterTypeEnum.Ammeter,timeDensity); + var taskInfo = await FreeRedisProvider.Instance.GetAsync(redisCacheKey); + + if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false) + { + _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败,请检查Redis中是否有对应的任务下发信息"); + return; + } + // 自动计算最佳并发度 int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount(); @@ -489,16 +478,24 @@ namespace JiShe.CollectBus.ScheduledMeterReading { MaxDegreeOfParallelism = recommendedThreads, }; - var taskBatch = $"{currentTime:yyyyMMddHHmm00}"; + var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds(); - Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => + var conditions = new List(); + conditions.Add(new QueryCondition() { - var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - - _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); + Field = "PendingCopyReadTime", + Operator = "=", + IsNumber = true, + Value = pendingCopyReadTime }); + _ = CreateMeterKafkaTaskMessage(timeDensity, new IoTDBQueryOptions() + { + TableNameOrTreePath = DevicePathBuilder.GetTableName(), + PageIndex = 1, + PageSize = 3000, + Conditions = conditions, + }); } @@ -718,13 +715,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading SystemName = SystemType, ProjectId = $"{ammeterInfo.ProjectID}", DeviceType = $"{MeterTypeEnum.Ammeter}", - DeviceId = $"{ammeterInfo.MemberId}", - Timestamps = timestamps.GetDateTimeOffset().ToUnixTimeMilliseconds(), + DeviceId = $"{ammeterInfo.FocusAddress}", + Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), DatabaseBusiID = ammeterInfo.DatabaseBusiID, PendingCopyReadTime = timestamps, CreationTime = currentTime, MeterAddress = ammeterInfo.AmmerterAddress, - FocusAddress = ammeterInfo.FocusAddress, AFN = (int)aFN, Fn = fn, //Seq = builderResponse.Seq, @@ -743,15 +739,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading } return taskList; - - //using (var score = _serviceProvider.CreateScope()) - //{ - // var _dbContext = score.ServiceProvider.GetRequiredService(); - // _dbContext.UseTableSessionPool = true; - // _dbProvider.BatchInsertAsync(taskList); - //} - - } #endregion @@ -1009,7 +996,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading var page = await _redisDataCacheService.GetAllPagedData( redisCacheMeterInfoHashKeyTemp, redisCacheMeterInfoZSetScoresIndexKeyTemp, - pageSize: 1, + pageSize: 10, lastScore: cursor, lastMember: member); meterInfos.AddRange(page.Items); @@ -1031,57 +1018,90 @@ namespace JiShe.CollectBus.ScheduledMeterReading ); timer.Stop(); - _logger.LogInformation($"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息"); + _logger.LogInformation($"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,耗时{timer.ElapsedMilliseconds}毫秒,总共{meterInfos.Count}表计信息"); } /// /// 创建Kafka消息 /// - /// - /// /// - private async Task CreateMeterKafkaTaskMessage( - string redisCacheTelemetryPacketInfoHashKey, - string redisCacheTelemetryPacketInfoZSetScoresIndexKey) + private async Task CreateMeterKafkaTaskMessage(int timeDensity, IoTDBQueryOptions options) where T : IoTEntity, new() { - if (string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey)) - { - throw new Exception($"{nameof(CreateMeterKafkaTaskMessage)} 创建Kafka消息失败,参数异常,-101"); - } - - decimal? cursor = null; - string member = null; + int pageNumber = 0; bool hasNext; var stopwatch = Stopwatch.StartNew(); - //do - //{ - // var page = await _redisDataCacheService.GetAllPagedData( - // redisCacheTelemetryPacketInfoHashKey, - // redisCacheTelemetryPacketInfoZSetScoresIndexKey, - // pageSize: 1000, - // lastScore: cursor, - // lastMember: member); + do + { + options.PageIndex = pageNumber++; - // cursor = page.HasNext ? page.NextScore : null; - // member = page.HasNext ? page.NextMember : null; - // hasNext = page.HasNext; + var pageResult = await _dbProvider.QueryAsync(options); - // await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - // items: page.Items, - // deviceIdSelector: data => data.FocusAddress, - // processor: (data, groupIndex) => - // { - // _ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex); - // } - // ); + hasNext = pageResult.HasNext; - //} while (hasNext); + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + items: pageResult.Items.ToList(), + deviceIdSelector: data => data.DeviceId, + processor: (data, groupIndex) => + { + _ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex); + } + ); + + } while (hasNext); stopwatch.Stop(); - _logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {redisCacheTelemetryPacketInfoHashKey}采集推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); + _logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {options.TableNameOrTreePath} {timeDensity}分钟采集任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); } + + ///// + ///// 创建Kafka消息 + ///// + ///// + ///// + ///// + //private async Task CreateMeterKafkaTaskMessage( + //string redisCacheTelemetryPacketInfoHashKey, + //string redisCacheTelemetryPacketInfoZSetScoresIndexKey) + //{ + // if (string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey)) + // { + // throw new Exception($"{nameof(CreateMeterKafkaTaskMessage)} 创建Kafka消息失败,参数异常,-101"); + // } + + // decimal? cursor = null; + // string member = null; + // bool hasNext; + // var stopwatch = Stopwatch.StartNew(); + // do + // { + // var page = await _redisDataCacheService.GetAllPagedData( + // redisCacheTelemetryPacketInfoHashKey, + // redisCacheTelemetryPacketInfoZSetScoresIndexKey, + // pageSize: 1000, + // lastScore: cursor, + // lastMember: member); + + // cursor = page.HasNext ? page.NextScore : null; + // member = page.HasNext ? page.NextMember : null; + // hasNext = page.HasNext; + + // await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + // items: page.Items, + // deviceIdSelector: data => data.FocusAddress, + // processor: (data, groupIndex) => + // { + // _ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex); + // } + // ); + + // } while (hasNext); + + // stopwatch.Stop(); + // _logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {redisCacheTelemetryPacketInfoHashKey}采集推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); + //} + /// /// Kafka 推送消息 /// @@ -1089,15 +1109,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 任务记录 /// 对应分区,也就是集中器号所在的分组序号 /// - private async Task KafkaProducerIssuedMessageAction(string topicName, - MeterReadingTelemetryPacketInfo taskRecord, int partition) + private async Task KafkaProducerIssuedMessageAction(string topicName, + T taskRecord, int partition) where T : class { if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null) { throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101"); } - await _producerService.ProduceAsync(topicName, partition, taskRecord); + await _producerService.ProduceAsync(topicName, taskRecord, partition); } #endregion diff --git a/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs b/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs index c07950f..add3131 100644 --- a/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs +++ b/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs @@ -14,7 +14,7 @@ namespace JiShe.CollectBus.Ammeters /// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义 /// [Column(IsIgnore = true)] - public override string MemberId => $"{FocusId}:{MeterId}"; + public override string MemberId => $"{FocusAddress}:{MeteringCode}"; /// /// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳 diff --git a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs index 3bffbb8..3ac4202 100644 --- a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs +++ b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs @@ -28,7 +28,7 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords { get { - return $"{FocusAddress}.{TaskMark}".Md5Fun(); + return $"{DeviceId}.{TaskMark}".Md5Fun(); } } @@ -67,13 +67,7 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords /// [FIELDColumn] public int MeterId { get; set; } - - /// - /// 集中器地址 - /// - [FIELDColumn] - public string FocusAddress { get; set; } - + /// /// 表地址 /// @@ -172,7 +166,7 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords /// 是否已上报 /// [FIELDColumn] - public bool IsReceived { get; set; } + public bool IsReceived { get; set; } } } diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/TasksToBeIssueModel.cs b/shared/JiShe.CollectBus.Common/BuildSendDatas/TasksToBeIssueModel.cs index 5184459..6420a23 100644 --- a/shared/JiShe.CollectBus.Common/BuildSendDatas/TasksToBeIssueModel.cs +++ b/shared/JiShe.CollectBus.Common/BuildSendDatas/TasksToBeIssueModel.cs @@ -11,6 +11,11 @@ namespace JiShe.CollectBus.Common.BuildSendDatas /// public class TasksToBeIssueModel { + /// + /// 上次下发任务的时间 + /// + public DateTime? LastTaskTime { get; set; } + /// /// 下个任务时间 /// diff --git a/shared/JiShe.CollectBus.Common/Enums/TimestampUnit.cs b/shared/JiShe.CollectBus.Common/Enums/TimestampUnit.cs new file mode 100644 index 0000000..9bed83f --- /dev/null +++ b/shared/JiShe.CollectBus.Common/Enums/TimestampUnit.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Common.Enums +{ + public enum TimestampUnit + { + Seconds, // 秒级(Unix 时间戳) + Milliseconds, // 毫秒级(默认) + Microseconds, // 微秒级 + Nanoseconds // 纳秒级 + } +} diff --git a/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs b/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs index e72f4fd..6936ad3 100644 --- a/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs +++ b/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs @@ -215,6 +215,43 @@ namespace JiShe.CollectBus.Common.Extensions .AddHours(hours) .AddMinutes(minutes); } - + + + /// + /// 格式化为微秒(μs) + /// + /// + /// + public static string ToMicrosecondString(this DateTime dt) + { + long microseconds = (dt.Ticks % TimeSpan.TicksPerSecond) / 10; // 1 Tick = 100ns → 0.1μs + return $"{dt:yyyy-MM-dd HH:mm:ss.fffffff}".Remove(23) + $"{microseconds:D6}"; + } + + /// + /// 格式化为纳秒(ns) + /// + /// + /// + public static string ToNanosecondString(this DateTime dt) + { + long nanoseconds = (dt.Ticks % TimeSpan.TicksPerSecond) * 100; // 1 Tick = 100ns + return $"{dt:yyyy-MM-dd HH:mm:ss.fffffff}".Remove(23) + $"{nanoseconds:D9}"; + } + + /// + /// 毫米、微秒、纳秒时间戳转DateTime + /// + /// + /// + /// + public static DateTime ParseIntToDate(long dateLong) + { + if (dateLong < 10000101 || dateLong > 99991231) + { + throw new ArgumentException("Date must be between 10000101 and 99991231."); + } + return DateTime.TryParseExact(dateLong.ToString(), "yyyyMMdd HHmmssZZ", null, System.Globalization.DateTimeStyles.None, out DateTime date) ? date : throw new ArgumentException("Date must be between 10000101 and 99991231."); + } } } diff --git a/shared/JiShe.CollectBus.Common/Extensions/DateTimeOffsetExtensions.cs b/shared/JiShe.CollectBus.Common/Extensions/DateTimeOffsetExtensions.cs index c73fbd0..0f07e9c 100644 --- a/shared/JiShe.CollectBus.Common/Extensions/DateTimeOffsetExtensions.cs +++ b/shared/JiShe.CollectBus.Common/Extensions/DateTimeOffsetExtensions.cs @@ -26,41 +26,7 @@ namespace JiShe.CollectBus.Common.Extensions { return DateTimeOffset.FromUnixTimeMilliseconds(millis).DateTime; } - - /// - /// 采集时间节点计算 - /// - /// 待采集时间 - /// - /// - public static DateTime CalculateNextCollectionTime(this DateTime referenceTime, int interval) - { - // 计算精确到分钟的基准时间 - var baseTime = new DateTime( - referenceTime.Year, - referenceTime.Month, - referenceTime.Day, - referenceTime.Hour, - referenceTime.Minute, - 0); - - // 计算总分钟数和下一个间隔点 - int totalMinutes = baseTime.Hour * 60 + baseTime.Minute; - int nextTotalMinutes = ((totalMinutes / interval) + 1) * interval; - - // 处理跨天情况 - int daysToAdd = nextTotalMinutes / (24 * 60); - int remainingMinutes = nextTotalMinutes % (24 * 60); - int hours = remainingMinutes / 60; - int minutes = remainingMinutes % 60; - - return baseTime.Date - .AddDays(daysToAdd) - .AddHours(hours) - .AddMinutes(minutes); - } - - + /// /// 将 DateTime 时间转换为 DateTimeOffset 时间 /// @@ -75,27 +41,6 @@ namespace JiShe.CollectBus.Common.Extensions // 转换为 DateTimeOffset(自动应用本地时区偏移) return new DateTimeOffset(localDateTime); - } - - private static readonly long UnixEpochTicks = new DateTimeOffset(1970, 1, 1, 0, 0, 0, TimeSpan.Zero).Ticks; - - /// - /// 获取当前 DateTimeOffset 距离 Unix 纪元(1970-01-01)的微秒数 - /// - public static long ToUnixTimeMicroseconds(this DateTimeOffset dateTimeOffset) - { - // Ticks 单位是 100 纳秒,转换为微秒需除以 10 - long elapsedTicks = dateTimeOffset.Ticks - UnixEpochTicks; - return elapsedTicks / 10; // 1 微秒 = 1000 纳秒 = 10 Ticks - } - - /// - /// 获取当前 DateTimeOffset 距离 Unix 纪元(1970-01-01)的纳秒数 - /// - public static long ToUnixTimeNanoseconds(this DateTimeOffset dateTimeOffset) - { - long nanoseconds = (dateTimeOffset.Ticks - UnixEpochTicks) * 100; - return nanoseconds; - } + } } } diff --git a/shared/JiShe.CollectBus.Common/Helpers/TimestampHelper.cs b/shared/JiShe.CollectBus.Common/Helpers/TimestampHelper.cs new file mode 100644 index 0000000..9edc3cd --- /dev/null +++ b/shared/JiShe.CollectBus.Common/Helpers/TimestampHelper.cs @@ -0,0 +1,68 @@ +using JiShe.CollectBus.Common.Enums; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Common.Helpers +{ + /// + /// 时间戳帮助类 + /// + public static class TimestampHelper + { + private static readonly long UnixEpochTicks = new DateTimeOffset(1970, 1, 1, 0, 0, 0, TimeSpan.Zero).Ticks; + + /// + /// 获取当前 DateTimeOffset 距离 Unix 纪元(1970-01-01)的微秒数 + /// + public static long ToUnixTimeMicroseconds(this DateTimeOffset dateTimeOffset) + { + // Ticks 单位是 100 纳秒,转换为微秒需除以 10 + long elapsedTicks = dateTimeOffset.Ticks - UnixEpochTicks; + return elapsedTicks / 10; // 1 微秒 = 1000 纳秒 = 10 Ticks + } + + /// + /// 获取当前 DateTimeOffset 距离 Unix 纪元(1970-01-01)的纳秒数 + /// + public static long ToUnixTimeNanoseconds(this DateTimeOffset dateTimeOffset) + { + long nanoseconds = (dateTimeOffset.Ticks - UnixEpochTicks) * 100; + return nanoseconds; + } + + /// + /// 将 long 类型时间戳转换为 DateTime(UTC) + /// + /// 时间戳数值 + /// 时间戳单位 + public static DateTime ConvertToDateTime(long timestamp, TimestampUnit unit = TimestampUnit.Milliseconds) + { + long ticks = unit switch + { + TimestampUnit.Seconds => checked(timestamp * TimeSpan.TicksPerSecond), + TimestampUnit.Milliseconds => checked(timestamp * TimeSpan.TicksPerMillisecond), + TimestampUnit.Microseconds => checked(timestamp * 10), // 1微秒 = 10 Ticks(100纳秒) + TimestampUnit.Nanoseconds => checked(timestamp / 100),// 1 Tick = 100纳秒 + _ => throw new ArgumentException("无效的时间单位", nameof(unit)) + }; + + try + { + DateTime result = new DateTime(UnixEpochTicks + ticks, DateTimeKind.Utc); + // 校验结果是否在 DateTime 合法范围内(0001-01-01 至 9999-12-31) + if (result < DateTime.MinValue || result > DateTime.MaxValue) + { + throw new ArgumentOutOfRangeException(nameof(timestamp), "时间戳超出 DateTime 范围"); + } + return result; + } + catch (ArgumentOutOfRangeException ex) + { + throw new ArgumentOutOfRangeException("时间戳无效", ex); + } + } + } +} diff --git a/web/JiShe.CollectBus.Host/appsettings.json b/web/JiShe.CollectBus.Host/appsettings.json index ed0e567..fca91bb 100644 --- a/web/JiShe.CollectBus.Host/appsettings.json +++ b/web/JiShe.CollectBus.Host/appsettings.json @@ -84,8 +84,8 @@ "SaslPassword": "lixiao1980", "KafkaReplicationFactor": 3, "NumPartitions": 30, - "ServerTagName": "JiSheCollectBus3", - "FirstCollectionTime": "2025-04-21 16:11:00" + "ServerTagName": "JiSheCollectBus4", + "FirstCollectionTime": "2025-04-22 16:07:00" }, "IoTDBOptions": { "UserName": "root", @@ -93,7 +93,7 @@ "ClusterList": [ "192.168.1.9:6667" ], "PoolSize": 2, "DataBaseName": "energy", - "OpenDebugMode": true, + "OpenDebugMode": false, "UseTableSessionPoolByDefault": false }, "Cassandra": {