diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 2b3962d..551dc60 100644 --- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -14,6 +14,7 @@ using Microsoft.Extensions.Options; using JiShe.CollectBus.IoTDBProvider.Context; using Microsoft.Extensions.Logging; using JiShe.CollectBus.Common.Helpers; +using JiShe.CollectBus.IotSystems.AFNEntity; namespace JiShe.CollectBus.Samples; @@ -67,8 +68,21 @@ public class SampleAppService : CollectBusAppService, ISampleAppService [HttpGet] public async Task UseTableSessionPool() { - //_dbContext.UseTableSessionPool = true; - _iotDBProvider.SwitchSessionPool(true); + ElectricityMeter meter2 = new ElectricityMeter() + { + SystemName = "energy", + DeviceId = "402440506", + DeviceType = "Ammeter", + Current = 10, + MeterModel = "DDZY-1980", + ProjectCode = "10059", + Voltage = 10, + Timestamps = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }; + + await _iotDBProvider.InsertAsync(meter2); + + _dbContext.UseTableSessionPool = true; ElectricityMeter meter = new ElectricityMeter() { @@ -108,6 +122,27 @@ public class SampleAppService : CollectBusAppService, ISampleAppService } + /// + /// 测试单个测点数据项 + /// + /// + /// + [HttpGet] + public async Task TestSingleMeasuringAFNData(string measuring, object value) + { + SingleMeasuringAFNDataEntity meter = new SingleMeasuringAFNDataEntity() + { + SystemName = "energy", + DeviceId = "402440506", + DeviceType = "Ammeter", + ProjectCode = "10059", + Timestamps = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + SingleMeasuring = new Tuple(measuring, value) + }; + await _iotDBProvider.InsertAsync(meter); + } + + public Task GetAsync() { return Task.FromResult( diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index a823798..7311c42 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -764,7 +764,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading continue; } - var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}{item.Key}"; + var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}{item.Key}"; Dictionary keyValuePairs = new Dictionary(); foreach (var subItem in item) { @@ -781,7 +781,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading NextTask = DateTime.Now.AddMinutes(1) }; - var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.WaterMeter, itemTimeDensity.Key); + var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key); await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask); } _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据完成"); diff --git a/src/JiShe.CollectBus.Domain/IotSystems/AFNEntity/AFNDataEntity.cs b/src/JiShe.CollectBus.Domain/IotSystems/AFNEntity/SingleMeasuringAFNDataEntity.cs similarity index 53% rename from src/JiShe.CollectBus.Domain/IotSystems/AFNEntity/AFNDataEntity.cs rename to src/JiShe.CollectBus.Domain/IotSystems/AFNEntity/SingleMeasuringAFNDataEntity.cs index 5f0e44c..f53ca21 100644 --- a/src/JiShe.CollectBus.Domain/IotSystems/AFNEntity/AFNDataEntity.cs +++ b/src/JiShe.CollectBus.Domain/IotSystems/AFNEntity/SingleMeasuringAFNDataEntity.cs @@ -10,8 +10,12 @@ namespace JiShe.CollectBus.IotSystems.AFNEntity /// /// AFN单项数据实体 /// - public class AFNDataEntity:IoTEntity + public class SingleMeasuringAFNDataEntity:IoTEntity { - public string ItemCode { get; set; } + /// + /// 单项数据对象 + /// + [SingleMeasuring(nameof(SingleMeasuring))] + public Tuple SingleMeasuring { get; set; } } } diff --git a/src/JiShe.CollectBus.IoTDBProvider/Attribute/SingleMeasuringAttribute.cs b/src/JiShe.CollectBus.IoTDBProvider/Attribute/SingleMeasuringAttribute.cs index f69fd99..9a85a85 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Attribute/SingleMeasuringAttribute.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Attribute/SingleMeasuringAttribute.cs @@ -7,10 +7,16 @@ using System.Threading.Tasks; namespace JiShe.CollectBus.IoTDBProvider { /// - /// 用于标识当前实体为单个测点,单侧点标识字段类型是Dictionary + /// 用于标识当前实体为单侧点模式,单侧点模式只有一个Filed标识字段,类型是Tuple,Item1=>测点名称,Item2=>测点值 /// [AttributeUsage(AttributeTargets.Property)] public class SingleMeasuringAttribute : Attribute { + public string FieldName { get; set;} + + public SingleMeasuringAttribute(string fieldName) + { + FieldName = fieldName; + } } } diff --git a/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs b/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs index b67f17d..0aad03e 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs @@ -11,11 +11,11 @@ namespace JiShe.CollectBus.IoTDBProvider /// public interface IIoTDBProvider { - /// - /// 切换 SessionPool - /// - /// 是否使用表模型 - void SwitchSessionPool(bool useTableSession); + ///// + ///// 切换 SessionPool + ///// + ///// 是否使用表模型 + //void SwitchSessionPool(bool useTableSession); /// /// 插入数据 diff --git a/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBSessionFactory.cs b/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBSessionFactory.cs index b904ba0..c7d9b10 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBSessionFactory.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBSessionFactory.cs @@ -9,7 +9,7 @@ namespace JiShe.CollectBus.IoTDBProvider.Interface /// /// Session 工厂接口 /// - public interface IIoTDBSessionFactory + public interface IIoTDBSessionFactory:IDisposable { IIoTDBSessionPool GetSessionPool(bool useTableSession); } diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/DeviceMetadata.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/DeviceMetadata.cs index 0d5b408..50aa8f7 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Provider/DeviceMetadata.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/DeviceMetadata.cs @@ -12,6 +12,11 @@ namespace JiShe.CollectBus.IoTDBProvider /// public class DeviceMetadata { + /// + /// 是否有单测量值 + /// + public bool IsSingleMeasuring { get; set; } + /// /// 测量值集合,用于构建Table的测量值,也就是columnNames参数 /// diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs index 504709e..297d250 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs @@ -24,12 +24,14 @@ namespace JiShe.CollectBus.IoTDBProvider /// public class IoTDBProvider : IIoTDBProvider { - private IIoTDBSessionPool _currentSession; private static readonly ConcurrentDictionary _metadataCache = new(); private readonly ILogger _logger; private readonly IIoTDBSessionFactory _sessionFactory; private readonly IoTDBRuntimeContext _runtimeContext; + private IIoTDBSessionPool CurrentSession => + _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool); + public IoTDBProvider( ILogger logger, IIoTDBSessionFactory sessionFactory, @@ -39,39 +41,8 @@ namespace JiShe.CollectBus.IoTDBProvider _sessionFactory = sessionFactory; _runtimeContext = runtimeContext; - RefreshSessionPool(); } - - private void RefreshSessionPool() - { - try - { - _currentSession?.Dispose(); - _currentSession = _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool); - _currentSession.OpenAsync().ConfigureAwait(false).GetAwaiter().GetResult(); - } - catch (Exception ex) - { - //自动回退到备用SessionPool - _logger.LogError($"{nameof(RefreshSessionPool)} 切换SessionPool失败,回退到默认配置:{ex.Serialize()}"); - _runtimeContext.UseTableSessionPool = !_runtimeContext.UseTableSessionPool; - _currentSession = _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool); - } - } - - /// - /// 切换 SessionPool - /// - /// - public void SwitchSessionPool(bool useTableSession) - { - if (_runtimeContext.UseTableSessionPool != useTableSession) - { - _runtimeContext.UseTableSessionPool = useTableSession; - RefreshSessionPool(); - } - } - + /// /// 插入数据 /// @@ -83,8 +54,8 @@ namespace JiShe.CollectBus.IoTDBProvider var metadata = GetMetadata(); var tablet = BuildTablet(new[] { entity }, metadata); - - await _currentSession.InsertAsync(tablet); + + await CurrentSession.InsertAsync(tablet); //int result = await _currentSession.InsertAsync(tablet); //if (result <= 0) @@ -108,7 +79,7 @@ namespace JiShe.CollectBus.IoTDBProvider foreach (var batch in batches) { var tablet = BuildTablet(batch, metadata); - await _currentSession.InsertAsync(tablet); + await CurrentSession.InsertAsync(tablet); //var result = await _currentSession.InsertAsync(tablet); //if (result <= 0) //{ @@ -127,7 +98,7 @@ namespace JiShe.CollectBus.IoTDBProvider public async Task DeleteAsync(QueryOptions options) where T : IoTEntity { var query = BuildDeleteSQL(options); - var sessionDataSet = await _currentSession.ExecuteQueryStatementAsync(query); + var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); if (!sessionDataSet.HasNext()) { @@ -149,7 +120,7 @@ namespace JiShe.CollectBus.IoTDBProvider public async Task> QueryAsync(QueryOptions options) where T : IoTEntity, new() { var query = BuildQuerySQL(options); - var sessionDataSet = await _currentSession.ExecuteQueryStatementAsync(query); + var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); var result = new PagedResult { @@ -180,22 +151,45 @@ namespace JiShe.CollectBus.IoTDBProvider foreach (var measurement in metadata.ColumnNames) { PropertyInfo propertyInfo = typeof(T).GetProperty(measurement); - if (propertyInfo==null) + if (propertyInfo == null) { throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,没有找到{measurement}属性,属于异常情况。"); } + var value = propertyInfo.GetValue(entity); - if (value != null) + if (propertyInfo.IsDefined(typeof(SingleMeasuringAttribute), false))//表示当前对象是单测点模式, { - rowValues.Add(value); + Tuple valueTuple = (Tuple)value!; + //获得当前Field对应的是的在measurement column中的序号,然后直接更新 metadata 中datetype对应序号的数据类型 + var indexOf = metadata.ColumnNames.IndexOf(measurement); + metadata.ColumnNames[indexOf] = valueTuple.Item1; + Type item2Type = valueTuple.Item2.GetType(); + metadata.DataTypes[indexOf] = GetDataTypeFromTypeName(valueTuple.Item2.GetType().Name); + + rowValues.Add(valueTuple.Item2); + + //TSDataType singleMeasuringTSDataType = + } else { - DataTypeValueMap.TryGetValue(propertyInfo.PropertyType.Name, out object defaultValue); - rowValues.Add(defaultValue); + if (value != null) + { + rowValues.Add(value); + } + else + { + //填充默认数据值 + DataTypeDefaultValueMap.TryGetValue(propertyInfo.PropertyType.Name, out object defaultValue); + + rowValues.Add(defaultValue); + } } + } + values.Add(rowValues); + if (!_runtimeContext.UseTableSessionPool)//树模型 { devicePaths.Add(DevicePathBuilder.GetDevicePath(entity)); @@ -346,7 +340,7 @@ namespace JiShe.CollectBus.IoTDBProvider countQuery += " WHERE " + string.Join(" AND ", options.Conditions.Select(TranslateCondition)); } - var result = await _currentSession.ExecuteQueryStatementAsync(countQuery); + var result = await CurrentSession.ExecuteQueryStatementAsync(countQuery); return result.HasNext() ? Convert.ToInt32(result.Next().Values[0]) : 0; } @@ -418,35 +412,40 @@ namespace JiShe.CollectBus.IoTDBProvider foreach (var prop in type.GetProperties()) { //先获取Tag标签和属性标签 - ColumnInfo? column = prop.GetCustomAttribute() is not null ? new ColumnInfo( - name: prop.Name, + ColumnInfo? column = prop.GetCustomAttribute() is not null ? new ColumnInfo( + name: prop.Name, category: ColumnCategory.TAG, - dataType: GetDataTypeFromTypeName(prop.PropertyType.Name), false + dataType: GetDataTypeFromTypeName(prop.PropertyType.Name), + false ) : prop.GetCustomAttribute() is not null ? new ColumnInfo( prop.Name, ColumnCategory.ATTRIBUTE, - GetDataTypeFromTypeName(prop.PropertyType.Name),false - ) : null; - - //最先检查是不是单测点 - if (prop.GetCustomAttribute() is not null) - { - //单测点的情况下,字段类型是Dictionary - Dictionary keyValuePairs = prop.GetValue(null) as Dictionary; - column = new ColumnInfo( - keyValuePairs.Keys.First(), - ColumnCategory.FIELD, - GetDataTypeFromTypeName(prop.PropertyType.Name), false - ); - } - else - { - //不是单测点的情况下,直接获取字段名称作为测点名称 - column = prop.GetCustomAttribute() is not null ? new ColumnInfo( + GetDataTypeFromTypeName(prop.PropertyType.Name), + false + ) : prop.GetCustomAttribute() is not null ? new ColumnInfo( prop.Name, ColumnCategory.FIELD, - GetDataTypeFromTypeName(prop.PropertyType.Name), false - ) : null; + GetDataTypeFromTypeName(prop.PropertyType.Name), + false) + : null; + + //最先检查是不是单侧点模式 + SingleMeasuringAttribute singleMeasuringAttribute = prop.GetCustomAttribute(); + + if (singleMeasuringAttribute != null && column == null) + { + //warning: 单侧点模式注意事项 + //Entity实体 字段类型是 Tuple,Item1=>测点名称,Item2=>测点值 + //只有一个Filed字段。 + //MeasuringName 默认为 SingleMeasuringAttribute.FieldName。 + //DateTye 默认为 string,在获取对用的Value值的时候在进行重置。 + + column = new ColumnInfo( + singleMeasuringAttribute.FieldName, + ColumnCategory.FIELD, + GetDataTypeFromTypeName(singleMeasuringAttribute.FieldName), + true + ); } ////按优先级顺序检查属性,避免重复反射 @@ -481,7 +480,13 @@ namespace JiShe.CollectBus.IoTDBProvider { var metadata = new DeviceMetadata(); - //按业务逻辑顺序处理(TAG -> FIELD -> ATTRIBUTE) + //先检查是不是单侧点模型 + if (columns.Any(c => c.IsSingleMeasuring)) + { + metadata.IsSingleMeasuring = true; + } + + //按业务逻辑顺序处理(TAG -> ATTRIBUTE -> FIELD) var groupedColumns = columns .GroupBy(c => c.Category) .ToDictionary(g => g.Key, g => g.ToList()); @@ -522,7 +527,7 @@ namespace JiShe.CollectBus.IoTDBProvider /// /// 是否是单测点 /// - public bool IsSingleMeasuring { get;} + public bool IsSingleMeasuring { get; } /// /// 列类型 @@ -534,7 +539,7 @@ namespace JiShe.CollectBus.IoTDBProvider /// public TSDataType DataType { get; } - public ColumnInfo(string name, ColumnCategory category, TSDataType dataType,bool isSingleMeasuring) + public ColumnInfo(string name, ColumnCategory category, TSDataType dataType, bool isSingleMeasuring) { Name = name; Category = category; @@ -581,7 +586,7 @@ namespace JiShe.CollectBus.IoTDBProvider /// /// 根据类型名称获取 IoTDB 数据默认值 /// - private readonly IReadOnlyDictionary DataTypeValueMap = + private readonly IReadOnlyDictionary DataTypeDefaultValueMap = new Dictionary(StringComparer.OrdinalIgnoreCase) { ["BOOLEAN"] = false, diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBSessionFactory.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBSessionFactory.cs index a9b462a..cbc3869 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBSessionFactory.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBSessionFactory.cs @@ -17,6 +17,7 @@ namespace JiShe.CollectBus.IoTDBProvider.Provider { private readonly IoTDBOptions _options; private readonly ConcurrentDictionary _pools = new(); + private bool _disposed; public IoTDBSessionFactory(IOptions options) { @@ -25,12 +26,27 @@ namespace JiShe.CollectBus.IoTDBProvider.Provider public IIoTDBSessionPool GetSessionPool(bool useTableSession) { + if (_disposed) throw new ObjectDisposedException(nameof(IoTDBSessionFactory)); + return _pools.GetOrAdd(useTableSession, key => { - return key - ? new TableSessionPoolAdapter(_options) + var pool = key + ? (IIoTDBSessionPool)new TableSessionPoolAdapter(_options) : new SessionPoolAdapter(_options); + + pool.OpenAsync().ConfigureAwait(false).GetAwaiter().GetResult(); ; + return pool; }); } + + public void Dispose() + { + foreach (var pool in _pools.Values) + { + pool.Dispose(); + } + _pools.Clear(); + _disposed = true; + } } }