diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 43bf8a4..28b6ab0 100644 --- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -14,6 +14,7 @@ using Microsoft.Extensions.Options; using JiShe.CollectBus.IoTDBProvider.Context; using Microsoft.Extensions.Logging; using JiShe.CollectBus.Common.Helpers; +using JiShe.CollectBus.IotSystems.AFNEntity; using JiShe.CollectBus.Protocol.Contracts.Interfaces; using Microsoft.Extensions.DependencyInjection; @@ -69,8 +70,21 @@ public class SampleAppService : CollectBusAppService, ISampleAppService [HttpGet] public async Task UseTableSessionPool() { - //_dbContext.UseTableSessionPool = true; - _iotDBProvider.SwitchSessionPool(true); + ElectricityMeter meter2 = new ElectricityMeter() + { + SystemName = "energy", + DeviceId = "402440506", + DeviceType = "Ammeter", + Current = 10, + MeterModel = "DDZY-1980", + ProjectCode = "10059", + Voltage = 10, + Timestamps = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }; + + await _iotDBProvider.InsertAsync(meter2); + + _dbContext.UseTableSessionPool = true; ElectricityMeter meter = new ElectricityMeter() { @@ -110,6 +124,27 @@ public class SampleAppService : CollectBusAppService, ISampleAppService } + /// + /// 测试单个测点数据项 + /// + /// + /// + [HttpGet] + public async Task TestSingleMeasuringAFNData(string measuring, string value) + { + var meter = new SingleMeasuringAFNDataEntity() + { + SystemName = "energy", + DeviceId = "402440506", + DeviceType = "Ammeter", + ProjectCode = "10059", + Timestamps = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + SingleMeasuring = new Tuple(measuring, value) + }; + await _iotDBProvider.InsertAsync(meter); + } + + public Task GetAsync() { return Task.FromResult( diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index a823798..7311c42 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -764,7 +764,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading continue; } - var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}{item.Key}"; + var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}{item.Key}"; Dictionary keyValuePairs = new Dictionary(); foreach (var subItem in item) { @@ -781,7 +781,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading NextTask = DateTime.Now.AddMinutes(1) }; - var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.WaterMeter, itemTimeDensity.Key); + var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key); await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask); } _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据完成"); diff --git a/src/JiShe.CollectBus.Domain/IotSystems/AFNEntity/AFNDataEntity.cs b/src/JiShe.CollectBus.Domain/IotSystems/AFNEntity/SingleMeasuringAFNDataEntity.cs similarity index 53% rename from src/JiShe.CollectBus.Domain/IotSystems/AFNEntity/AFNDataEntity.cs rename to src/JiShe.CollectBus.Domain/IotSystems/AFNEntity/SingleMeasuringAFNDataEntity.cs index 5f0e44c..751e9e3 100644 --- a/src/JiShe.CollectBus.Domain/IotSystems/AFNEntity/AFNDataEntity.cs +++ b/src/JiShe.CollectBus.Domain/IotSystems/AFNEntity/SingleMeasuringAFNDataEntity.cs @@ -10,8 +10,12 @@ namespace JiShe.CollectBus.IotSystems.AFNEntity /// /// AFN单项数据实体 /// - public class AFNDataEntity:IoTEntity + public class SingleMeasuringAFNDataEntity : IoTEntity { - public string ItemCode { get; set; } + /// + /// 单项数据对象 + /// + [SingleMeasuring(nameof(SingleMeasuring))] + public Tuple SingleMeasuring { get; set; } } } diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index 4416fd2..2c72f7d 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -35,12 +35,12 @@ }, "ConnectionStrings": { "Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000", - "Kafka": "192.168.0.151:29092,192.168.0.151:39092,192.168.0.151:49092", + "Kafka": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092", "PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False", "EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False" }, "Redis": { - "Configuration": "192.168.0.151:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", + "Configuration": "192.168.1.9:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", "DefaultDB": "14", "HangfireDB": "15" }, @@ -95,7 +95,7 @@ "Kafka": { "Connections": { "Default": { - "BootstrapServers": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092" + "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092" // "SecurityProtocol": "SASL_PLAINTEXT", // "SaslMechanism": "PLAIN", // "SaslUserName": "lixiao", @@ -121,7 +121,7 @@ "IoTDBOptions": { "UserName": "root", "Password": "root", - "ClusterList": [ "192.168.0.151:6667" ], + "ClusterList": [ "192.168.1.9:6667" ], "PoolSize": 2, "DataBaseName": "energy", "OpenDebugMode": true, diff --git a/src/JiShe.CollectBus.IoTDBProvider/Attribute/SingleMeasuringAttribute.cs b/src/JiShe.CollectBus.IoTDBProvider/Attribute/SingleMeasuringAttribute.cs index f69fd99..ba5af69 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Attribute/SingleMeasuringAttribute.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Attribute/SingleMeasuringAttribute.cs @@ -7,10 +7,16 @@ using System.Threading.Tasks; namespace JiShe.CollectBus.IoTDBProvider { /// - /// 用于标识当前实体为单个测点,单侧点标识字段类型是Dictionary + /// 用于标识当前实体为单侧点模式,单侧点模式只有一个Filed标识字段,类型是Tuple,Item1=>测点名称,Item2=>测点值,泛型 /// [AttributeUsage(AttributeTargets.Property)] public class SingleMeasuringAttribute : Attribute { + public string FieldName { get; set;} + + public SingleMeasuringAttribute(string fieldName) + { + FieldName = fieldName; + } } } diff --git a/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs b/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs index b67f17d..0aad03e 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs @@ -11,11 +11,11 @@ namespace JiShe.CollectBus.IoTDBProvider /// public interface IIoTDBProvider { - /// - /// 切换 SessionPool - /// - /// 是否使用表模型 - void SwitchSessionPool(bool useTableSession); + ///// + ///// 切换 SessionPool + ///// + ///// 是否使用表模型 + //void SwitchSessionPool(bool useTableSession); /// /// 插入数据 diff --git a/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBSessionFactory.cs b/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBSessionFactory.cs index b904ba0..c7d9b10 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBSessionFactory.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBSessionFactory.cs @@ -9,7 +9,7 @@ namespace JiShe.CollectBus.IoTDBProvider.Interface /// /// Session 工厂接口 /// - public interface IIoTDBSessionFactory + public interface IIoTDBSessionFactory:IDisposable { IIoTDBSessionPool GetSessionPool(bool useTableSession); } diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/DeviceMetadata.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/DeviceMetadata.cs index 0d5b408..6bdbd56 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Provider/DeviceMetadata.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/DeviceMetadata.cs @@ -12,10 +12,15 @@ namespace JiShe.CollectBus.IoTDBProvider /// public class DeviceMetadata { + /// + /// 是否有单测量值 + /// + public bool IsSingleMeasuring { get; set; } + /// /// 测量值集合,用于构建Table的测量值,也就是columnNames参数 /// - public List ColumnNames { get; } = new(); + public List ColumnNames { get; set; } = new(); /// /// 列类型集合,用于构建Table的列类型,也就是columnCategories参数 @@ -25,6 +30,6 @@ namespace JiShe.CollectBus.IoTDBProvider /// /// 值类型集合,用于构建Table的值类型,也就是dataTypes参数 /// - public ListDataTypes { get; } = new(); + public List DataTypes { get; } = new(); } } diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs index 504709e..705b2c1 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs @@ -9,6 +9,7 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System; +using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; @@ -24,12 +25,14 @@ namespace JiShe.CollectBus.IoTDBProvider /// public class IoTDBProvider : IIoTDBProvider { - private IIoTDBSessionPool _currentSession; private static readonly ConcurrentDictionary _metadataCache = new(); private readonly ILogger _logger; private readonly IIoTDBSessionFactory _sessionFactory; private readonly IoTDBRuntimeContext _runtimeContext; + private IIoTDBSessionPool CurrentSession => + _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool); + public IoTDBProvider( ILogger logger, IIoTDBSessionFactory sessionFactory, @@ -39,37 +42,6 @@ namespace JiShe.CollectBus.IoTDBProvider _sessionFactory = sessionFactory; _runtimeContext = runtimeContext; - RefreshSessionPool(); - } - - private void RefreshSessionPool() - { - try - { - _currentSession?.Dispose(); - _currentSession = _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool); - _currentSession.OpenAsync().ConfigureAwait(false).GetAwaiter().GetResult(); - } - catch (Exception ex) - { - //自动回退到备用SessionPool - _logger.LogError($"{nameof(RefreshSessionPool)} 切换SessionPool失败,回退到默认配置:{ex.Serialize()}"); - _runtimeContext.UseTableSessionPool = !_runtimeContext.UseTableSessionPool; - _currentSession = _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool); - } - } - - /// - /// 切换 SessionPool - /// - /// - public void SwitchSessionPool(bool useTableSession) - { - if (_runtimeContext.UseTableSessionPool != useTableSession) - { - _runtimeContext.UseTableSessionPool = useTableSession; - RefreshSessionPool(); - } } /// @@ -83,8 +55,8 @@ namespace JiShe.CollectBus.IoTDBProvider var metadata = GetMetadata(); var tablet = BuildTablet(new[] { entity }, metadata); - - await _currentSession.InsertAsync(tablet); + + await CurrentSession.InsertAsync(tablet); //int result = await _currentSession.InsertAsync(tablet); //if (result <= 0) @@ -108,7 +80,7 @@ namespace JiShe.CollectBus.IoTDBProvider foreach (var batch in batches) { var tablet = BuildTablet(batch, metadata); - await _currentSession.InsertAsync(tablet); + await CurrentSession.InsertAsync(tablet); //var result = await _currentSession.InsertAsync(tablet); //if (result <= 0) //{ @@ -127,7 +99,7 @@ namespace JiShe.CollectBus.IoTDBProvider public async Task DeleteAsync(QueryOptions options) where T : IoTEntity { var query = BuildDeleteSQL(options); - var sessionDataSet = await _currentSession.ExecuteQueryStatementAsync(query); + var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); if (!sessionDataSet.HasNext()) { @@ -149,7 +121,7 @@ namespace JiShe.CollectBus.IoTDBProvider public async Task> QueryAsync(QueryOptions options) where T : IoTEntity, new() { var query = BuildQuerySQL(options); - var sessionDataSet = await _currentSession.ExecuteQueryStatementAsync(query); + var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); var result = new PagedResult { @@ -172,30 +144,60 @@ namespace JiShe.CollectBus.IoTDBProvider var timestamps = new List(); var values = new List>(); var devicePaths = new HashSet(); + List tempColumnNames = new List(); + tempColumnNames.AddRange(metadata.ColumnNames); foreach (var entity in entities) { timestamps.Add(entity.Timestamps); var rowValues = new List(); - foreach (var measurement in metadata.ColumnNames) + foreach (var measurement in tempColumnNames) { + PropertyInfo propertyInfo = typeof(T).GetProperty(measurement); - if (propertyInfo==null) + if (propertyInfo == null) { - throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,没有找到{measurement}属性,属于异常情况。"); + throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,没有找到{measurement}属性,属于异常情况,-101。"); } + var value = propertyInfo.GetValue(entity); - if (value != null) + if (propertyInfo.IsDefined(typeof(SingleMeasuringAttribute), false) && value != null)//表示当前对象是单测点模式 { - rowValues.Add(value); + Type tupleType = value.GetType(); + Type[] tupleArgs = tupleType.GetGenericArguments(); + Type item2Type = tupleArgs[1]; // T 的实际类型 + var item1 = tupleType.GetProperty("Item1")!.GetValue(value); + var item2 = tupleType.GetProperty("Item2")!.GetValue(value); + if (item1 == null || item2 == null) + { + throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,单测点模式构建失败,没有获取测点名称或者测点值,-102。"); + } + + var indexOf = metadata.ColumnNames.IndexOf(measurement); + metadata.ColumnNames[indexOf] = (string)item1!; + + rowValues.Add(item2); + } else { - DataTypeValueMap.TryGetValue(propertyInfo.PropertyType.Name, out object defaultValue); - rowValues.Add(defaultValue); + if (value != null) + { + rowValues.Add(value); + } + else + { + //填充默认数据值 + DataTypeDefaultValueMap.TryGetValue(propertyInfo.PropertyType.Name, out object defaultValue); + + rowValues.Add(defaultValue); + } } + } + values.Add(rowValues); + if (!_runtimeContext.UseTableSessionPool)//树模型 { devicePaths.Add(DevicePathBuilder.GetDevicePath(entity)); @@ -346,7 +348,7 @@ namespace JiShe.CollectBus.IoTDBProvider countQuery += " WHERE " + string.Join(" AND ", options.Conditions.Select(TranslateCondition)); } - var result = await _currentSession.ExecuteQueryStatementAsync(countQuery); + var result = await CurrentSession.ExecuteQueryStatementAsync(countQuery); return result.HasNext() ? Convert.ToInt32(result.Next().Values[0]) : 0; } @@ -398,12 +400,34 @@ namespace JiShe.CollectBus.IoTDBProvider /// private DeviceMetadata GetMetadata() where T : IoTEntity { - return _metadataCache.GetOrAdd(typeof(T), type => - { - var columns = CollectColumnMetadata(type); - var metadata = BuildDeviceMetadata(columns); - return metadata; - }); + + var columns = CollectColumnMetadata(typeof(T)); + var metadata = BuildDeviceMetadata(columns); + + return _metadataCache.AddOrUpdate( + typeof(T), + addValueFactory: t => metadata, // 如果键不存在,用此值添加 + updateValueFactory: (t, existingValue) => + { + var columns = CollectColumnMetadata(t); + var metadata = BuildDeviceMetadata(columns); + + //对现有值 existingValue 进行修改,返回新值 + existingValue.ColumnNames = metadata.ColumnNames; + return existingValue; + } + ); + + //return _metadataCache.GetOrAdd(typeof(T), type => + //{ + // var columns = CollectColumnMetadata(type); + // var metadata = BuildDeviceMetadata(columns); + // //if (metadata.IsSingleMeasuring) + // //{ + // // _metadataCache.Remove(typeof(T)); + // //} + // return metadata; + //}); } /// @@ -418,51 +442,43 @@ namespace JiShe.CollectBus.IoTDBProvider foreach (var prop in type.GetProperties()) { //先获取Tag标签和属性标签 - ColumnInfo? column = prop.GetCustomAttribute() is not null ? new ColumnInfo( - name: prop.Name, + ColumnInfo? column = prop.GetCustomAttribute() is not null ? new ColumnInfo( + name: prop.Name, category: ColumnCategory.TAG, - dataType: GetDataTypeFromTypeName(prop.PropertyType.Name), false + dataType: GetDataTypeFromTypeName(prop.PropertyType.Name), + false ) : prop.GetCustomAttribute() is not null ? new ColumnInfo( prop.Name, ColumnCategory.ATTRIBUTE, - GetDataTypeFromTypeName(prop.PropertyType.Name),false - ) : null; - - //最先检查是不是单测点 - if (prop.GetCustomAttribute() is not null) - { - //单测点的情况下,字段类型是Dictionary - Dictionary keyValuePairs = prop.GetValue(null) as Dictionary; - column = new ColumnInfo( - keyValuePairs.Keys.First(), - ColumnCategory.FIELD, - GetDataTypeFromTypeName(prop.PropertyType.Name), false - ); - } - else - { - //不是单测点的情况下,直接获取字段名称作为测点名称 - column = prop.GetCustomAttribute() is not null ? new ColumnInfo( + GetDataTypeFromTypeName(prop.PropertyType.Name), + false + ) : prop.GetCustomAttribute() is not null ? new ColumnInfo( prop.Name, ColumnCategory.FIELD, - GetDataTypeFromTypeName(prop.PropertyType.Name), false - ) : null; - } + GetDataTypeFromTypeName(prop.PropertyType.Name), + false) + : null; - ////按优先级顺序检查属性,避免重复反射 - //column = prop.GetCustomAttribute() is not null ? new ColumnInfo( - // name: prop.Name, //使用属性名 - // category: ColumnCategory.TAG, - // dataType: GetDataTypeFromTypeName(prop.PropertyType.Name) - //) : prop.GetCustomAttribute() is not null ? new ColumnInfo( - // prop.Name, - // ColumnCategory.ATTRIBUTE, - // GetDataTypeFromTypeName(prop.PropertyType.Name) - //) : prop.GetCustomAttribute() is not null ? new ColumnInfo( - // prop.Name, - // ColumnCategory.FIELD, - // GetDataTypeFromTypeName(prop.PropertyType.Name) - //) : null; + //最先检查是不是单侧点模式 + SingleMeasuringAttribute singleMeasuringAttribute = prop.GetCustomAttribute(); + + if (singleMeasuringAttribute != null && column == null) + { + //warning: 单侧点模式注意事项 + //Entity实体 字段类型是 Tuple,Item1=>测点名称,Item2=>测点值,泛型 + //只有一个Filed字段。 + //MeasuringName 默认为 SingleMeasuringAttribute.FieldName,以便于在获取对应的Value的时候重置为 Item1 的值。 + + Type tupleType = prop.PropertyType; + Type[] tupleArgs = tupleType.GetGenericArguments(); + + column = new ColumnInfo( + singleMeasuringAttribute.FieldName, + ColumnCategory.FIELD, + GetDataTypeFromTypeName(tupleArgs[1].Name), + true + ); + } if (column.HasValue) { @@ -481,7 +497,13 @@ namespace JiShe.CollectBus.IoTDBProvider { var metadata = new DeviceMetadata(); - //按业务逻辑顺序处理(TAG -> FIELD -> ATTRIBUTE) + //先检查是不是单侧点模型 + if (columns.Any(c => c.IsSingleMeasuring)) + { + metadata.IsSingleMeasuring = true; + } + + //按业务逻辑顺序处理(TAG -> ATTRIBUTE -> FIELD) var groupedColumns = columns .GroupBy(c => c.Category) .ToDictionary(g => g.Key, g => g.ToList()); @@ -522,7 +544,7 @@ namespace JiShe.CollectBus.IoTDBProvider /// /// 是否是单测点 /// - public bool IsSingleMeasuring { get;} + public bool IsSingleMeasuring { get; } /// /// 列类型 @@ -534,7 +556,7 @@ namespace JiShe.CollectBus.IoTDBProvider /// public TSDataType DataType { get; } - public ColumnInfo(string name, ColumnCategory category, TSDataType dataType,bool isSingleMeasuring) + public ColumnInfo(string name, ColumnCategory category, TSDataType dataType, bool isSingleMeasuring) { Name = name; Category = category; @@ -581,7 +603,7 @@ namespace JiShe.CollectBus.IoTDBProvider /// /// 根据类型名称获取 IoTDB 数据默认值 /// - private readonly IReadOnlyDictionary DataTypeValueMap = + private readonly IReadOnlyDictionary DataTypeDefaultValueMap = new Dictionary(StringComparer.OrdinalIgnoreCase) { ["BOOLEAN"] = false, diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBSessionFactory.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBSessionFactory.cs index a9b462a..cbc3869 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBSessionFactory.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBSessionFactory.cs @@ -17,6 +17,7 @@ namespace JiShe.CollectBus.IoTDBProvider.Provider { private readonly IoTDBOptions _options; private readonly ConcurrentDictionary _pools = new(); + private bool _disposed; public IoTDBSessionFactory(IOptions options) { @@ -25,12 +26,27 @@ namespace JiShe.CollectBus.IoTDBProvider.Provider public IIoTDBSessionPool GetSessionPool(bool useTableSession) { + if (_disposed) throw new ObjectDisposedException(nameof(IoTDBSessionFactory)); + return _pools.GetOrAdd(useTableSession, key => { - return key - ? new TableSessionPoolAdapter(_options) + var pool = key + ? (IIoTDBSessionPool)new TableSessionPoolAdapter(_options) : new SessionPoolAdapter(_options); + + pool.OpenAsync().ConfigureAwait(false).GetAwaiter().GetResult(); ; + return pool; }); } + + public void Dispose() + { + foreach (var pool in _pools.Values) + { + pool.Dispose(); + } + _pools.Clear(); + _disposed = true; + } } } diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/SessionPoolAdapter.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/SessionPoolAdapter.cs index 81a69a9..e836672 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Provider/SessionPoolAdapter.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/SessionPoolAdapter.cs @@ -53,7 +53,13 @@ namespace JiShe.CollectBus.IoTDBProvider.Provider /// public async Task InsertAsync(Tablet tablet) { - return await _sessionPool.InsertAlignedTabletAsync(tablet); + var result = await _sessionPool.InsertAlignedTabletAsync(tablet); + if (result != 0) + { + throw new Exception($"{nameof(TableSessionPoolAdapter)} "); + } + + return result; } /// diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/TableSessionPoolAdapter.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/TableSessionPoolAdapter.cs index 3676199..22de1b0 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Provider/TableSessionPoolAdapter.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/TableSessionPoolAdapter.cs @@ -51,7 +51,13 @@ namespace JiShe.CollectBus.IoTDBProvider.Provider /// public async Task InsertAsync(Tablet tablet) { - return await _sessionPool.InsertAsync(tablet); + var result = await _sessionPool.InsertAsync(tablet); + if (result != 0) + { + throw new Exception($"{nameof(TableSessionPoolAdapter)} "); + } + + return result; } ///