优化IoTDB数据类型映射,实现微秒和纳秒的扩展封装
This commit is contained in:
parent
309f5c37d2
commit
a6d970af19
@ -16,7 +16,7 @@ public class CollectBusIoTDbModule : AbpModule
|
|||||||
var configuration = context.Services.GetConfiguration();
|
var configuration = context.Services.GetConfiguration();
|
||||||
Configure<IoTDbOptions>(options => { configuration.GetSection(nameof(IoTDbOptions)).Bind(options); });
|
Configure<IoTDbOptions>(options => { configuration.GetSection(nameof(IoTDbOptions)).Bind(options); });
|
||||||
|
|
||||||
// 注册上下文为Scoped
|
//// 注册上下文为Scoped
|
||||||
context.Services.AddScoped<IoTDbRuntimeContext>();
|
//context.Services.AddScoped<IoTDBRuntimeContext>();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1,16 +1,17 @@
|
|||||||
using JiShe.CollectBus.IoTDB.Options;
|
using JiShe.CollectBus.IoTDB.Options;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
|
using Volo.Abp.DependencyInjection;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.IoTDB.Context
|
namespace JiShe.CollectBus.IoTDB.Context
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// IoTDB SessionPool 运行时上下文
|
/// IoTDB SessionPool 运行时上下文
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public class IoTDbRuntimeContext
|
public class IoTDBRuntimeContext: IScopedDependency
|
||||||
{
|
{
|
||||||
private readonly bool _defaultValue;
|
private readonly bool _defaultValue;
|
||||||
|
|
||||||
public IoTDbRuntimeContext(IOptions<IoTDbOptions> options)
|
public IoTDBRuntimeContext(IOptions<IoTDbOptions> options)
|
||||||
{
|
{
|
||||||
_defaultValue = options.Value.UseTableSessionPoolByDefault;
|
_defaultValue = options.Value.UseTableSessionPoolByDefault;
|
||||||
UseTableSessionPool = _defaultValue;
|
UseTableSessionPool = _defaultValue;
|
||||||
|
|||||||
@ -17,7 +17,7 @@ namespace JiShe.CollectBus.IoTDB.Model
|
|||||||
/// 项目编码
|
/// 项目编码
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[TAGColumn]
|
[TAGColumn]
|
||||||
public required string ProjectCode { get; set; }
|
public required string ProjectId { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 设备类型集中器、电表、水表、流量计、传感器等
|
/// 设备类型集中器、电表、水表、流量计、传感器等
|
||||||
@ -32,8 +32,13 @@ namespace JiShe.CollectBus.IoTDB.Model
|
|||||||
public required string DeviceId { get; set; }
|
public required string DeviceId { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 当前时间戳,单位毫秒,必须通过DateTimeOffset获取
|
/// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public required long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
|
public required long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 数据创建时间戳,单位毫秒,必须通过DateTimeOffset获取
|
||||||
|
/// </summary>
|
||||||
|
public virtual long CreationTime { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,7 +15,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public static string GetDevicePath<T>(T entity) where T : IoTEntity
|
public static string GetDevicePath<T>(T entity) where T : IoTEntity
|
||||||
{
|
{
|
||||||
return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectCode}`.`{entity.DeviceType}`.`{entity.DeviceId}`";
|
return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectId}`.`{entity.DeviceType}`.`{entity.DeviceId}`";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -39,7 +39,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public static string GetDeviceTableName<T>(T entity) where T : IoTEntity
|
public static string GetDeviceTableName<T>(T entity) where T : IoTEntity
|
||||||
{
|
{
|
||||||
return $"{entity.SystemName.ToLower()}.`{entity.ProjectCode}`.`{entity.DeviceType}`.`{entity.DeviceId}`";
|
return $"{entity.SystemName.ToLower()}.`{entity.ProjectId}`.`{entity.DeviceType}`.`{entity.DeviceId}`";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -24,7 +24,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
|
|||||||
private static readonly ConcurrentDictionary<Type, DeviceMetadata> MetadataCache = new();
|
private static readonly ConcurrentDictionary<Type, DeviceMetadata> MetadataCache = new();
|
||||||
private readonly ILogger<IoTDbProvider> _logger;
|
private readonly ILogger<IoTDbProvider> _logger;
|
||||||
private readonly IIoTDbSessionFactory _sessionFactory;
|
private readonly IIoTDbSessionFactory _sessionFactory;
|
||||||
private readonly IoTDbRuntimeContext _runtimeContext;
|
private readonly IoTDBRuntimeContext _runtimeContext;
|
||||||
|
|
||||||
private IIoTDbSessionPool CurrentSession =>
|
private IIoTDbSessionPool CurrentSession =>
|
||||||
_sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool);
|
_sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool);
|
||||||
@ -38,7 +38,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
|
|||||||
public IoTDbProvider(
|
public IoTDbProvider(
|
||||||
ILogger<IoTDbProvider> logger,
|
ILogger<IoTDbProvider> logger,
|
||||||
IIoTDbSessionFactory sessionFactory,
|
IIoTDbSessionFactory sessionFactory,
|
||||||
IoTDbRuntimeContext runtimeContext)
|
IoTDBRuntimeContext runtimeContext)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_sessionFactory = sessionFactory;
|
_sessionFactory = sessionFactory;
|
||||||
@ -54,11 +54,19 @@ namespace JiShe.CollectBus.IoTDB.Provider
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task InsertAsync<T>(T entity) where T : IoTEntity
|
public async Task InsertAsync<T>(T entity) where T : IoTEntity
|
||||||
{
|
{
|
||||||
var metadata = GetMetadata<T>();
|
try
|
||||||
|
{
|
||||||
|
var metadata = GetMetadata<T>();
|
||||||
|
|
||||||
var tablet = BuildTablet(new[] { entity }, metadata);
|
var tablet = BuildTablet(new[] { entity }, metadata);
|
||||||
|
|
||||||
await CurrentSession.InsertAsync(tablet);
|
await CurrentSession.InsertAsync(tablet);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, $"{nameof(InsertAsync)} 插入数据时发生异常");
|
||||||
|
throw;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -68,15 +76,23 @@ namespace JiShe.CollectBus.IoTDB.Provider
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task BatchInsertAsync<T>(IEnumerable<T> entities) where T : IoTEntity
|
public async Task BatchInsertAsync<T>(IEnumerable<T> entities) where T : IoTEntity
|
||||||
{
|
{
|
||||||
var metadata = GetMetadata<T>();
|
try
|
||||||
|
|
||||||
var batchSize = 1000;
|
|
||||||
var batches = entities.Chunk(batchSize);
|
|
||||||
|
|
||||||
foreach (var batch in batches)
|
|
||||||
{
|
{
|
||||||
var tablet = BuildTablet(batch, metadata);
|
var metadata = GetMetadata<T>();
|
||||||
await CurrentSession.InsertAsync(tablet);
|
|
||||||
|
var batchSize = 1000;
|
||||||
|
var batches = entities.Chunk(batchSize);
|
||||||
|
|
||||||
|
foreach (var batch in batches)
|
||||||
|
{
|
||||||
|
var tablet = BuildTablet(batch, metadata);
|
||||||
|
await CurrentSession.InsertAsync(tablet);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, $"{nameof(BatchInsertAsync)} 批量插入数据时发生异常");
|
||||||
|
throw;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -89,18 +105,26 @@ namespace JiShe.CollectBus.IoTDB.Provider
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task<object> DeleteAsync<T>(QueryOptions options) where T : IoTEntity
|
public async Task<object> DeleteAsync<T>(QueryOptions options) where T : IoTEntity
|
||||||
{
|
{
|
||||||
var query = BuildDeleteSQL<T>(options);
|
try
|
||||||
var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query);
|
|
||||||
|
|
||||||
if (!sessionDataSet.HasNext())
|
|
||||||
{
|
{
|
||||||
_logger.LogWarning($"{typeof(T).Name} 删除数据时,没有返回受影响记录数量。");
|
var query = BuildDeleteSQL<T>(options);
|
||||||
return 0;
|
var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query);
|
||||||
}
|
|
||||||
|
|
||||||
//获取唯一结果行
|
if (!sessionDataSet.HasNext())
|
||||||
var row = sessionDataSet.Next();
|
{
|
||||||
return row.Values[0];
|
_logger.LogWarning($"{typeof(T).Name} 删除数据时,没有返回受影响记录数量。");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
//获取唯一结果行
|
||||||
|
var row = sessionDataSet.Next();
|
||||||
|
return row.Values[0];
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, $"{nameof(DeleteAsync)} 删除数据时发生异常");
|
||||||
|
throw;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -111,16 +135,24 @@ namespace JiShe.CollectBus.IoTDB.Provider
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task<BusPagedResult<T>> QueryAsync<T>(QueryOptions options) where T : IoTEntity, new()
|
public async Task<BusPagedResult<T>> QueryAsync<T>(QueryOptions options) where T : IoTEntity, new()
|
||||||
{
|
{
|
||||||
var query = BuildQuerySQL<T>(options);
|
try
|
||||||
var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query);
|
|
||||||
|
|
||||||
var result = new BusPagedResult<T>
|
|
||||||
{
|
{
|
||||||
TotalCount = await GetTotalCount<T>(options),
|
var query = BuildQuerySQL<T>(options);
|
||||||
Items = ParseResults<T>(sessionDataSet, options.PageSize)
|
var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query);
|
||||||
};
|
|
||||||
|
|
||||||
return result;
|
var result = new BusPagedResult<T>
|
||||||
|
{
|
||||||
|
TotalCount = await GetTotalCount<T>(options),
|
||||||
|
Items = ParseResults<T>(sessionDataSet, options.PageSize)
|
||||||
|
};
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, $"{nameof(QueryAsync)} 查询数据时发生异常");
|
||||||
|
throw;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -160,12 +192,12 @@ namespace JiShe.CollectBus.IoTDB.Provider
|
|||||||
}
|
}
|
||||||
|
|
||||||
string tableNameOrTreePath = string.Empty;
|
string tableNameOrTreePath = string.Empty;
|
||||||
var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute<TableNameOrTreePathAttribute>();
|
var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute<TableNameOrTreePathAttribute>();
|
||||||
if (tableNameOrTreePathAttribute != null)
|
if (tableNameOrTreePathAttribute != null)
|
||||||
{
|
{
|
||||||
tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath;
|
tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath;
|
||||||
}
|
}
|
||||||
|
|
||||||
foreach (var entity in entities)
|
foreach (var entity in entities)
|
||||||
{
|
{
|
||||||
timestamps.Add(entity.Timestamps);
|
timestamps.Add(entity.Timestamps);
|
||||||
@ -214,7 +246,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
||||||
rowValues.Add(value);
|
rowValues.Add(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -237,7 +269,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
|
|||||||
devicePaths.Add(DevicePathBuilder.GetTableName<T>());
|
devicePaths.Add(DevicePathBuilder.GetTableName<T>());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (devicePaths.Count > 1)
|
if (devicePaths.Count > 1)
|
||||||
@ -258,8 +290,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
|
|||||||
/// <param name="values">数据集合</param>
|
/// <param name="values">数据集合</param>
|
||||||
/// <param name="timestamps">时间戳集合</param>
|
/// <param name="timestamps">时间戳集合</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath,
|
private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath, List<List<object>> values, List<long> timestamps)
|
||||||
List<List<object>> values, List<long> timestamps)
|
|
||||||
{
|
{
|
||||||
//todo 树模型需要去掉TAG类型和ATTRIBUTE类型的字段,只需要保留FIELD类型字段即可
|
//todo 树模型需要去掉TAG类型和ATTRIBUTE类型的字段,只需要保留FIELD类型字段即可
|
||||||
|
|
||||||
@ -280,8 +311,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
|
|||||||
/// <param name="values">数据集合</param>
|
/// <param name="values">数据集合</param>
|
||||||
/// <param name="timestamps">时间戳集合</param>
|
/// <param name="timestamps">时间戳集合</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName,
|
private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, List<List<object>> values, List<long> timestamps)
|
||||||
List<List<object>> values, List<long> timestamps)
|
|
||||||
{
|
{
|
||||||
var tablet = new Tablet(
|
var tablet = new Tablet(
|
||||||
tableName,
|
tableName,
|
||||||
@ -639,7 +669,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
|
|||||||
["DOUBLE"] = TSDataType.DOUBLE,
|
["DOUBLE"] = TSDataType.DOUBLE,
|
||||||
["TEXT"] = TSDataType.TEXT,
|
["TEXT"] = TSDataType.TEXT,
|
||||||
["NULLTYPE"] = TSDataType.NONE,
|
["NULLTYPE"] = TSDataType.NONE,
|
||||||
["TIMESTAMP"] = TSDataType.TIMESTAMP,
|
["DATETIME"] = TSDataType.TIMESTAMP,
|
||||||
["DATE"] = TSDataType.DATE,
|
["DATE"] = TSDataType.DATE,
|
||||||
["BLOB"] = TSDataType.BLOB,
|
["BLOB"] = TSDataType.BLOB,
|
||||||
["DECIMAL"] = TSDataType.STRING,
|
["DECIMAL"] = TSDataType.STRING,
|
||||||
@ -659,7 +689,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
|
|||||||
["DOUBLE"] = 0.0d,
|
["DOUBLE"] = 0.0d,
|
||||||
["TEXT"] = string.Empty,
|
["TEXT"] = string.Empty,
|
||||||
["NULLTYPE"] = null,
|
["NULLTYPE"] = null,
|
||||||
["TIMESTAMP"] = null,
|
["DATETIME"] = null,
|
||||||
["DATE"] = null,
|
["DATE"] = null,
|
||||||
["BLOB"] = null,
|
["BLOB"] = null,
|
||||||
["DECIMAL"] = "0.0",
|
["DECIMAL"] = "0.0",
|
||||||
|
|||||||
@ -57,5 +57,5 @@ public class KafkaOptionConfig
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// 首次采集时间
|
/// 首次采集时间
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public DateTime FirstCollectionTime { get; set; }
|
public DateTime? FirstCollectionTime { get; set; }
|
||||||
}
|
}
|
||||||
@ -30,12 +30,12 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
|
|||||||
{
|
{
|
||||||
private readonly ILogger<SampleAppService> _logger;
|
private readonly ILogger<SampleAppService> _logger;
|
||||||
private readonly IIoTDbProvider _iotDBProvider;
|
private readonly IIoTDbProvider _iotDBProvider;
|
||||||
private readonly IoTDbRuntimeContext _dbContext;
|
private readonly IoTDBRuntimeContext _dbContext;
|
||||||
private readonly IoTDbOptions _options;
|
private readonly IoTDbOptions _options;
|
||||||
private readonly IRedisDataCacheService _redisDataCacheService;
|
private readonly IRedisDataCacheService _redisDataCacheService;
|
||||||
|
|
||||||
public SampleAppService(IIoTDbProvider iotDBProvider, IOptions<IoTDbOptions> options,
|
public SampleAppService(IIoTDbProvider iotDBProvider, IOptions<IoTDbOptions> options,
|
||||||
IoTDbRuntimeContext dbContext, ILogger<SampleAppService> logger, IRedisDataCacheService redisDataCacheService)
|
IoTDBRuntimeContext dbContext, ILogger<SampleAppService> logger, IRedisDataCacheService redisDataCacheService)
|
||||||
{
|
{
|
||||||
_iotDBProvider = iotDBProvider;
|
_iotDBProvider = iotDBProvider;
|
||||||
_options = options.Value;
|
_options = options.Value;
|
||||||
@ -60,7 +60,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
|
|||||||
DeviceType = "Ammeter",
|
DeviceType = "Ammeter",
|
||||||
Current = 10,
|
Current = 10,
|
||||||
MeterModel = "DDZY-1980",
|
MeterModel = "DDZY-1980",
|
||||||
ProjectCode = "10059",
|
ProjectId = "10059",
|
||||||
Voltage = 10,
|
Voltage = 10,
|
||||||
IssuedMessageHexString = "messageHexString",
|
IssuedMessageHexString = "messageHexString",
|
||||||
Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(),
|
Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(),
|
||||||
@ -83,7 +83,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
|
|||||||
DeviceType = "Ammeter",
|
DeviceType = "Ammeter",
|
||||||
Current = 10,
|
Current = 10,
|
||||||
MeterModel = "DDZY-1980",
|
MeterModel = "DDZY-1980",
|
||||||
ProjectCode = "10059",
|
ProjectId = "10059",
|
||||||
Voltage = 10,
|
Voltage = 10,
|
||||||
Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(),
|
Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(),
|
||||||
};
|
};
|
||||||
@ -99,7 +99,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
|
|||||||
DeviceType = "Ammeter",
|
DeviceType = "Ammeter",
|
||||||
Current = 10,
|
Current = 10,
|
||||||
MeterModel = "DDZY-1980",
|
MeterModel = "DDZY-1980",
|
||||||
ProjectCode = "10059",
|
ProjectId = "10059",
|
||||||
Voltage = 10,
|
Voltage = 10,
|
||||||
Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(),
|
Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(),
|
||||||
};
|
};
|
||||||
@ -122,7 +122,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
|
|||||||
DeviceType = "Ammeter",
|
DeviceType = "Ammeter",
|
||||||
Current = 10,
|
Current = 10,
|
||||||
MeterModel = "DDZY-1980",
|
MeterModel = "DDZY-1980",
|
||||||
ProjectCode = "10059",
|
ProjectId = "10059",
|
||||||
Voltage = 10,
|
Voltage = 10,
|
||||||
IssuedMessageHexString = "dsdfsfd",
|
IssuedMessageHexString = "dsdfsfd",
|
||||||
Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(),
|
Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(),
|
||||||
@ -141,7 +141,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
|
|||||||
DeviceType = "Ammeter",
|
DeviceType = "Ammeter",
|
||||||
Current = 10,
|
Current = 10,
|
||||||
MeterModel = "DDZY-1980",
|
MeterModel = "DDZY-1980",
|
||||||
ProjectCode = "10059",
|
ProjectId = "10059",
|
||||||
Voltage = 10,
|
Voltage = 10,
|
||||||
Currentd = 22,
|
Currentd = 22,
|
||||||
IssuedMessageHexString = "dsdfsfd",
|
IssuedMessageHexString = "dsdfsfd",
|
||||||
@ -163,7 +163,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
|
|||||||
SystemName = "energy",
|
SystemName = "energy",
|
||||||
DeviceId = "402440506",
|
DeviceId = "402440506",
|
||||||
DeviceType = "Ammeter",
|
DeviceType = "Ammeter",
|
||||||
ProjectCode = "10059",
|
ProjectId = "10059",
|
||||||
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
|
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
|
||||||
SingleMeasuring = new Tuple<string, string>(measuring, value)
|
SingleMeasuring = new Tuple<string, string>(measuring, value)
|
||||||
};
|
};
|
||||||
@ -183,7 +183,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
|
|||||||
SystemName = "energy",
|
SystemName = "energy",
|
||||||
DeviceId = "402440506",
|
DeviceId = "402440506",
|
||||||
DeviceType = "Ammeter",
|
DeviceType = "Ammeter",
|
||||||
ProjectCode = "10059",
|
ProjectId = "10059",
|
||||||
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
|
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
|
||||||
SingleMeasuring = new Tuple<string, int>(measuring, value)
|
SingleMeasuring = new Tuple<string, int>(measuring, value)
|
||||||
};
|
};
|
||||||
@ -203,7 +203,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
|
|||||||
SystemName = "energy",
|
SystemName = "energy",
|
||||||
DeviceId = "402440506",
|
DeviceId = "402440506",
|
||||||
DeviceType = "Ammeter",
|
DeviceType = "Ammeter",
|
||||||
ProjectCode = "10059",
|
ProjectId = "10059",
|
||||||
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
|
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
|
||||||
SingleColumn = new Tuple<string, string>(measuring, value)
|
SingleColumn = new Tuple<string, string>(measuring, value)
|
||||||
};
|
};
|
||||||
@ -224,7 +224,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
|
|||||||
SystemName = "energy",
|
SystemName = "energy",
|
||||||
DeviceId = "402440506",
|
DeviceId = "402440506",
|
||||||
DeviceType = "Ammeter",
|
DeviceType = "Ammeter",
|
||||||
ProjectCode = "10059",
|
ProjectId = "10059",
|
||||||
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
|
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
|
||||||
SingleColumn = new Tuple<string, int>(measuring, value)
|
SingleColumn = new Tuple<string, int>(measuring, value)
|
||||||
};
|
};
|
||||||
|
|||||||
@ -1,4 +1,6 @@
|
|||||||
using DnsClient.Protocol;
|
using Confluent.Kafka;
|
||||||
|
using DnsClient.Protocol;
|
||||||
|
using FreeSql;
|
||||||
using JiShe.CollectBus.Ammeters;
|
using JiShe.CollectBus.Ammeters;
|
||||||
using JiShe.CollectBus.Application.Contracts;
|
using JiShe.CollectBus.Application.Contracts;
|
||||||
using JiShe.CollectBus.Common.BuildSendDatas;
|
using JiShe.CollectBus.Common.BuildSendDatas;
|
||||||
@ -9,6 +11,7 @@ using JiShe.CollectBus.Common.Extensions;
|
|||||||
using JiShe.CollectBus.Common.Helpers;
|
using JiShe.CollectBus.Common.Helpers;
|
||||||
using JiShe.CollectBus.Common.Models;
|
using JiShe.CollectBus.Common.Models;
|
||||||
using JiShe.CollectBus.GatherItem;
|
using JiShe.CollectBus.GatherItem;
|
||||||
|
using JiShe.CollectBus.IoTDB.Context;
|
||||||
using JiShe.CollectBus.IoTDB.Interface;
|
using JiShe.CollectBus.IoTDB.Interface;
|
||||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||||
@ -19,6 +22,8 @@ using JiShe.CollectBus.Protocol.Contracts;
|
|||||||
using JiShe.CollectBus.RedisDataCache;
|
using JiShe.CollectBus.RedisDataCache;
|
||||||
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
||||||
using Mapster;
|
using Mapster;
|
||||||
|
using Microsoft.Extensions.Configuration;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
using System;
|
using System;
|
||||||
@ -38,22 +43,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
{
|
{
|
||||||
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
|
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
|
||||||
private readonly IIoTDbProvider _dbProvider;
|
private readonly IIoTDbProvider _dbProvider;
|
||||||
private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
|
|
||||||
private readonly IProducerService _producerService;
|
private readonly IProducerService _producerService;
|
||||||
private readonly IRedisDataCacheService _redisDataCacheService;
|
private readonly IRedisDataCacheService _redisDataCacheService;
|
||||||
private readonly KafkaOptionConfig _kafkaOptions;
|
private readonly KafkaOptionConfig _kafkaOptions;
|
||||||
|
private readonly IoTDBRuntimeContext _runtimeContext;
|
||||||
|
|
||||||
public BasicScheduledMeterReadingService(
|
public BasicScheduledMeterReadingService(
|
||||||
ILogger<BasicScheduledMeterReadingService> logger,
|
ILogger<BasicScheduledMeterReadingService> logger,
|
||||||
IMeterReadingRecordRepository meterReadingRecordRepository,
|
|
||||||
IProducerService producerService,
|
IProducerService producerService,
|
||||||
IRedisDataCacheService redisDataCacheService,
|
IRedisDataCacheService redisDataCacheService,
|
||||||
IIoTDbProvider dbProvider,
|
IIoTDbProvider dbProvider,
|
||||||
|
IoTDBRuntimeContext runtimeContext,
|
||||||
IOptions<KafkaOptionConfig> kafkaOptions)
|
IOptions<KafkaOptionConfig> kafkaOptions)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_dbProvider = dbProvider;
|
_dbProvider = dbProvider;
|
||||||
_meterReadingRecordRepository = meterReadingRecordRepository;
|
_runtimeContext = runtimeContext;
|
||||||
_producerService = producerService;
|
_producerService = producerService;
|
||||||
_redisDataCacheService = redisDataCacheService;
|
_redisDataCacheService = redisDataCacheService;
|
||||||
_kafkaOptions = kafkaOptions.Value;
|
_kafkaOptions = kafkaOptions.Value;
|
||||||
@ -133,17 +138,49 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
|
|
||||||
if (meteryType == MeterTypeEnum.Ammeter.ToString())
|
if (meteryType == MeterTypeEnum.Ammeter.ToString())
|
||||||
{
|
{
|
||||||
//_ = AmmerterCreatePublishTask(timeDensity, $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}");
|
List<MeterReadingTelemetryPacketInfo> pushTaskInfos = new();
|
||||||
|
|
||||||
_ = CreateMeterPublishTask<AmmeterInfo>(
|
await CreateMeterPublishTask<AmmeterInfo>(
|
||||||
timeDensity: timeDensity,
|
timeDensity: timeDensity,
|
||||||
taskBatch: $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}",
|
nextTaskTime: tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity),
|
||||||
meterType: MeterTypeEnum.Ammeter,
|
meterType: MeterTypeEnum.Ammeter,
|
||||||
taskCreateAction: (timeDensity, data, groupIndex, taskBatch) =>
|
taskCreateAction: (timeDensity, data, groupIndex, timestamps) =>
|
||||||
{
|
{
|
||||||
AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch);
|
var tempTask = AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps);
|
||||||
|
if (tempTask == null || tempTask.Count <= 0)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pushTaskInfos.AddRange(tempTask);
|
||||||
|
|
||||||
|
//using (var score = _serviceProvider.CreateScope())
|
||||||
|
//{
|
||||||
|
// var _dbContext = score.ServiceProvider.GetRequiredService<IoTDBRuntimeContext>();
|
||||||
|
// _dbContext.UseTableSessionPool = true;
|
||||||
|
// _dbProvider.BatchInsertAsync(tempTask);
|
||||||
|
//}
|
||||||
|
|
||||||
|
_runtimeContext.UseTableSessionPool = true;
|
||||||
|
_dbProvider.BatchInsertAsync(tempTask);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
//if (pushTaskInfos.Count <= 0)
|
||||||
|
//{
|
||||||
|
// _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有任务数据信息,-1051");
|
||||||
|
// continue;
|
||||||
|
//}
|
||||||
|
|
||||||
|
//using (var score = _serviceProvider.CreateScope())
|
||||||
|
//{
|
||||||
|
// var _dbContext = score.ServiceProvider.GetRequiredService<IoTDBRuntimeContext>();
|
||||||
|
// _dbContext.UseTableSessionPool = true;
|
||||||
|
// _dbProvider.BatchInsertAsync(pushTaskInfos);
|
||||||
|
//}
|
||||||
|
|
||||||
|
//_dbContext.UseTableSessionPool = true;
|
||||||
|
//await _dbProvider.BatchInsertAsync(pushTaskInfos);
|
||||||
|
|
||||||
}
|
}
|
||||||
else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
|
else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
|
||||||
{
|
{
|
||||||
@ -152,7 +189,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
|
|
||||||
_ = CreateMeterPublishTask<WatermeterInfo>(
|
_ = CreateMeterPublishTask<WatermeterInfo>(
|
||||||
timeDensity: timeDensity,
|
timeDensity: timeDensity,
|
||||||
taskBatch: $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}",
|
nextTaskTime: tasksToBeIssueModel.NextTaskTime,
|
||||||
meterType: MeterTypeEnum.Ammeter,
|
meterType: MeterTypeEnum.Ammeter,
|
||||||
taskCreateAction: (timeDensity, data, groupIndex, taskBatch) =>
|
taskCreateAction: (timeDensity, data, groupIndex, taskBatch) =>
|
||||||
{
|
{
|
||||||
@ -193,11 +230,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
|
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
|
||||||
{
|
{
|
||||||
#if DEBUG
|
#if DEBUG
|
||||||
return;
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
var timeDensity = "15";
|
var timeDensity = "15";
|
||||||
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
|
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
|
||||||
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
|
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
|
||||||
@ -237,8 +270,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
|
|
||||||
timer1.Stop();
|
timer1.Stop();
|
||||||
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
|
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
|
||||||
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
|
DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
|
||||||
//return;
|
return;
|
||||||
#else
|
#else
|
||||||
var meterInfos = await GetAmmeterInfoList(gatherCode);
|
var meterInfos = await GetAmmeterInfoList(gatherCode);
|
||||||
#endif
|
#endif
|
||||||
@ -261,13 +294,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
//根据采集频率分组,获得采集频率分组
|
//根据采集频率分组,获得采集频率分组
|
||||||
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
|
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
|
||||||
|
|
||||||
|
if (_kafkaOptions.FirstCollectionTime.HasValue == false)
|
||||||
|
{
|
||||||
|
_kafkaOptions.FirstCollectionTime = DateTime.Now;
|
||||||
|
}
|
||||||
//先处理采集频率任务缓存
|
//先处理采集频率任务缓存
|
||||||
foreach (var item in meterInfoGroupByTimeDensity)
|
foreach (var item in meterInfoGroupByTimeDensity)
|
||||||
{
|
{
|
||||||
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
||||||
{
|
{
|
||||||
TimeDensity = item.Key,
|
TimeDensity = item.Key,
|
||||||
NextTaskTime = _kafkaOptions.FirstCollectionTime.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间
|
NextTaskTime = _kafkaOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间
|
||||||
};
|
};
|
||||||
|
|
||||||
//todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。
|
//todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。
|
||||||
@ -465,60 +502,60 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
///// <summary>
|
||||||
/// 创建电表待发送的任务数据
|
///// 创建电表待发送的任务数据
|
||||||
/// </summary>
|
///// </summary>
|
||||||
/// <param name="timeDensity">采集频率</param>
|
///// <param name="timeDensity">采集频率</param>
|
||||||
/// <param name="taskBatch">时间格式的任务批次名称</param>
|
///// <param name="taskBatch">时间格式的任务批次名称</param>
|
||||||
/// <returns></returns>
|
///// <returns></returns>
|
||||||
private async Task AmmerterCreatePublishTask(int timeDensity, string taskBatch)
|
//private async Task AmmerterCreatePublishTask(int timeDensity, string taskBatch)
|
||||||
{
|
//{
|
||||||
var timer = Stopwatch.StartNew();
|
// var timer = Stopwatch.StartNew();
|
||||||
|
|
||||||
//获取对应频率中的所有电表信息
|
// //获取对应频率中的所有电表信息
|
||||||
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
|
// var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
|
||||||
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
|
// var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
|
||||||
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
|
// var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
|
||||||
|
|
||||||
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
|
// List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
|
||||||
decimal? cursor = null;
|
// decimal? cursor = null;
|
||||||
string member = null;
|
// string member = null;
|
||||||
bool hasNext;
|
// bool hasNext;
|
||||||
do
|
// do
|
||||||
{
|
// {
|
||||||
var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
|
// var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
|
||||||
redisCacheMeterInfoHashKeyTemp,
|
// redisCacheMeterInfoHashKeyTemp,
|
||||||
redisCacheMeterInfoZSetScoresIndexKeyTemp,
|
// redisCacheMeterInfoZSetScoresIndexKeyTemp,
|
||||||
pageSize: 1000,
|
// pageSize: 1000,
|
||||||
lastScore: cursor,
|
// lastScore: cursor,
|
||||||
lastMember: member);
|
// lastMember: member);
|
||||||
|
|
||||||
meterInfos.AddRange(page.Items);
|
// meterInfos.AddRange(page.Items);
|
||||||
cursor = page.HasNext ? page.NextScore : null;
|
// cursor = page.HasNext ? page.NextScore : null;
|
||||||
member = page.HasNext ? page.NextMember : null;
|
// member = page.HasNext ? page.NextMember : null;
|
||||||
hasNext = page.HasNext;
|
// hasNext = page.HasNext;
|
||||||
} while (hasNext);
|
// } while (hasNext);
|
||||||
|
|
||||||
if (meterInfos == null || meterInfos.Count <= 0)
|
// if (meterInfos == null || meterInfos.Count <= 0)
|
||||||
{
|
// {
|
||||||
timer.Stop();
|
// timer.Stop();
|
||||||
_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
|
// _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
|
||||||
return;
|
// return;
|
||||||
}
|
// }
|
||||||
|
|
||||||
|
|
||||||
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
|
// await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
|
||||||
items: meterInfos,
|
// items: meterInfos,
|
||||||
deviceIdSelector: data => data.FocusAddress,
|
// deviceIdSelector: data => data.FocusAddress,
|
||||||
processor: (data, groupIndex) =>
|
// processor: (data, groupIndex) =>
|
||||||
{
|
// {
|
||||||
AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch);
|
// AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch);
|
||||||
}
|
// }
|
||||||
);
|
// );
|
||||||
|
|
||||||
timer.Stop();
|
// timer.Stop();
|
||||||
_logger.LogInformation($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息");
|
// _logger.LogInformation($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息");
|
||||||
}
|
//}
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -527,38 +564,33 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// <param name="timeDensity">采集频率</param>
|
/// <param name="timeDensity">采集频率</param>
|
||||||
/// <param name="ammeterInfo">电表信息</param>
|
/// <param name="ammeterInfo">电表信息</param>
|
||||||
/// <param name="groupIndex">集中器所在分组</param>
|
/// <param name="groupIndex">集中器所在分组</param>
|
||||||
/// <param name="taskBatch">时间格式的任务批次名称</param>
|
/// <param name="timestamps">采集频率对应的时间戳</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private void AmmerterCreatePublishTaskAction(int timeDensity
|
private List<MeterReadingTelemetryPacketInfo> AmmerterCreatePublishTaskAction(int timeDensity
|
||||||
, AmmeterInfo ammeterInfo, int groupIndex, string taskBatch)
|
, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
|
||||||
{
|
{
|
||||||
|
var currentTime = DateTime.Now;
|
||||||
|
|
||||||
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
|
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
|
||||||
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
|
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
|
||||||
|
|
||||||
var currentTime = DateTime.Now;
|
|
||||||
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
|
|
||||||
|
|
||||||
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
|
||||||
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
|
||||||
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
|
||||||
|
|
||||||
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
|
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
|
||||||
{
|
{
|
||||||
// _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
|
// _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
//载波的不处理
|
//载波的不处理
|
||||||
if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
|
if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
|
||||||
{
|
{
|
||||||
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102");
|
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102");
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ammeterInfo.State.Equals(2))
|
if (ammeterInfo.State.Equals(2))
|
||||||
{
|
{
|
||||||
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理");
|
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理");
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器
|
////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器
|
||||||
@ -571,22 +603,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode))
|
if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode))
|
||||||
{
|
{
|
||||||
// _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空");
|
// _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空");
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
if (string.IsNullOrWhiteSpace(ammeterInfo.Address))
|
if (string.IsNullOrWhiteSpace(ammeterInfo.Address))
|
||||||
{
|
{
|
||||||
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空");
|
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空");
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
if (Convert.ToInt32(ammeterInfo.Address) > 65535)
|
if (Convert.ToInt32(ammeterInfo.Address) > 65535)
|
||||||
{
|
{
|
||||||
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535");
|
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535");
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33)
|
if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33)
|
||||||
{
|
{
|
||||||
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})");
|
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})");
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
List<string> tempCodes = ammeterInfo.ItemCodes.Deserialize<List<string>>()!;
|
List<string> tempCodes = ammeterInfo.ItemCodes.Deserialize<List<string>>()!;
|
||||||
@ -613,7 +645,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
if (tempSubCodes == null || tempSubCodes.Count <= 0)
|
if (tempSubCodes == null || tempSubCodes.Count <= 0)
|
||||||
{
|
{
|
||||||
//_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空");
|
//_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空");
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -683,18 +715,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
|
|
||||||
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
|
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
|
||||||
{
|
{
|
||||||
ProjectID = ammeterInfo.ProjectID,
|
SystemName = SystemType,
|
||||||
|
ProjectId = $"{ammeterInfo.ProjectID}",
|
||||||
|
DeviceType = $"{MeterTypeEnum.Ammeter}",
|
||||||
|
DeviceId = $"{ammeterInfo.MemberId}",
|
||||||
|
Timestamps = timestamps.GetDateTimeOffset().ToUnixTimeMilliseconds(),
|
||||||
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
|
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
|
||||||
PendingCopyReadTime = pendingCopyReadTime,
|
PendingCopyReadTime = timestamps,
|
||||||
CreationTime = currentTime,
|
CreationTime = currentTime,
|
||||||
MeterAddress = ammeterInfo.AmmerterAddress,
|
MeterAddress = ammeterInfo.AmmerterAddress,
|
||||||
MeterId = ammeterInfo.MeterId,
|
|
||||||
MeterType = MeterTypeEnum.Ammeter,
|
|
||||||
FocusAddress = ammeterInfo.FocusAddress,
|
FocusAddress = ammeterInfo.FocusAddress,
|
||||||
FocusId = ammeterInfo.FocusId,
|
AFN = (int)aFN,
|
||||||
AFN = aFN,
|
|
||||||
Fn = fn,
|
Fn = fn,
|
||||||
Seq = builderResponse.Seq,
|
//Seq = builderResponse.Seq,
|
||||||
MSA = builderResponse.MSA,
|
MSA = builderResponse.MSA,
|
||||||
ItemCode = tempItem,
|
ItemCode = tempItem,
|
||||||
TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA),
|
TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA),
|
||||||
@ -709,37 +742,16 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
taskList.Add(meterReadingRecords);
|
taskList.Add(meterReadingRecords);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taskList == null
|
return taskList;
|
||||||
|| taskList.Count() <= 0
|
|
||||||
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey)
|
|
||||||
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey)
|
|
||||||
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey))
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {ammeterInfo.Name}的写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
using (var pipe = FreeRedisProvider.Instance.StartPipe())
|
//using (var score = _serviceProvider.CreateScope())
|
||||||
{
|
//{
|
||||||
foreach (var item in taskList)
|
// var _dbContext = score.ServiceProvider.GetRequiredService<IoTDBRuntimeContext>();
|
||||||
{
|
// _dbContext.UseTableSessionPool = true;
|
||||||
// 主数据存储Hash
|
// _dbProvider.BatchInsertAsync(taskList);
|
||||||
pipe.HSet(redisCacheTelemetryPacketInfoHashKey, item.MemberId, item.Serialize());
|
//}
|
||||||
|
|
||||||
// Set索引缓存
|
|
||||||
pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, item.MemberId);
|
|
||||||
|
|
||||||
// ZSET索引缓存Key
|
|
||||||
pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, item.ScoreValue, item.MemberId);
|
|
||||||
}
|
|
||||||
pipe.EndPipe();
|
|
||||||
}
|
|
||||||
|
|
||||||
//await _redisDataCacheService.BatchInsertDataAsync(
|
|
||||||
// redisCacheTelemetryPacketInfoHashKey,
|
|
||||||
// redisCacheTelemetryPacketInfoSetIndexKey,
|
|
||||||
// redisCacheTelemetryPacketInfoZSetScoresIndexKey,
|
|
||||||
// taskList);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
@ -864,7 +876,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
}
|
}
|
||||||
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
||||||
{
|
{
|
||||||
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
|
// await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
|
||||||
}
|
}
|
||||||
|
|
||||||
////删除任务数据
|
////删除任务数据
|
||||||
@ -877,52 +889,52 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
_logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成");
|
_logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成");
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
///// <summary>
|
||||||
/// 创建水表待发送的任务数据
|
///// 创建水表待发送的任务数据
|
||||||
/// </summary>
|
///// </summary>
|
||||||
/// <param name="timeDensity">采集频率</param>
|
///// <param name="timeDensity">采集频率</param>
|
||||||
/// <param name="meterInfo">水表信息</param>
|
///// <param name="meterInfo">水表信息</param>
|
||||||
/// <param name="groupIndex">集中器所在分组</param>
|
///// <param name="groupIndex">集中器所在分组</param>
|
||||||
/// <param name="taskBatch">时间格式的任务批次名称</param>
|
///// <param name="taskBatch">时间格式的任务批次名称</param>
|
||||||
/// <returns></returns>
|
///// <returns></returns>
|
||||||
private void WatermeterCreatePublishTaskAction(int timeDensity
|
//private void WatermeterCreatePublishTaskAction(int timeDensity
|
||||||
, WatermeterInfo meterInfo, int groupIndex, string taskBatch)
|
// , WatermeterInfo meterInfo, int groupIndex, string taskBatch)
|
||||||
{
|
//{
|
||||||
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
|
// var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
|
||||||
|
|
||||||
|
|
||||||
var currentTime = DateTime.Now;
|
// var currentTime = DateTime.Now;
|
||||||
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
|
// var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
|
||||||
|
|
||||||
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
// var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
||||||
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
// var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
||||||
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
// var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
||||||
|
|
||||||
|
|
||||||
var taskInfo = new MeterReadingTelemetryPacketInfo()
|
// var taskInfo = new MeterReadingTelemetryPacketInfo()
|
||||||
{
|
// {
|
||||||
Seq= null,
|
// Seq= null,
|
||||||
|
|
||||||
};
|
|
||||||
//
|
|
||||||
|
|
||||||
Build188SendData.Build188WaterMeterReadingSendDataUnit(meterInfo.Address);
|
// };
|
||||||
|
// //
|
||||||
|
|
||||||
using (var pipe = FreeRedisProvider.Instance.StartPipe())
|
// Build188SendData.Build188WaterMeterReadingSendDataUnit(meterInfo.Address);
|
||||||
{
|
|
||||||
// 主数据存储Hash
|
|
||||||
pipe.HSet(redisCacheTelemetryPacketInfoHashKey, taskInfo.MemberId, taskInfo.Serialize());
|
|
||||||
|
|
||||||
// Set索引缓存
|
// using (var pipe = FreeRedisProvider.Instance.StartPipe())
|
||||||
pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, taskInfo.MemberId);
|
// {
|
||||||
|
// // 主数据存储Hash
|
||||||
|
// pipe.HSet(redisCacheTelemetryPacketInfoHashKey, taskInfo.MemberId, taskInfo.Serialize());
|
||||||
|
|
||||||
// ZSET索引缓存Key
|
// // Set索引缓存
|
||||||
pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, taskInfo.ScoreValue, taskInfo.MemberId);
|
// pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, taskInfo.MemberId);
|
||||||
|
|
||||||
pipe.EndPipe();
|
// // ZSET索引缓存Key
|
||||||
}
|
// pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, taskInfo.ScoreValue, taskInfo.MemberId);
|
||||||
|
|
||||||
}
|
// pipe.EndPipe();
|
||||||
|
// }
|
||||||
|
|
||||||
|
//}
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
@ -961,11 +973,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// 创建表的待发送的任务数据
|
/// 创建表的待发送的任务数据
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="timeDensity">采集频率</param>
|
/// <param name="timeDensity">采集频率</param>
|
||||||
/// <param name="taskBatch">时间格式的任务批次名称</param>
|
/// <param name="nextTaskTime">采集频率对应的任务时间戳</param>
|
||||||
/// <param name="meterType">表类型</param>
|
/// <param name="meterType">表类型</param>
|
||||||
/// <param name="taskCreateAction">具体的创建任务的委托</param>
|
/// <param name="taskCreateAction">具体的创建任务的委托</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private async Task CreateMeterPublishTask<T>(int timeDensity, string taskBatch, MeterTypeEnum meterType, Action<int, T, int, string> taskCreateAction) where T : DeviceCacheBasicModel
|
private async Task CreateMeterPublishTask<T>(int timeDensity, DateTime nextTaskTime, MeterTypeEnum meterType, Action<int, T, int, DateTime> taskCreateAction) where T : DeviceCacheBasicModel
|
||||||
{
|
{
|
||||||
var timer = Stopwatch.StartNew();
|
var timer = Stopwatch.StartNew();
|
||||||
|
|
||||||
@ -978,20 +990,29 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
decimal? cursor = null;
|
decimal? cursor = null;
|
||||||
string member = null;
|
string member = null;
|
||||||
bool hasNext;
|
bool hasNext;
|
||||||
do
|
//do
|
||||||
{
|
//{
|
||||||
var page = await _redisDataCacheService.GetAllPagedData<T>(
|
// var page = await _redisDataCacheService.GetAllPagedData<T>(
|
||||||
redisCacheMeterInfoHashKeyTemp,
|
// redisCacheMeterInfoHashKeyTemp,
|
||||||
redisCacheMeterInfoZSetScoresIndexKeyTemp,
|
// redisCacheMeterInfoZSetScoresIndexKeyTemp,
|
||||||
pageSize: 1000,
|
// pageSize: 1000,
|
||||||
lastScore: cursor,
|
// lastScore: cursor,
|
||||||
lastMember: member);
|
// lastMember: member);
|
||||||
|
|
||||||
meterInfos.AddRange(page.Items);
|
// meterInfos.AddRange(page.Items);
|
||||||
cursor = page.HasNext ? page.NextScore : null;
|
// cursor = page.HasNext ? page.NextScore : null;
|
||||||
member = page.HasNext ? page.NextMember : null;
|
// member = page.HasNext ? page.NextMember : null;
|
||||||
hasNext = page.HasNext;
|
// hasNext = page.HasNext;
|
||||||
} while (hasNext);
|
//} while (hasNext);
|
||||||
|
|
||||||
|
|
||||||
|
var page = await _redisDataCacheService.GetAllPagedData<T>(
|
||||||
|
redisCacheMeterInfoHashKeyTemp,
|
||||||
|
redisCacheMeterInfoZSetScoresIndexKeyTemp,
|
||||||
|
pageSize: 1,
|
||||||
|
lastScore: cursor,
|
||||||
|
lastMember: member);
|
||||||
|
meterInfos.AddRange(page.Items);
|
||||||
|
|
||||||
if (meterInfos == null || meterInfos.Count <= 0)
|
if (meterInfos == null || meterInfos.Count <= 0)
|
||||||
{
|
{
|
||||||
@ -1000,13 +1021,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
|
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
|
||||||
items: meterInfos,
|
items: meterInfos,
|
||||||
deviceIdSelector: data => data.FocusAddress,
|
deviceIdSelector: data => data.FocusAddress,
|
||||||
processor: (data, groupIndex) =>
|
processor: (data, groupIndex) =>
|
||||||
{
|
{
|
||||||
taskCreateAction(timeDensity, data, groupIndex, taskBatch);
|
taskCreateAction(timeDensity, data, groupIndex, nextTaskTime);
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -1034,29 +1054,29 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
string member = null;
|
string member = null;
|
||||||
bool hasNext;
|
bool hasNext;
|
||||||
var stopwatch = Stopwatch.StartNew();
|
var stopwatch = Stopwatch.StartNew();
|
||||||
do
|
//do
|
||||||
{
|
//{
|
||||||
var page = await _redisDataCacheService.GetAllPagedData<MeterReadingTelemetryPacketInfo>(
|
// var page = await _redisDataCacheService.GetAllPagedData<MeterReadingTelemetryPacketInfo>(
|
||||||
redisCacheTelemetryPacketInfoHashKey,
|
// redisCacheTelemetryPacketInfoHashKey,
|
||||||
redisCacheTelemetryPacketInfoZSetScoresIndexKey,
|
// redisCacheTelemetryPacketInfoZSetScoresIndexKey,
|
||||||
pageSize: 1000,
|
// pageSize: 1000,
|
||||||
lastScore: cursor,
|
// lastScore: cursor,
|
||||||
lastMember: member);
|
// lastMember: member);
|
||||||
|
|
||||||
cursor = page.HasNext ? page.NextScore : null;
|
// cursor = page.HasNext ? page.NextScore : null;
|
||||||
member = page.HasNext ? page.NextMember : null;
|
// member = page.HasNext ? page.NextMember : null;
|
||||||
hasNext = page.HasNext;
|
// hasNext = page.HasNext;
|
||||||
|
|
||||||
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
|
// await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
|
||||||
items: page.Items,
|
// items: page.Items,
|
||||||
deviceIdSelector: data => data.FocusAddress,
|
// deviceIdSelector: data => data.FocusAddress,
|
||||||
processor: (data, groupIndex) =>
|
// processor: (data, groupIndex) =>
|
||||||
{
|
// {
|
||||||
_ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex);
|
// _ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex);
|
||||||
}
|
// }
|
||||||
);
|
// );
|
||||||
|
|
||||||
} while (hasNext);
|
//} while (hasNext);
|
||||||
|
|
||||||
stopwatch.Stop();
|
stopwatch.Stop();
|
||||||
_logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {redisCacheTelemetryPacketInfoHashKey}采集推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
|
_logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {redisCacheTelemetryPacketInfoHashKey}采集推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
|
||||||
|
|||||||
@ -8,6 +8,7 @@ using JiShe.CollectBus.Common.DeviceBalanceControl;
|
|||||||
using JiShe.CollectBus.Common.Helpers;
|
using JiShe.CollectBus.Common.Helpers;
|
||||||
using JiShe.CollectBus.FreeSql;
|
using JiShe.CollectBus.FreeSql;
|
||||||
using JiShe.CollectBus.GatherItem;
|
using JiShe.CollectBus.GatherItem;
|
||||||
|
using JiShe.CollectBus.IoTDB.Context;
|
||||||
using JiShe.CollectBus.IoTDB.Interface;
|
using JiShe.CollectBus.IoTDB.Interface;
|
||||||
using JiShe.CollectBus.IotSystems.Devices;
|
using JiShe.CollectBus.IotSystems.Devices;
|
||||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||||
@ -35,18 +36,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
|
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
|
||||||
{
|
{
|
||||||
string serverTagName = string.Empty;
|
string serverTagName = string.Empty;
|
||||||
|
|
||||||
public EnergySystemScheduledMeterReadingService(
|
public EnergySystemScheduledMeterReadingService(
|
||||||
ILogger<EnergySystemScheduledMeterReadingService> logger,
|
ILogger<EnergySystemScheduledMeterReadingService> logger,
|
||||||
IIoTDbProvider dbProvider,
|
IIoTDbProvider dbProvider,
|
||||||
IMeterReadingRecordRepository meterReadingRecordRepository,
|
IOptions<KafkaOptionConfig> kafkaOptions,
|
||||||
IOptions<KafkaOptionConfig> kafkaOptions,
|
IoTDBRuntimeContext runtimeContext,
|
||||||
IProducerService producerService,
|
IProducerService producerService,
|
||||||
IRedisDataCacheService redisDataCacheService)
|
IRedisDataCacheService redisDataCacheService)
|
||||||
: base(logger,
|
: base(logger,
|
||||||
meterReadingRecordRepository,
|
|
||||||
producerService,
|
producerService,
|
||||||
redisDataCacheService,
|
redisDataCacheService,
|
||||||
dbProvider,
|
dbProvider,
|
||||||
|
runtimeContext,
|
||||||
kafkaOptions)
|
kafkaOptions)
|
||||||
{
|
{
|
||||||
serverTagName = kafkaOptions.Value.ServerTagName;
|
serverTagName = kafkaOptions.Value.ServerTagName;
|
||||||
|
|||||||
@ -1,5 +1,9 @@
|
|||||||
using JiShe.CollectBus.Common.Enums;
|
using JiShe.CollectBus.Common.Encrypt;
|
||||||
|
using JiShe.CollectBus.Common.Enums;
|
||||||
using JiShe.CollectBus.Common.Models;
|
using JiShe.CollectBus.Common.Models;
|
||||||
|
using JiShe.CollectBus.IoTDB.Attribute;
|
||||||
|
using JiShe.CollectBus.IoTDB.Enums;
|
||||||
|
using JiShe.CollectBus.IoTDB.Model;
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
@ -13,78 +17,85 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// 抄读任务Redis缓存数据记录
|
/// 抄读任务Redis缓存数据记录
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public class MeterReadingTelemetryPacketInfo : DeviceCacheBasicModel
|
[EntityType(EntityTypeEnum.TableModel)]
|
||||||
|
public class MeterReadingTelemetryPacketInfo : IoTEntity
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义
|
/// 排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳、或者某一个固定的标识1
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public override string MemberId => $"{FocusId}:{MeterId}:{ItemCode}";
|
[FIELDColumn]
|
||||||
|
public string ScoreValue
|
||||||
/// <summary>
|
{
|
||||||
/// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳
|
get
|
||||||
/// </summary>
|
{
|
||||||
public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks;
|
return $"{FocusAddress}.{TaskMark}".Md5Fun();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 是否手动操作
|
/// 是否手动操作
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
[FIELDColumn]
|
||||||
public bool ManualOrNot { get; set; }
|
public bool ManualOrNot { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 任务数据唯一标记
|
/// 任务数据唯一标记
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public decimal TaskMark { get; set; }
|
[FIELDColumn]
|
||||||
|
public string TaskMark { get; set; }
|
||||||
/// <summary>
|
|
||||||
/// 时间戳标记,IoTDB时间列处理,上报通过构建标记获取唯一标记匹配时间戳。
|
|
||||||
/// </summary>
|
|
||||||
public long Timestamps { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 是否超时
|
/// 是否超时
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
[FIELDColumn]
|
||||||
public bool IsTimeout { get; set; } = false;
|
public bool IsTimeout { get; set; } = false;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 待抄读时间
|
/// 待抄读时间
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
[FIELDColumn]
|
||||||
public DateTime PendingCopyReadTime { get; set; }
|
public DateTime PendingCopyReadTime { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 集中器Id
|
||||||
|
/// </summary>
|
||||||
|
[FIELDColumn]
|
||||||
|
public int FocusId { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 表Id
|
||||||
|
/// </summary>
|
||||||
|
[FIELDColumn]
|
||||||
|
public int MeterId { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 集中器地址
|
/// 集中器地址
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
[FIELDColumn]
|
||||||
public string FocusAddress { get; set; }
|
public string FocusAddress { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 表地址
|
/// 表地址
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
[FIELDColumn]
|
||||||
public string MeterAddress { get; set; }
|
public string MeterAddress { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 表类型
|
|
||||||
/// </summary>
|
|
||||||
public MeterTypeEnum MeterType { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 项目ID
|
|
||||||
/// </summary>
|
|
||||||
public int ProjectID { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 数据库业务ID
|
/// 数据库业务ID
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
[FIELDColumn]
|
||||||
public int DatabaseBusiID { get; set; }
|
public int DatabaseBusiID { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// AFN功能码
|
/// AFN功能码
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public AFN AFN { get; set; }
|
[FIELDColumn]
|
||||||
|
public int AFN { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 抄读功能码
|
/// 抄读功能码
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
[FIELDColumn]
|
||||||
public int Fn { get; set; }
|
public int Fn { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -95,66 +106,73 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// 采集项编码
|
/// 采集项编码
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public string ItemCode { get; set;}
|
[FIELDColumn]
|
||||||
|
public string ItemCode { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
///// <summary>
|
||||||
/// 帧序列域SEQ
|
///// 帧序列域SEQ
|
||||||
/// </summary>
|
///// </summary>
|
||||||
public required Seq Seq { get; set; }
|
//public required Seq Seq { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 地址域A3的主站地址MSA
|
/// 地址域A3的主站地址MSA
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
[FIELDColumn]
|
||||||
public int MSA { get; set; }
|
public int MSA { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 是否发送
|
/// 是否发送
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
[FIELDColumn]
|
||||||
public bool IsSend { get; set; }
|
public bool IsSend { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 创建时间
|
/// 创建时间
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
[FIELDColumn]
|
||||||
public DateTime CreationTime { get; set; }
|
public DateTime CreationTime { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 下发消息内容
|
/// 下发消息内容
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
[FIELDColumn]
|
||||||
public string IssuedMessageHexString { get; set; }
|
public string IssuedMessageHexString { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 下发消息Id
|
/// 下发消息Id
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
[FIELDColumn]
|
||||||
public string IssuedMessageId { get; set; }
|
public string IssuedMessageId { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 消息上报内容
|
/// 消息上报内容
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
[FIELDColumn]
|
||||||
public string? ReceivedMessageHexString { get; set; }
|
public string? ReceivedMessageHexString { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 消息上报时间
|
/// 消息上报时间
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
[FIELDColumn]
|
||||||
public DateTime? ReceivedTime { get; set; }
|
public DateTime? ReceivedTime { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 上报消息Id
|
/// 上报消息Id
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public string ReceivedMessageId { get; set; }
|
[FIELDColumn]
|
||||||
|
public string ReceivedMessageId { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 上报报文解析备注,异常情况下才有
|
/// 上报报文解析备注,异常情况下才有
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
[FIELDColumn]
|
||||||
public string ReceivedRemark { get; set; }
|
public string ReceivedRemark { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 是否已上报
|
/// 是否已上报
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
[FIELDColumn]
|
||||||
public bool IsReceived { get; set; }
|
public bool IsReceived { get; set; }
|
||||||
|
|
||||||
//public void CreateDataId(Guid Id)
|
|
||||||
//{
|
|
||||||
// this.Id = Id;
|
|
||||||
//}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
65
shared/JiShe.CollectBus.Common/Encrypt/EncryptUtil.cs
Normal file
65
shared/JiShe.CollectBus.Common/Encrypt/EncryptUtil.cs
Normal file
@ -0,0 +1,65 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Security.Cryptography;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace JiShe.CollectBus.Common.Encrypt
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 各种加密辅助类
|
||||||
|
/// </summary>
|
||||||
|
public static class EncryptUtil
|
||||||
|
{
|
||||||
|
#region MD5加密
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// MD5加密
|
||||||
|
/// </summary>
|
||||||
|
public static string Md5Fun(this string value)
|
||||||
|
{
|
||||||
|
if (value == null)
|
||||||
|
{
|
||||||
|
throw new ArgumentNullException("未将对象引用设置到对象的实例。");
|
||||||
|
}
|
||||||
|
|
||||||
|
var encoding = Encoding.UTF8;
|
||||||
|
MD5 md5 = MD5.Create();
|
||||||
|
return HashAlgorithmBase(md5, value, encoding);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 加权MD5加密
|
||||||
|
/// </summary>
|
||||||
|
public static string Md5Fun(this string value, string salt)
|
||||||
|
{
|
||||||
|
return salt == null ? value.Md5Fun() : (value + "『" + salt + "』").Md5Fun();
|
||||||
|
}
|
||||||
|
|
||||||
|
#endregion
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// HashAlgorithm 加密统一方法
|
||||||
|
/// </summary>
|
||||||
|
private static string HashAlgorithmBase(HashAlgorithm hashAlgorithmObj, string source, Encoding encoding)
|
||||||
|
{
|
||||||
|
byte[] btStr = encoding.GetBytes(source);
|
||||||
|
byte[] hashStr = hashAlgorithmObj.ComputeHash(btStr);
|
||||||
|
return hashStr.Bytes2Str();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 转换成字符串
|
||||||
|
/// </summary>
|
||||||
|
private static string Bytes2Str(this IEnumerable<byte> source, string formatStr = "{0:X2}")
|
||||||
|
{
|
||||||
|
StringBuilder pwd = new StringBuilder();
|
||||||
|
foreach (byte btStr in source)
|
||||||
|
{
|
||||||
|
pwd.AppendFormat(formatStr, btStr);
|
||||||
|
}
|
||||||
|
return pwd.ToString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -181,25 +181,7 @@ namespace JiShe.CollectBus.Common.Extensions
|
|||||||
return $"{dateTime:yyyyMMddHH}";
|
return $"{dateTime:yyyyMMddHH}";
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 获取当前时间毫秒级时间戳
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
public static long GetCurrentTimeMillis()
|
|
||||||
{
|
|
||||||
return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 将Unix时间戳转换为日期时间
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="millis"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public static DateTime FromUnixMillis(long millis)
|
|
||||||
{
|
|
||||||
return DateTimeOffset.FromUnixTimeMilliseconds(millis).DateTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 采集时间节点计算
|
/// 采集时间节点计算
|
||||||
@ -233,22 +215,6 @@ namespace JiShe.CollectBus.Common.Extensions
|
|||||||
.AddHours(hours)
|
.AddHours(hours)
|
||||||
.AddMinutes(minutes);
|
.AddMinutes(minutes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 将 DateTime 时间转换为 DateTimeOffset 时间
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="rawDateTime"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public static DateTimeOffset GetDateTimeOffset(this DateTime rawDateTime)
|
|
||||||
{
|
|
||||||
//确保 Kind 为 Local(如果是 Unspecified)
|
|
||||||
DateTime localDateTime = rawDateTime.Kind == DateTimeKind.Unspecified
|
|
||||||
? DateTime.SpecifyKind(rawDateTime, DateTimeKind.Local)
|
|
||||||
: rawDateTime;
|
|
||||||
|
|
||||||
// 转换为 DateTimeOffset(自动应用本地时区偏移)
|
|
||||||
return new DateTimeOffset(localDateTime);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,101 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.ComponentModel;
|
||||||
|
using System.Linq;
|
||||||
|
|
||||||
|
namespace JiShe.CollectBus.Common.Extensions
|
||||||
|
{
|
||||||
|
public static class DateTimeOffsetExtensions
|
||||||
|
{
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 获取当前时间毫秒级时间戳
|
||||||
|
/// </summary>
|
||||||
|
/// <returns></returns>
|
||||||
|
public static long GetCurrentTimeMillis()
|
||||||
|
{
|
||||||
|
return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 将Unix时间戳转换为日期时间
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="millis"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public static DateTime FromUnixMillis(long millis)
|
||||||
|
{
|
||||||
|
return DateTimeOffset.FromUnixTimeMilliseconds(millis).DateTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 采集时间节点计算
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="referenceTime">待采集时间</param>
|
||||||
|
/// <param name="interval"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public static DateTime CalculateNextCollectionTime(this DateTime referenceTime, int interval)
|
||||||
|
{
|
||||||
|
// 计算精确到分钟的基准时间
|
||||||
|
var baseTime = new DateTime(
|
||||||
|
referenceTime.Year,
|
||||||
|
referenceTime.Month,
|
||||||
|
referenceTime.Day,
|
||||||
|
referenceTime.Hour,
|
||||||
|
referenceTime.Minute,
|
||||||
|
0);
|
||||||
|
|
||||||
|
// 计算总分钟数和下一个间隔点
|
||||||
|
int totalMinutes = baseTime.Hour * 60 + baseTime.Minute;
|
||||||
|
int nextTotalMinutes = ((totalMinutes / interval) + 1) * interval;
|
||||||
|
|
||||||
|
// 处理跨天情况
|
||||||
|
int daysToAdd = nextTotalMinutes / (24 * 60);
|
||||||
|
int remainingMinutes = nextTotalMinutes % (24 * 60);
|
||||||
|
int hours = remainingMinutes / 60;
|
||||||
|
int minutes = remainingMinutes % 60;
|
||||||
|
|
||||||
|
return baseTime.Date
|
||||||
|
.AddDays(daysToAdd)
|
||||||
|
.AddHours(hours)
|
||||||
|
.AddMinutes(minutes);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 将 DateTime 时间转换为 DateTimeOffset 时间
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="rawDateTime"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public static DateTimeOffset GetDateTimeOffset(this DateTime rawDateTime)
|
||||||
|
{
|
||||||
|
//确保 Kind 为 Local(如果是 Unspecified)
|
||||||
|
DateTime localDateTime = rawDateTime.Kind == DateTimeKind.Unspecified
|
||||||
|
? DateTime.SpecifyKind(rawDateTime, DateTimeKind.Local)
|
||||||
|
: rawDateTime;
|
||||||
|
|
||||||
|
// 转换为 DateTimeOffset(自动应用本地时区偏移)
|
||||||
|
return new DateTimeOffset(localDateTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static readonly long UnixEpochTicks = new DateTimeOffset(1970, 1, 1, 0, 0, 0, TimeSpan.Zero).Ticks;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 获取当前 DateTimeOffset 距离 Unix 纪元(1970-01-01)的微秒数
|
||||||
|
/// </summary>
|
||||||
|
public static long ToUnixTimeMicroseconds(this DateTimeOffset dateTimeOffset)
|
||||||
|
{
|
||||||
|
// Ticks 单位是 100 纳秒,转换为微秒需除以 10
|
||||||
|
long elapsedTicks = dateTimeOffset.Ticks - UnixEpochTicks;
|
||||||
|
return elapsedTicks / 10; // 1 微秒 = 1000 纳秒 = 10 Ticks
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 获取当前 DateTimeOffset 距离 Unix 纪元(1970-01-01)的纳秒数
|
||||||
|
/// </summary>
|
||||||
|
public static long ToUnixTimeNanoseconds(this DateTimeOffset dateTimeOffset)
|
||||||
|
{
|
||||||
|
long nanoseconds = (dateTimeOffset.Ticks - UnixEpochTicks) * 100;
|
||||||
|
return nanoseconds;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -769,11 +769,11 @@ namespace JiShe.CollectBus.Common.Helpers
|
|||||||
/// <param name="pn"></param>
|
/// <param name="pn"></param>
|
||||||
/// <param name="msa"></param>
|
/// <param name="msa"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public static decimal GetTaskMark(int afn, int fn, int pn, int msa)
|
public static string GetTaskMark(int afn, int fn, int pn, int msa)
|
||||||
{
|
{
|
||||||
var makstr = $"{afn.ToString().PadLeft(2, '0')}{fn.ToString().PadLeft(2, '0')}{pn.ToString().PadLeft(2, '0')}";
|
var makstr = $"{afn.ToString().PadLeft(2, '0')}{fn.ToString().PadLeft(2, '0')}{pn.ToString().PadLeft(2, '0')}";
|
||||||
|
|
||||||
return Convert.ToInt32(makstr) << 32 | msa;
|
return makstr;// Convert.ToInt32(makstr) << 32 | msa;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,7 +17,7 @@
|
|||||||
<link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet"/>
|
<link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet"/>
|
||||||
<title>后端服务</title>
|
<title>后端服务</title>
|
||||||
</head>
|
</head>
|
||||||
|
|
||||||
<body>
|
<body>
|
||||||
<div class="container projects">
|
<div class="container projects">
|
||||||
<div class="projects-header page-header">
|
<div class="projects-header page-header">
|
||||||
|
|||||||
@ -84,7 +84,8 @@
|
|||||||
"SaslPassword": "lixiao1980",
|
"SaslPassword": "lixiao1980",
|
||||||
"KafkaReplicationFactor": 3,
|
"KafkaReplicationFactor": 3,
|
||||||
"NumPartitions": 30,
|
"NumPartitions": 30,
|
||||||
"ServerTagName": "JiSheCollectBus20"
|
"ServerTagName": "JiSheCollectBus3",
|
||||||
|
"FirstCollectionTime": "2025-04-21 16:11:00"
|
||||||
},
|
},
|
||||||
"IoTDBOptions": {
|
"IoTDBOptions": {
|
||||||
"UserName": "root",
|
"UserName": "root",
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user