Compare commits

..

No commits in common. "c29ec906b14a825371278b1ca40c2ba3081567ba" and "b1a5d29fa79b134a7cb12c81af93722ab9f90042" have entirely different histories.

26 changed files with 428 additions and 818 deletions

View File

@ -16,7 +16,7 @@ public class CollectBusIoTDbModule : AbpModule
var configuration = context.Services.GetConfiguration(); var configuration = context.Services.GetConfiguration();
Configure<IoTDbOptions>(options => { configuration.GetSection(nameof(IoTDbOptions)).Bind(options); }); Configure<IoTDbOptions>(options => { configuration.GetSection(nameof(IoTDbOptions)).Bind(options); });
//// 注册上下文为Scoped // 注册上下文为Scoped
//context.Services.AddScoped<IoTDBRuntimeContext>(); context.Services.AddScoped<IoTDbRuntimeContext>();
} }
} }

View File

@ -1,17 +1,16 @@
using JiShe.CollectBus.IoTDB.Options; using JiShe.CollectBus.IoTDB.Options;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.IoTDB.Context namespace JiShe.CollectBus.IoTDB.Context
{ {
/// <summary> /// <summary>
/// IoTDB SessionPool 运行时上下文 /// IoTDB SessionPool 运行时上下文
/// </summary> /// </summary>
public class IoTDBRuntimeContext: IScopedDependency public class IoTDbRuntimeContext
{ {
private readonly bool _defaultValue; private readonly bool _defaultValue;
public IoTDBRuntimeContext(IOptions<IoTDbOptions> options) public IoTDbRuntimeContext(IOptions<IoTDbOptions> options)
{ {
_defaultValue = options.Value.UseTableSessionPoolByDefault; _defaultValue = options.Value.UseTableSessionPoolByDefault;
UseTableSessionPool = _defaultValue; UseTableSessionPool = _defaultValue;

View File

@ -1,7 +1,6 @@
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IoTDB.Model; using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options; using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
namespace JiShe.CollectBus.IoTDB.Interface namespace JiShe.CollectBus.IoTDB.Interface
{ {
@ -32,15 +31,6 @@ namespace JiShe.CollectBus.IoTDB.Interface
/// <returns></returns> /// <returns></returns>
Task BatchInsertAsync<T>(IEnumerable<T> entities) where T : IoTEntity; Task BatchInsertAsync<T>(IEnumerable<T> entities) where T : IoTEntity;
/// <summary>
/// 批量插入数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="deviceMetadata">设备元数据</param>
/// <param name="entities"></param>
/// <returns></returns>
Task BatchInsertAsync<T>(DeviceMetadata deviceMetadata,IEnumerable<T> entities) where T : IoTEntity;
/// <summary> /// <summary>
/// 删除数据 /// 删除数据
@ -48,14 +38,7 @@ namespace JiShe.CollectBus.IoTDB.Interface
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="options"></param> /// <param name="options"></param>
/// <returns></returns> /// <returns></returns>
Task<object> DeleteAsync<T>(IoTDBQueryOptions options) where T : IoTEntity; Task<object> DeleteAsync<T>(QueryOptions options) where T : IoTEntity;
/// <summary>
/// 获取设备元数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
Task<DeviceMetadata> GetMetadata<T>() where T : IoTEntity;
/// <summary> /// <summary>
/// 查询数据 /// 查询数据
@ -63,6 +46,6 @@ namespace JiShe.CollectBus.IoTDB.Interface
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="options"></param> /// <param name="options"></param>
/// <returns></returns> /// <returns></returns>
Task<BusPagedResult<T>> QueryAsync<T>(IoTDBQueryOptions options) where T : IoTEntity, new(); Task<BusPagedResult<T>> QueryAsync<T>(QueryOptions options) where T : IoTEntity, new();
} }
} }

View File

@ -11,29 +11,29 @@ namespace JiShe.CollectBus.IoTDB.Model
/// 系统名称 /// 系统名称
/// </summary> /// </summary>
[TAGColumn] [TAGColumn]
public string SystemName { get; set; } public required string SystemName { get; set; }
/// <summary> /// <summary>
/// 项目编码 /// 项目编码
/// </summary> /// </summary>
[ATTRIBUTEColumn] [TAGColumn]
public string ProjectId { get; set; } public required string ProjectCode { get; set; }
/// <summary> /// <summary>
/// 设备类型集中器、电表、水表、流量计、传感器等 /// 设备类型集中器、电表、水表、流量计、传感器等
/// </summary> /// </summary>
[ATTRIBUTEColumn] [TAGColumn]
public string DeviceType { get; set; } public required string DeviceType { get; set; }
/// <summary> /// <summary>
/// 设备ID,也就是通信设备的唯一标识符,例如集中器地址,或者其他传感器设备地址 /// 设备ID
/// </summary> /// </summary>
[TAGColumn] [TAGColumn]
public string DeviceId { get; set; } public required string DeviceId { get; set; }
/// <summary> /// <summary>
/// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取 /// 当前时间戳,单位毫秒,必须通过DateTimeOffset获取
/// </summary> /// </summary>
public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); public required long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
} }
} }

View File

@ -42,10 +42,5 @@
/// 是否使用表模型存储, 默认false使用tree模型存储 /// 是否使用表模型存储, 默认false使用tree模型存储
/// </summary> /// </summary>
public bool UseTableSessionPoolByDefault { get; set; } = false; public bool UseTableSessionPoolByDefault { get; set; } = false;
/// <summary>
/// 时区,默认为:"UTC+08:00"
/// </summary>
public string ZoneId { get; set; } = "UTC+08:00";
} }
} }

View File

@ -9,17 +9,10 @@
/// 字段 /// 字段
/// </summary> /// </summary>
public string Field { get; set; } public string Field { get; set; }
/// <summary> /// <summary>
/// 操作符,>=< /// 操作符
/// </summary> /// </summary>
public string Operator { get; set; } public string Operator { get; set; }
/// <summary>
/// 是否数值,如果是数值,则进行数值比较,否则进行字符串比较
/// </summary>
public bool IsNumber { get; set; } = false;
/// <summary> /// <summary>
/// 值 /// 值
/// </summary> /// </summary>

View File

@ -3,7 +3,7 @@
/// <summary> /// <summary>
/// 查询条件 /// 查询条件
/// </summary> /// </summary>
public class IoTDBQueryOptions public class QueryOptions
{ {
/// <summary> /// <summary>
/// 表模型的表名称或者树模型的设备路径 /// 表模型的表名称或者树模型的设备路径
@ -13,7 +13,7 @@
/// <summary> /// <summary>
/// 分页 /// 分页
/// </summary> /// </summary>
public int PageIndex { get; set; } public int Page { get; set; }
/// <summary> /// <summary>
/// 分页大小 /// 分页大小
@ -23,6 +23,6 @@
/// <summary> /// <summary>
/// 查询条件 /// 查询条件
/// </summary> /// </summary>
public List<QueryCondition> Conditions { get; set; } = new(); public List<QueryCondition> Conditions { get; } = new();
} }
} }

View File

@ -15,7 +15,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <returns></returns> /// <returns></returns>
public static string GetDevicePath<T>(T entity) where T : IoTEntity public static string GetDevicePath<T>(T entity) where T : IoTEntity
{ {
return $"root.{entity.SystemName.ToLower()}.`{entity.DeviceId}`"; return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectCode}`.`{entity.DeviceType}`.`{entity.DeviceId}`";
} }
@ -39,7 +39,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <returns></returns> /// <returns></returns>
public static string GetDeviceTableName<T>(T entity) where T : IoTEntity public static string GetDeviceTableName<T>(T entity) where T : IoTEntity
{ {
return $"{entity.SystemName.ToLower()}.`{entity.DeviceId}`"; return $"{entity.SystemName.ToLower()}.`{entity.ProjectCode}`.`{entity.DeviceType}`.`{entity.DeviceId}`";
} }
} }

View File

@ -2,12 +2,8 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Reflection; using System.Reflection;
using System.Text; using System.Text;
using System.Threading.Tasks;
using Apache.IoTDB; using Apache.IoTDB;
using Apache.IoTDB.DataStructure; using Apache.IoTDB.DataStructure;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IoTDB.Attribute; using JiShe.CollectBus.IoTDB.Attribute;
using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Context;
@ -28,7 +24,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
private static readonly ConcurrentDictionary<Type, DeviceMetadata> MetadataCache = new(); private static readonly ConcurrentDictionary<Type, DeviceMetadata> MetadataCache = new();
private readonly ILogger<IoTDbProvider> _logger; private readonly ILogger<IoTDbProvider> _logger;
private readonly IIoTDbSessionFactory _sessionFactory; private readonly IIoTDbSessionFactory _sessionFactory;
private readonly IoTDBRuntimeContext _runtimeContext; private readonly IoTDbRuntimeContext _runtimeContext;
private IIoTDbSessionPool CurrentSession => private IIoTDbSessionPool CurrentSession =>
_sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool); _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool);
@ -42,7 +38,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
public IoTDbProvider( public IoTDbProvider(
ILogger<IoTDbProvider> logger, ILogger<IoTDbProvider> logger,
IIoTDbSessionFactory sessionFactory, IIoTDbSessionFactory sessionFactory,
IoTDBRuntimeContext runtimeContext) IoTDbRuntimeContext runtimeContext)
{ {
_logger = logger; _logger = logger;
_sessionFactory = sessionFactory; _sessionFactory = sessionFactory;
@ -58,19 +54,11 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <returns></returns> /// <returns></returns>
public async Task InsertAsync<T>(T entity) where T : IoTEntity public async Task InsertAsync<T>(T entity) where T : IoTEntity
{ {
try var metadata = GetMetadata<T>();
{
var metadata = await GetMetadata<T>();
var tablet = BuildTablet(new[] { entity }, metadata); var tablet = BuildTablet(new[] { entity }, metadata);
await CurrentSession.InsertAsync(tablet); await CurrentSession.InsertAsync(tablet);
}
catch (Exception ex)
{
_logger.LogError(ex, $"{nameof(InsertAsync)} 插入数据时发生异常");
throw;
}
} }
/// <summary> /// <summary>
@ -80,51 +68,15 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <returns></returns> /// <returns></returns>
public async Task BatchInsertAsync<T>(IEnumerable<T> entities) where T : IoTEntity public async Task BatchInsertAsync<T>(IEnumerable<T> entities) where T : IoTEntity
{ {
try var metadata = GetMetadata<T>();
var batchSize = 1000;
var batches = entities.Chunk(batchSize);
foreach (var batch in batches)
{ {
var metadata = await GetMetadata<T>(); var tablet = BuildTablet(batch, metadata);
await CurrentSession.InsertAsync(tablet);
var batchSize = 1000;
var batches = entities.Chunk(batchSize);
foreach (var batch in batches)
{
var tablet = BuildTablet(batch, metadata);
await CurrentSession.InsertAsync(tablet);
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"{nameof(BatchInsertAsync)} 批量插入数据时发生异常");
throw;
}
}
/// <summary>
/// 批量插入数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="deviceMetadata">设备元数据</param>
/// <param name="entities"></param>
/// <returns></returns>
public async Task BatchInsertAsync<T>(DeviceMetadata deviceMetadata, IEnumerable<T> entities) where T : IoTEntity
{
try
{
var batchSize = 1000;
var batches = entities.Chunk(batchSize);
foreach (var batch in batches)
{
var tablet = BuildTablet(batch, deviceMetadata);
await CurrentSession.InsertAsync(tablet);
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"{nameof(BatchInsertAsync)} 批量插入数据时发生异常");
throw;
} }
} }
@ -135,54 +87,20 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="options"></param> /// <param name="options"></param>
/// <returns></returns> /// <returns></returns>
public async Task<object> DeleteAsync<T>(IoTDBQueryOptions options) where T : IoTEntity public async Task<object> DeleteAsync<T>(QueryOptions options) where T : IoTEntity
{ {
try var query = BuildDeleteSQL<T>(options);
var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query);
if (!sessionDataSet.HasNext())
{ {
var query = await BuildDeleteSQL<T>(options); _logger.LogWarning($"{typeof(T).Name} 删除数据时,没有返回受影响记录数量。");
var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); return 0;
if (!sessionDataSet.HasNext())
{
_logger.LogWarning($"{typeof(T).Name} 删除数据时,没有返回受影响记录数量。");
return 0;
}
//获取唯一结果行
var row = sessionDataSet.Next();
return row.Values[0];
} }
catch (Exception ex)
{
_logger.LogError(ex, $"{nameof(DeleteAsync)} 删除数据时发生异常");
throw;
}
}
/// <summary> //获取唯一结果行
/// 获取设备元数据 var row = sessionDataSet.Next();
/// </summary> return row.Values[0];
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public async Task<DeviceMetadata> GetMetadata<T>() where T : IoTEntity
{
var columns = CollectColumnMetadata(typeof(T));
var metadata = BuildDeviceMetadata<T>(columns);
var metaData = MetadataCache.AddOrUpdate(
typeof(T),
addValueFactory: t => metadata, // 如果键不存在,用此值添加
updateValueFactory: (t, existingValue) =>
{
var columns = CollectColumnMetadata(t);
var metadata = BuildDeviceMetadata<T>(columns);
//对现有值 existingValue 进行修改,返回新值
existingValue.ColumnNames = metadata.ColumnNames;
return existingValue;
}
);
return await Task.FromResult(metaData);
} }
/// <summary> /// <summary>
@ -191,32 +109,18 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="options"></param> /// <param name="options"></param>
/// <returns></returns> /// <returns></returns>
public async Task<BusPagedResult<T>> QueryAsync<T>(IoTDBQueryOptions options) where T : IoTEntity, new() public async Task<BusPagedResult<T>> QueryAsync<T>(QueryOptions options) where T : IoTEntity, new()
{ {
try var query = BuildQuerySQL<T>(options);
var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query);
var result = new BusPagedResult<T>
{ {
var query =await BuildQuerySQL<T>(options); TotalCount = await GetTotalCount<T>(options),
var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); Items = ParseResults<T>(sessionDataSet, options.PageSize)
};
return result;
var result = new BusPagedResult<T>
{
TotalCount = await GetTotalCount<T>(options),
Items = await ParseResults<T>(sessionDataSet, options.PageSize),
PageIndex = options.PageIndex,
PageSize = options.PageSize,
};
result.HasNext = result.TotalCount > 0? result.TotalCount < result.PageSize : false;
return result;
}
catch (Exception ex)
{
_logger.LogError(ex, $"{nameof(QueryAsync)} IoTDB查询数据时发生异常");
throw;
}
} }
/// <summary> /// <summary>
@ -256,12 +160,12 @@ namespace JiShe.CollectBus.IoTDB.Provider
} }
string tableNameOrTreePath = string.Empty; string tableNameOrTreePath = string.Empty;
var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute<TableNameOrTreePathAttribute>(); var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute<TableNameOrTreePathAttribute>();
if (tableNameOrTreePathAttribute != null) if (tableNameOrTreePathAttribute != null)
{ {
tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath;
} }
foreach (var entity in entities) foreach (var entity in entities)
{ {
timestamps.Add(entity.Timestamps); timestamps.Add(entity.Timestamps);
@ -310,23 +214,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
else else
{ {
//需要根据value的类型进行相应的值映射转换例如datetime转换为long的时间戳值 rowValues.Add(value);
if (value != null)
{
Type tupleType = value.GetType();
var tempValue = tupleType.Name.ToUpper() switch
{
"DATETIME" => Convert.ToDateTime(value).GetDateTimeOffset().ToUnixTimeNanoseconds(),
_ => value
};
rowValues.Add(tempValue);
}
else
{
rowValues.Add(value);
}
} }
} }
@ -349,7 +237,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
devicePaths.Add(DevicePathBuilder.GetTableName<T>()); devicePaths.Add(DevicePathBuilder.GetTableName<T>());
} }
} }
} }
if (devicePaths.Count > 1) if (devicePaths.Count > 1)
@ -370,7 +258,8 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <param name="values">数据集合</param> /// <param name="values">数据集合</param>
/// <param name="timestamps">时间戳集合</param> /// <param name="timestamps">时间戳集合</param>
/// <returns></returns> /// <returns></returns>
private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath, List<List<object>> values, List<long> timestamps) private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath,
List<List<object>> values, List<long> timestamps)
{ {
//todo 树模型需要去掉TAG类型和ATTRIBUTE类型的字段只需要保留FIELD类型字段即可 //todo 树模型需要去掉TAG类型和ATTRIBUTE类型的字段只需要保留FIELD类型字段即可
@ -391,7 +280,8 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <param name="values">数据集合</param> /// <param name="values">数据集合</param>
/// <param name="timestamps">时间戳集合</param> /// <param name="timestamps">时间戳集合</param>
/// <returns></returns> /// <returns></returns>
private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, List<List<object>> values, List<long> timestamps) private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName,
List<List<object>> values, List<long> timestamps)
{ {
var tablet = new Tablet( var tablet = new Tablet(
tableName, tableName,
@ -411,9 +301,9 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="options"></param> /// <param name="options"></param>
/// <returns></returns> /// <returns></returns>
private async Task<string> BuildQuerySQL<T>(IoTDBQueryOptions options) where T : IoTEntity private string BuildQuerySQL<T>(QueryOptions options) where T : IoTEntity
{ {
var metadata = await GetMetadata<T>(); var metadata = GetMetadata<T>();
var sb = new StringBuilder("SELECT "); var sb = new StringBuilder("SELECT ");
sb.AppendJoin(", ", metadata.ColumnNames); sb.AppendJoin(", ", metadata.ColumnNames);
sb.Append($" FROM {options.TableNameOrTreePath}"); sb.Append($" FROM {options.TableNameOrTreePath}");
@ -424,7 +314,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
sb.AppendJoin(" AND ", options.Conditions.Select(TranslateCondition)); sb.AppendJoin(" AND ", options.Conditions.Select(TranslateCondition));
} }
sb.Append($" LIMIT {options.PageSize} OFFSET {options.PageIndex * options.PageSize}"); sb.Append($" LIMIT {options.PageSize} OFFSET {options.Page * options.PageSize}");
return sb.ToString(); return sb.ToString();
} }
@ -434,9 +324,9 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="options"></param> /// <param name="options"></param>
/// <returns></returns> /// <returns></returns>
private async Task<string> BuildDeleteSQL<T>(IoTDBQueryOptions options) where T : IoTEntity private string BuildDeleteSQL<T>(QueryOptions options) where T : IoTEntity
{ {
var metadata = await GetMetadata<T>(); var metadata = GetMetadata<T>();
var sb = new StringBuilder(); var sb = new StringBuilder();
if (!_runtimeContext.UseTableSessionPool) if (!_runtimeContext.UseTableSessionPool)
@ -471,10 +361,10 @@ namespace JiShe.CollectBus.IoTDB.Provider
{ {
return condition.Operator switch return condition.Operator switch
{ {
">" => condition.IsNumber ? $"{condition.Field} > {condition.Value}": $"{condition.Field} > '{condition.Value}'", ">" => $"{condition.Field} > {condition.Value}",
"<" => condition.IsNumber ? $"{condition.Field} < {condition.Value}" : $"{condition.Field} < '{condition.Value}'", "<" => $"{condition.Field} < {condition.Value}",
"=" => condition.IsNumber ? $"{condition.Field} = {condition.Value}" : $"{condition.Field} = '{condition.Value}'", "=" => $"{condition.Field} = '{condition.Value}'",
_ => throw new NotSupportedException($"{nameof(TranslateCondition)} 将查询条件转换为SQL语句时操作符 {condition.Operator} 属于异常情况") _ => throw new NotSupportedException($"Operator {condition.Operator} not supported")
}; };
} }
@ -484,7 +374,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="options"></param> /// <param name="options"></param>
/// <returns></returns> /// <returns></returns>
private async Task<int> GetTotalCount<T>(IoTDBQueryOptions options) where T : IoTEntity private async Task<int> GetTotalCount<T>(QueryOptions options) where T : IoTEntity
{ {
var countQuery = $"SELECT COUNT(*) FROM {options.TableNameOrTreePath}"; var countQuery = $"SELECT COUNT(*) FROM {options.TableNameOrTreePath}";
if (options.Conditions.Any()) if (options.Conditions.Any())
@ -503,10 +393,10 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <param name="dataSet"></param> /// <param name="dataSet"></param>
/// <param name="pageSize"></param> /// <param name="pageSize"></param>
/// <returns></returns> /// <returns></returns>
private async Task<IEnumerable<T>> ParseResults<T>(SessionDataSet dataSet, int pageSize) where T : IoTEntity, new() private IEnumerable<T> ParseResults<T>(SessionDataSet dataSet, int pageSize) where T : IoTEntity, new()
{ {
var results = new List<T>(); var results = new List<T>();
var metadata = await GetMetadata<T>(); var metadata = GetMetadata<T>();
var properties = typeof(T).GetProperties(); var properties = typeof(T).GetProperties();
@ -518,24 +408,16 @@ namespace JiShe.CollectBus.IoTDB.Provider
Timestamps = record.Timestamps Timestamps = record.Timestamps
}; };
foreach (var measurement in metadata.ColumnNames) foreach (var measurement in metadata.ColumnNames)
{ {
int indexOf = metadata.ColumnNames.IndexOf(measurement); var value = record.Values;
var value = record.Values[indexOf];
var prop = properties.FirstOrDefault(p => var prop = properties.FirstOrDefault(p =>
p.Name.Equals(measurement, StringComparison.OrdinalIgnoreCase)); p.Name.Equals(measurement, StringComparison.OrdinalIgnoreCase));
if (prop != null) if (prop != null)
{ {
if (measurement.EndsWith("time")) typeof(T).GetProperty(measurement)?.SetValue(entity, value);
{
var tempValue = TimestampHelper.ConvertToDateTime(Convert.ToInt64(value), TimestampUnit.Nanoseconds);
typeof(T).GetProperty(measurement)?.SetValue(entity, value);
}
else
{
typeof(T).GetProperty(measurement)?.SetValue(entity, value);
}
} }
} }
@ -545,6 +427,30 @@ namespace JiShe.CollectBus.IoTDB.Provider
return results; return results;
} }
/// <summary>
/// 获取设备元数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
private DeviceMetadata GetMetadata<T>() where T : IoTEntity
{
var columns = CollectColumnMetadata(typeof(T));
var metadata = BuildDeviceMetadata<T>(columns);
return MetadataCache.AddOrUpdate(
typeof(T),
addValueFactory: t => metadata, // 如果键不存在,用此值添加
updateValueFactory: (t, existingValue) =>
{
var columns = CollectColumnMetadata(t);
var metadata = BuildDeviceMetadata<T>(columns);
//对现有值 existingValue 进行修改,返回新值
existingValue.ColumnNames = metadata.ColumnNames;
return existingValue;
}
);
}
/// <summary> /// <summary>
/// 获取设备元数据的列 /// 获取设备元数据的列
/// </summary> /// </summary>
@ -733,7 +639,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
["DOUBLE"] = TSDataType.DOUBLE, ["DOUBLE"] = TSDataType.DOUBLE,
["TEXT"] = TSDataType.TEXT, ["TEXT"] = TSDataType.TEXT,
["NULLTYPE"] = TSDataType.NONE, ["NULLTYPE"] = TSDataType.NONE,
["DATETIME"] = TSDataType.TIMESTAMP, ["TIMESTAMP"] = TSDataType.TIMESTAMP,
["DATE"] = TSDataType.DATE, ["DATE"] = TSDataType.DATE,
["BLOB"] = TSDataType.BLOB, ["BLOB"] = TSDataType.BLOB,
["DECIMAL"] = TSDataType.STRING, ["DECIMAL"] = TSDataType.STRING,
@ -753,7 +659,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
["DOUBLE"] = 0.0d, ["DOUBLE"] = 0.0d,
["TEXT"] = string.Empty, ["TEXT"] = string.Empty,
["NULLTYPE"] = null, ["NULLTYPE"] = null,
["DATETIME"] = null, ["TIMESTAMP"] = null,
["DATE"] = null, ["DATE"] = null,
["BLOB"] = null, ["BLOB"] = null,
["DECIMAL"] = "0.0", ["DECIMAL"] = "0.0",

View File

@ -25,7 +25,6 @@ namespace JiShe.CollectBus.IoTDB.Provider
.SetNodeUrl(options.ClusterList) .SetNodeUrl(options.ClusterList)
.SetUsername(options.UserName) .SetUsername(options.UserName)
.SetPassword(options.Password) .SetPassword(options.Password)
.SetZoneId(options.ZoneId)
.SetFetchSize(options.FetchSize) .SetFetchSize(options.FetchSize)
.SetPoolSize(options.PoolSize) .SetPoolSize(options.PoolSize)
.Build(); .Build();

View File

@ -25,7 +25,6 @@ namespace JiShe.CollectBus.IoTDB.Provider
.SetNodeUrls(options.ClusterList) .SetNodeUrls(options.ClusterList)
.SetUsername(options.UserName) .SetUsername(options.UserName)
.SetPassword(options.Password) .SetPassword(options.Password)
.SetZoneId(options.ZoneId)
.SetFetchSize(options.FetchSize) .SetFetchSize(options.FetchSize)
.SetPoolSize(options.PoolSize) .SetPoolSize(options.PoolSize)
.SetDatabase(options.DataBaseName) .SetDatabase(options.DataBaseName)

View File

@ -57,5 +57,5 @@ public class KafkaOptionConfig
/// <summary> /// <summary>
/// 首次采集时间 /// 首次采集时间
/// </summary> /// </summary>
public DateTime? FirstCollectionTime { get; set; } public DateTime FirstCollectionTime { get; set; }
} }

View File

@ -4,7 +4,6 @@ using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Interface;
@ -31,12 +30,12 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
{ {
private readonly ILogger<SampleAppService> _logger; private readonly ILogger<SampleAppService> _logger;
private readonly IIoTDbProvider _iotDBProvider; private readonly IIoTDbProvider _iotDBProvider;
private readonly IoTDBRuntimeContext _dbContext; private readonly IoTDbRuntimeContext _dbContext;
private readonly IoTDbOptions _options; private readonly IoTDbOptions _options;
private readonly IRedisDataCacheService _redisDataCacheService; private readonly IRedisDataCacheService _redisDataCacheService;
public SampleAppService(IIoTDbProvider iotDBProvider, IOptions<IoTDbOptions> options, public SampleAppService(IIoTDbProvider iotDBProvider, IOptions<IoTDbOptions> options,
IoTDBRuntimeContext dbContext, ILogger<SampleAppService> logger, IRedisDataCacheService redisDataCacheService) IoTDbRuntimeContext dbContext, ILogger<SampleAppService> logger, IRedisDataCacheService redisDataCacheService)
{ {
_iotDBProvider = iotDBProvider; _iotDBProvider = iotDBProvider;
_options = options.Value; _options = options.Value;
@ -51,20 +50,20 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
/// <param name="testTime"></param> /// <param name="testTime"></param>
/// <returns></returns> /// <returns></returns>
[HttpGet] [HttpGet]
public async Task UseSessionPool(long testTime) public async Task UseSessionPool(DateTime testTime)
{ {
ElectricityMeterTreeModel meter = new ElectricityMeterTreeModel() ElectricityMeterTreeModel meter = new ElectricityMeterTreeModel()
{ {
SystemName = "energy", SystemName = "energy",
DeviceId = "402440506s", DeviceId = "402440506",
DeviceType = "Ammeter", DeviceType = "Ammeter",
Current = 10, Current = 10,
MeterModel = "DDZY-1980", MeterModel = "DDZY-1980",
ProjectId = "10059", ProjectCode = "10059",
Voltage = 10, Voltage = 10,
IssuedMessageHexString = "messageHexString", IssuedMessageHexString = "messageHexString",
Timestamps = testTime// DateTimeOffset.UtcNow.ToUnixTimeNanoseconds()//testTime.GetDateTimeOffset().ToUnixTimeNanoseconds(), Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(),
}; };
await _iotDBProvider.InsertAsync(meter); await _iotDBProvider.InsertAsync(meter);
} }
@ -84,9 +83,9 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
DeviceType = "Ammeter", DeviceType = "Ammeter",
Current = 10, Current = 10,
MeterModel = "DDZY-1980", MeterModel = "DDZY-1980",
ProjectId = "10059", ProjectCode = "10059",
Voltage = 10, Voltage = 10,
Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(),
}; };
await _iotDBProvider.InsertAsync(meter2); await _iotDBProvider.InsertAsync(meter2);
@ -100,9 +99,9 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
DeviceType = "Ammeter", DeviceType = "Ammeter",
Current = 10, Current = 10,
MeterModel = "DDZY-1980", MeterModel = "DDZY-1980",
ProjectId = "10059", ProjectCode = "10059",
Voltage = 10, Voltage = 10,
Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(),
}; };
await _iotDBProvider.InsertAsync(meter); await _iotDBProvider.InsertAsync(meter);
@ -123,10 +122,10 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
DeviceType = "Ammeter", DeviceType = "Ammeter",
Current = 10, Current = 10,
MeterModel = "DDZY-1980", MeterModel = "DDZY-1980",
ProjectId = "10059", ProjectCode = "10059",
Voltage = 10, Voltage = 10,
IssuedMessageHexString = "dsdfsfd", IssuedMessageHexString = "dsdfsfd",
Timestamps = DateTimeOffset.UtcNow.ToUnixTimeNanoseconds(), Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(),
}; };
@ -142,17 +141,12 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
DeviceType = "Ammeter", DeviceType = "Ammeter",
Current = 10, Current = 10,
MeterModel = "DDZY-1980", MeterModel = "DDZY-1980",
ProjectId = "10059", ProjectCode = "10059",
Voltage = 10, Voltage = 10,
Currentd = 22, Currentd = 22,
IssuedMessageHexString = "dsdfsfd", IssuedMessageHexString = "dsdfsfd",
Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(),
}; };
//var dd = DateTimeOffset.Now.ToUnixTimeMilliseconds();
//var dd3 = DateTimeOffset.Now.ToUnixTimeMicroseconds();
//var dd2 = DateTimeOffset.Now.ToUnixTimeNanoseconds();
await _iotDBProvider.InsertAsync(meter3); await _iotDBProvider.InsertAsync(meter3);
} }
@ -169,7 +163,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
SystemName = "energy", SystemName = "energy",
DeviceId = "402440506", DeviceId = "402440506",
DeviceType = "Ammeter", DeviceType = "Ammeter",
ProjectId = "10059", ProjectCode = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleMeasuring = new Tuple<string, string>(measuring, value) SingleMeasuring = new Tuple<string, string>(measuring, value)
}; };
@ -189,7 +183,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
SystemName = "energy", SystemName = "energy",
DeviceId = "402440506", DeviceId = "402440506",
DeviceType = "Ammeter", DeviceType = "Ammeter",
ProjectId = "10059", ProjectCode = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleMeasuring = new Tuple<string, int>(measuring, value) SingleMeasuring = new Tuple<string, int>(measuring, value)
}; };
@ -209,7 +203,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
SystemName = "energy", SystemName = "energy",
DeviceId = "402440506", DeviceId = "402440506",
DeviceType = "Ammeter", DeviceType = "Ammeter",
ProjectId = "10059", ProjectCode = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleColumn = new Tuple<string, string>(measuring, value) SingleColumn = new Tuple<string, string>(measuring, value)
}; };
@ -230,7 +224,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
SystemName = "energy", SystemName = "energy",
DeviceId = "402440506", DeviceId = "402440506",
DeviceType = "Ammeter", DeviceType = "Ammeter",
ProjectId = "10059", ProjectCode = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleColumn = new Tuple<string, int>(measuring, value) SingleColumn = new Tuple<string, int>(measuring, value)
}; };

View File

@ -1,6 +1,4 @@
using Confluent.Kafka; using DnsClient.Protocol;
using DnsClient.Protocol;
using FreeSql;
using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Application.Contracts; using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.BuildSendDatas;
@ -11,11 +9,7 @@ using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.IotSystems.Watermeter;
@ -25,8 +19,6 @@ using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.RedisDataCache; using JiShe.CollectBus.RedisDataCache;
using JiShe.CollectBus.Repository.MeterReadingRecord; using JiShe.CollectBus.Repository.MeterReadingRecord;
using Mapster; using Mapster;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using System; using System;
@ -46,22 +38,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{ {
private readonly ILogger<BasicScheduledMeterReadingService> _logger; private readonly ILogger<BasicScheduledMeterReadingService> _logger;
private readonly IIoTDbProvider _dbProvider; private readonly IIoTDbProvider _dbProvider;
private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
private readonly IProducerService _producerService; private readonly IProducerService _producerService;
private readonly IRedisDataCacheService _redisDataCacheService; private readonly IRedisDataCacheService _redisDataCacheService;
private readonly KafkaOptionConfig _kafkaOptions; private readonly KafkaOptionConfig _kafkaOptions;
private readonly IoTDBRuntimeContext _runtimeContext;
public BasicScheduledMeterReadingService( public BasicScheduledMeterReadingService(
ILogger<BasicScheduledMeterReadingService> logger, ILogger<BasicScheduledMeterReadingService> logger,
IMeterReadingRecordRepository meterReadingRecordRepository,
IProducerService producerService, IProducerService producerService,
IRedisDataCacheService redisDataCacheService, IRedisDataCacheService redisDataCacheService,
IIoTDbProvider dbProvider, IIoTDbProvider dbProvider,
IoTDBRuntimeContext runtimeContext,
IOptions<KafkaOptionConfig> kafkaOptions) IOptions<KafkaOptionConfig> kafkaOptions)
{ {
_logger = logger; _logger = logger;
_dbProvider = dbProvider; _dbProvider = dbProvider;
_runtimeContext = runtimeContext; _meterReadingRecordRepository = meterReadingRecordRepository;
_producerService = producerService; _producerService = producerService;
_redisDataCacheService = redisDataCacheService; _redisDataCacheService = redisDataCacheService;
_kafkaOptions = kafkaOptions.Value; _kafkaOptions = kafkaOptions.Value;
@ -141,23 +133,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (meteryType == MeterTypeEnum.Ammeter.ToString()) if (meteryType == MeterTypeEnum.Ammeter.ToString())
{ {
//List<MeterReadingTelemetryPacketInfo> pushTaskInfos = new(); //_ = AmmerterCreatePublishTask(timeDensity, $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}");
_runtimeContext.UseTableSessionPool = true;
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
_ = CreateMeterPublishTask<AmmeterInfo>( _ = CreateMeterPublishTask<AmmeterInfo>(
timeDensity: timeDensity, timeDensity: timeDensity,
nextTaskTime: tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity), taskBatch: $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}",
meterType: MeterTypeEnum.Ammeter, meterType: MeterTypeEnum.Ammeter,
taskCreateAction: (timeDensity, data, groupIndex, timestamps) => taskCreateAction: (timeDensity, data, groupIndex, taskBatch) =>
{ {
var tempTask = AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps); AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"{data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_dbProvider.BatchInsertAsync(metadata, tempTask);
}); });
} }
else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
{ {
@ -166,7 +152,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_ = CreateMeterPublishTask<WatermeterInfo>( _ = CreateMeterPublishTask<WatermeterInfo>(
timeDensity: timeDensity, timeDensity: timeDensity,
nextTaskTime: tasksToBeIssueModel.NextTaskTime, taskBatch: $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}",
meterType: MeterTypeEnum.Ammeter, meterType: MeterTypeEnum.Ammeter,
taskCreateAction: (timeDensity, data, groupIndex, taskBatch) => taskCreateAction: (timeDensity, data, groupIndex, taskBatch) =>
{ {
@ -183,7 +169,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。 //根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。
tasksToBeIssueModel.LastTaskTime = tasksToBeIssueModel.NextTaskTime;
tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity); tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity);
await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel); await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
} }
@ -208,7 +193,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns> /// <returns></returns>
public virtual async Task InitAmmeterCacheData(string gatherCode = "") public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{ {
#if DEBUG #if DEBUG
return;
var timeDensity = "15"; var timeDensity = "15";
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
@ -247,7 +236,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
timer1.Stop(); timer1.Stop();
_logger.LogError($"读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒"); _logger.LogError($"读取数据花费时间{timer1.ElapsedMilliseconds}毫秒");
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
//return; //return;
#else #else
@ -272,18 +261,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据采集频率分组,获得采集频率分组 //根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
if (_kafkaOptions.FirstCollectionTime.HasValue == false)
{
_kafkaOptions.FirstCollectionTime = DateTime.Now;
}
//先处理采集频率任务缓存 //先处理采集频率任务缓存
foreach (var item in meterInfoGroupByTimeDensity) foreach (var item in meterInfoGroupByTimeDensity)
{ {
TasksToBeIssueModel nextTask = new TasksToBeIssueModel() TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{ {
LastTaskTime = null,
TimeDensity = item.Key, TimeDensity = item.Key,
NextTaskTime = _kafkaOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间 NextTaskTime = _kafkaOptions.FirstCollectionTime.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间
}; };
//todo 首次采集时间节点到目前运行时间中漏采的时间点可以考虑使用IoTDB的存储利用时间序列处理。 //todo 首次采集时间节点到目前运行时间中漏采的时间点可以考虑使用IoTDB的存储利用时间序列处理。
@ -411,13 +395,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}; };
var taskBatch = $"{currentTime:yyyyMMddHHmm00}"; var taskBatch = $"{currentTime:yyyyMMddHHmm00}";
//Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex =>
//{ {
// var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
// var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
// _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey);
//}); });
await Task.CompletedTask; await Task.CompletedTask;
@ -442,13 +426,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}; };
var taskBatch = $"{currentTime:yyyyMMddHHmm00}"; var taskBatch = $"{currentTime:yyyyMMddHHmm00}";
//Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex =>
//{ {
// var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
// var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
// _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey);
//}); });
} }
/// <summary> /// <summary>
@ -459,18 +443,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{ {
//获取缓存中的电表信息 //获取缓存中的电表信息
int timeDensity = 15; int timeDensity = 15;
//var currentTime = DateTime.Now.CalculateNextCollectionTime(timeDensity); var currentTime = DateTime.Now;
var currentTime = Convert.ToDateTime("2025-04-21 17:42:00").CalculateNextCollectionTime(timeDensity);
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName,MeterTypeEnum.Ammeter,timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
return;
}
// 自动计算最佳并发度 // 自动计算最佳并发度
int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount(); int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount();
@ -478,81 +452,73 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{ {
MaxDegreeOfParallelism = recommendedThreads, MaxDegreeOfParallelism = recommendedThreads,
}; };
var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds(); var taskBatch = $"{currentTime:yyyyMMddHHmm00}";
var conditions = new List<QueryCondition>(); Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex =>
conditions.Add(new QueryCondition()
{ {
Field = "PendingCopyReadTime", var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
Operator = "=", var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
IsNumber = true,
Value = pendingCopyReadTime _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey);
}); });
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(timeDensity, new IoTDBQueryOptions()
{
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
PageIndex = 1,
PageSize = 3000,
Conditions = conditions,
});
} }
///// <summary> /// <summary>
///// 创建电表待发送的任务数据 /// 创建电表待发送的任务数据
///// </summary> /// </summary>
///// <param name="timeDensity">采集频率</param> /// <param name="timeDensity">采集频率</param>
///// <param name="taskBatch">时间格式的任务批次名称</param> /// <param name="taskBatch">时间格式的任务批次名称</param>
///// <returns></returns> /// <returns></returns>
//private async Task AmmerterCreatePublishTask(int timeDensity, string taskBatch) private async Task AmmerterCreatePublishTask(int timeDensity, string taskBatch)
//{ {
// var timer = Stopwatch.StartNew(); var timer = Stopwatch.StartNew();
// //获取对应频率中的所有电表信息 //获取对应频率中的所有电表信息
// var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
// var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
// var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
// List<AmmeterInfo> meterInfos = new List<AmmeterInfo>(); List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
// decimal? cursor = null; decimal? cursor = null;
// string member = null; string member = null;
// bool hasNext; bool hasNext;
// do do
// { {
// var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>( var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
// redisCacheMeterInfoHashKeyTemp, redisCacheMeterInfoHashKeyTemp,
// redisCacheMeterInfoZSetScoresIndexKeyTemp, redisCacheMeterInfoZSetScoresIndexKeyTemp,
// pageSize: 1000, pageSize: 1000,
// lastScore: cursor, lastScore: cursor,
// lastMember: member); lastMember: member);
// meterInfos.AddRange(page.Items); meterInfos.AddRange(page.Items);
// cursor = page.HasNext ? page.NextScore : null; cursor = page.HasNext ? page.NextScore : null;
// member = page.HasNext ? page.NextMember : null; member = page.HasNext ? page.NextMember : null;
// hasNext = page.HasNext; hasNext = page.HasNext;
// } while (hasNext); } while (hasNext);
// if (meterInfos == null || meterInfos.Count <= 0) if (meterInfos == null || meterInfos.Count <= 0)
// { {
// timer.Stop(); timer.Stop();
// _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
// return; return;
// } }
// await DeviceGroupBalanceControl.ProcessWithThrottleAsync( await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
// items: meterInfos, items: meterInfos,
// deviceIdSelector: data => data.FocusAddress, deviceIdSelector: data => data.FocusAddress,
// processor: (data, groupIndex) => processor: (data, groupIndex) =>
// { {
// AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch); AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch);
// } }
// ); );
// timer.Stop(); timer.Stop();
// _logger.LogInformation($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息"); _logger.LogInformation($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息");
//} }
/// <summary> /// <summary>
@ -561,33 +527,38 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="timeDensity">采集频率</param> /// <param name="timeDensity">采集频率</param>
/// <param name="ammeterInfo">电表信息</param> /// <param name="ammeterInfo">电表信息</param>
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="taskBatch">时间格式的任务批次名称</param>
/// <returns></returns> /// <returns></returns>
private List<MeterReadingTelemetryPacketInfo> AmmerterCreatePublishTaskAction(int timeDensity private void AmmerterCreatePublishTaskAction(int timeDensity
, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) , AmmeterInfo ammeterInfo, int groupIndex, string taskBatch)
{ {
var currentTime = DateTime.Now;
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿? //todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
var currentTime = DateTime.Now;
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{ {
// _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
return null; return;
} }
//载波的不处理 //载波的不处理
if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave) if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
{ {
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102"); //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102");
return null; return;
} }
if (ammeterInfo.State.Equals(2)) if (ammeterInfo.State.Equals(2))
{ {
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理"); //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理");
return null; return;
} }
////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器 ////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器
@ -600,22 +571,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode)) if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode))
{ {
// _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空"); // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空");
return null; return;
} }
if (string.IsNullOrWhiteSpace(ammeterInfo.Address)) if (string.IsNullOrWhiteSpace(ammeterInfo.Address))
{ {
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空"); //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空");
return null; return;
} }
if (Convert.ToInt32(ammeterInfo.Address) > 65535) if (Convert.ToInt32(ammeterInfo.Address) > 65535)
{ {
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535"); //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535");
return null; return;
} }
if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33) if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33)
{ {
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})"); //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})");
return null; return;
} }
List<string> tempCodes = ammeterInfo.ItemCodes.Deserialize<List<string>>()!; List<string> tempCodes = ammeterInfo.ItemCodes.Deserialize<List<string>>()!;
@ -642,7 +613,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (tempSubCodes == null || tempSubCodes.Count <= 0) if (tempSubCodes == null || tempSubCodes.Count <= 0)
{ {
//_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空"); //_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空");
return null; return;
} }
else else
{ {
@ -712,18 +683,18 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var meterReadingRecords = new MeterReadingTelemetryPacketInfo() var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
{ {
SystemName = SystemType, ProjectID = ammeterInfo.ProjectID,
ProjectId = $"{ammeterInfo.ProjectID}",
DeviceType = $"{MeterTypeEnum.Ammeter}",
DeviceId = $"{ammeterInfo.FocusAddress}",
Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
DatabaseBusiID = ammeterInfo.DatabaseBusiID, DatabaseBusiID = ammeterInfo.DatabaseBusiID,
PendingCopyReadTime = timestamps, PendingCopyReadTime = pendingCopyReadTime,
CreationTime = currentTime, CreationTime = currentTime,
MeterAddress = ammeterInfo.AmmerterAddress, MeterAddress = ammeterInfo.AmmerterAddress,
AFN = (int)aFN, MeterId = ammeterInfo.MeterId,
MeterType = MeterTypeEnum.Ammeter,
FocusAddress = ammeterInfo.FocusAddress,
FocusId = ammeterInfo.FocusId,
AFN = aFN,
Fn = fn, Fn = fn,
//Seq = builderResponse.Seq, Seq = builderResponse.Seq,
MSA = builderResponse.MSA, MSA = builderResponse.MSA,
ItemCode = tempItem, ItemCode = tempItem,
TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA), TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA),
@ -738,7 +709,37 @@ namespace JiShe.CollectBus.ScheduledMeterReading
taskList.Add(meterReadingRecords); taskList.Add(meterReadingRecords);
} }
return taskList; if (taskList == null
|| taskList.Count() <= 0
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey)
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey)
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey))
{
_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {ammeterInfo.Name}的写入参数异常,{redisCacheTelemetryPacketInfoHashKey}{redisCacheTelemetryPacketInfoSetIndexKey}{redisCacheTelemetryPacketInfoZSetScoresIndexKey}-101");
return;
}
using (var pipe = FreeRedisProvider.Instance.StartPipe())
{
foreach (var item in taskList)
{
// 主数据存储Hash
pipe.HSet(redisCacheTelemetryPacketInfoHashKey, item.MemberId, item.Serialize());
// Set索引缓存
pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, item.MemberId);
// ZSET索引缓存Key
pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, item.ScoreValue, item.MemberId);
}
pipe.EndPipe();
}
//await _redisDataCacheService.BatchInsertDataAsync(
// redisCacheTelemetryPacketInfoHashKey,
// redisCacheTelemetryPacketInfoSetIndexKey,
// redisCacheTelemetryPacketInfoZSetScoresIndexKey,
// taskList);
} }
#endregion #endregion
@ -863,7 +864,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{ {
// await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList); await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
} }
////删除任务数据 ////删除任务数据
@ -876,52 +877,52 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成"); _logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成");
} }
///// <summary> /// <summary>
///// 创建水表待发送的任务数据 /// 创建水表待发送的任务数据
///// </summary> /// </summary>
///// <param name="timeDensity">采集频率</param> /// <param name="timeDensity">采集频率</param>
///// <param name="meterInfo">水表信息</param> /// <param name="meterInfo">水表信息</param>
///// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
///// <param name="taskBatch">时间格式的任务批次名称</param> /// <param name="taskBatch">时间格式的任务批次名称</param>
///// <returns></returns> /// <returns></returns>
//private void WatermeterCreatePublishTaskAction(int timeDensity private void WatermeterCreatePublishTaskAction(int timeDensity
// , WatermeterInfo meterInfo, int groupIndex, string taskBatch) , WatermeterInfo meterInfo, int groupIndex, string taskBatch)
//{ {
// var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
// var currentTime = DateTime.Now; var currentTime = DateTime.Now;
// var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
// var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
// var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
// var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
// var taskInfo = new MeterReadingTelemetryPacketInfo() var taskInfo = new MeterReadingTelemetryPacketInfo()
// { {
// Seq= null, Seq= null,
};
//
// }; Build188SendData.Build188WaterMeterReadingSendDataUnit(meterInfo.Address);
// //
// Build188SendData.Build188WaterMeterReadingSendDataUnit(meterInfo.Address); using (var pipe = FreeRedisProvider.Instance.StartPipe())
{
// 主数据存储Hash
pipe.HSet(redisCacheTelemetryPacketInfoHashKey, taskInfo.MemberId, taskInfo.Serialize());
// using (var pipe = FreeRedisProvider.Instance.StartPipe()) // Set索引缓存
// { pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, taskInfo.MemberId);
// // 主数据存储Hash
// pipe.HSet(redisCacheTelemetryPacketInfoHashKey, taskInfo.MemberId, taskInfo.Serialize());
// // Set索引缓存 // ZSET索引缓存Key
// pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, taskInfo.MemberId); pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, taskInfo.ScoreValue, taskInfo.MemberId);
// // ZSET索引缓存Key pipe.EndPipe();
// pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, taskInfo.ScoreValue, taskInfo.MemberId); }
// pipe.EndPipe(); }
// }
//}
#endregion #endregion
@ -960,11 +961,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 创建表的待发送的任务数据 /// 创建表的待发送的任务数据
/// </summary> /// </summary>
/// <param name="timeDensity">采集频率</param> /// <param name="timeDensity">采集频率</param>
/// <param name="nextTaskTime">采集频率对应的任务时间戳</param> /// <param name="taskBatch">时间格式的任务批次名称</param>
/// <param name="meterType">表类型</param> /// <param name="meterType">表类型</param>
/// <param name="taskCreateAction">具体的创建任务的委托</param> /// <param name="taskCreateAction">具体的创建任务的委托</param>
/// <returns></returns> /// <returns></returns>
private async Task CreateMeterPublishTask<T>(int timeDensity, DateTime nextTaskTime, MeterTypeEnum meterType, Action<int, T, int, DateTime> taskCreateAction) where T : DeviceCacheBasicModel private async Task CreateMeterPublishTask<T>(int timeDensity, string taskBatch, MeterTypeEnum meterType, Action<int, T, int, string> taskCreateAction) where T : DeviceCacheBasicModel
{ {
var timer = Stopwatch.StartNew(); var timer = Stopwatch.StartNew();
@ -977,29 +978,20 @@ namespace JiShe.CollectBus.ScheduledMeterReading
decimal? cursor = null; decimal? cursor = null;
string member = null; string member = null;
bool hasNext; bool hasNext;
//do do
//{ {
// var page = await _redisDataCacheService.GetAllPagedData<T>( var page = await _redisDataCacheService.GetAllPagedData<T>(
// redisCacheMeterInfoHashKeyTemp, redisCacheMeterInfoHashKeyTemp,
// redisCacheMeterInfoZSetScoresIndexKeyTemp, redisCacheMeterInfoZSetScoresIndexKeyTemp,
// pageSize: 1000, pageSize: 1000,
// lastScore: cursor, lastScore: cursor,
// lastMember: member); lastMember: member);
// meterInfos.AddRange(page.Items); meterInfos.AddRange(page.Items);
// cursor = page.HasNext ? page.NextScore : null; cursor = page.HasNext ? page.NextScore : null;
// member = page.HasNext ? page.NextMember : null; member = page.HasNext ? page.NextMember : null;
// hasNext = page.HasNext; hasNext = page.HasNext;
//} while (hasNext); } while (hasNext);
var page = await _redisDataCacheService.GetAllPagedData<T>(
redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 10,
lastScore: cursor,
lastMember: member);
meterInfos.AddRange(page.Items);
if (meterInfos == null || meterInfos.Count <= 0) if (meterInfos == null || meterInfos.Count <= 0)
{ {
@ -1008,40 +1000,56 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return; return;
} }
await DeviceGroupBalanceControl.ProcessWithThrottleAsync( await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos, items: meterInfos,
deviceIdSelector: data => data.FocusAddress, deviceIdSelector: data => data.FocusAddress,
processor: (data, groupIndex) => processor: (data, groupIndex) =>
{ {
taskCreateAction(timeDensity, data, groupIndex, nextTaskTime); taskCreateAction(timeDensity, data, groupIndex, taskBatch);
} }
); );
timer.Stop(); timer.Stop();
_logger.LogInformation($"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,耗时{timer.ElapsedMilliseconds}毫秒,总共{meterInfos.Count}表计信息"); _logger.LogInformation($"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息");
} }
/// <summary> /// <summary>
/// 创建Kafka消息 /// 创建Kafka消息
/// </summary> /// </summary>
/// <param name="redisCacheTelemetryPacketInfoHashKey"></param>
/// <param name="redisCacheTelemetryPacketInfoZSetScoresIndexKey"></param>
/// <returns></returns> /// <returns></returns>
private async Task CreateMeterKafkaTaskMessage<T>(int timeDensity, IoTDBQueryOptions options) where T : IoTEntity, new() private async Task CreateMeterKafkaTaskMessage(
string redisCacheTelemetryPacketInfoHashKey,
string redisCacheTelemetryPacketInfoZSetScoresIndexKey)
{ {
int pageNumber = 0; if (string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey))
{
throw new Exception($"{nameof(CreateMeterKafkaTaskMessage)} 创建Kafka消息失败参数异常,-101");
}
decimal? cursor = null;
string member = null;
bool hasNext; bool hasNext;
var stopwatch = Stopwatch.StartNew(); var stopwatch = Stopwatch.StartNew();
do do
{ {
options.PageIndex = pageNumber++; var page = await _redisDataCacheService.GetAllPagedData<MeterReadingTelemetryPacketInfo>(
redisCacheTelemetryPacketInfoHashKey,
redisCacheTelemetryPacketInfoZSetScoresIndexKey,
pageSize: 1000,
lastScore: cursor,
lastMember: member);
var pageResult = await _dbProvider.QueryAsync<T>(options); cursor = page.HasNext ? page.NextScore : null;
member = page.HasNext ? page.NextMember : null;
hasNext = page.HasNext;
hasNext = pageResult.HasNext; await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: page.Items,
await DeviceGroupBalanceControl.ProcessWithThrottleAsync<T>( deviceIdSelector: data => data.FocusAddress,
items: pageResult.Items.ToList(),
deviceIdSelector: data => data.DeviceId,
processor: (data, groupIndex) => processor: (data, groupIndex) =>
{ {
_ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex); _ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex);
@ -1051,57 +1059,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} while (hasNext); } while (hasNext);
stopwatch.Stop(); stopwatch.Stop();
_logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {options.TableNameOrTreePath} {timeDensity}分钟采集任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); _logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {redisCacheTelemetryPacketInfoHashKey}采集推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
} }
///// <summary>
///// 创建Kafka消息
///// </summary>
///// <param name="redisCacheTelemetryPacketInfoHashKey"></param>
///// <param name="redisCacheTelemetryPacketInfoZSetScoresIndexKey"></param>
///// <returns></returns>
//private async Task CreateMeterKafkaTaskMessage(
//string redisCacheTelemetryPacketInfoHashKey,
//string redisCacheTelemetryPacketInfoZSetScoresIndexKey)
//{
// if (string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey))
// {
// throw new Exception($"{nameof(CreateMeterKafkaTaskMessage)} 创建Kafka消息失败参数异常,-101");
// }
// decimal? cursor = null;
// string member = null;
// bool hasNext;
// var stopwatch = Stopwatch.StartNew();
// do
// {
// var page = await _redisDataCacheService.GetAllPagedData<MeterReadingTelemetryPacketInfo>(
// redisCacheTelemetryPacketInfoHashKey,
// redisCacheTelemetryPacketInfoZSetScoresIndexKey,
// pageSize: 1000,
// lastScore: cursor,
// lastMember: member);
// cursor = page.HasNext ? page.NextScore : null;
// member = page.HasNext ? page.NextMember : null;
// hasNext = page.HasNext;
// await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
// items: page.Items,
// deviceIdSelector: data => data.FocusAddress,
// processor: (data, groupIndex) =>
// {
// _ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex);
// }
// );
// } while (hasNext);
// stopwatch.Stop();
// _logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {redisCacheTelemetryPacketInfoHashKey}采集推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
//}
/// <summary> /// <summary>
/// Kafka 推送消息 /// Kafka 推送消息
/// </summary> /// </summary>
@ -1109,15 +1069,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="taskRecord">任务记录</param> /// <param name="taskRecord">任务记录</param>
/// <param name="partition">对应分区,也就是集中器号所在的分组序号</param> /// <param name="partition">对应分区,也就是集中器号所在的分组序号</param>
/// <returns></returns> /// <returns></returns>
private async Task KafkaProducerIssuedMessageAction<T>(string topicName, private async Task KafkaProducerIssuedMessageAction(string topicName,
T taskRecord, int partition) where T : class MeterReadingTelemetryPacketInfo taskRecord, int partition)
{ {
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null) if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
{ {
throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101"); throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101");
} }
await _producerService.ProduceAsync<T>(topicName, taskRecord, partition); await _producerService.ProduceAsync(topicName, partition, taskRecord);
} }
#endregion #endregion

View File

@ -8,7 +8,6 @@ using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;
@ -36,19 +35,18 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
{ {
string serverTagName = string.Empty; string serverTagName = string.Empty;
public EnergySystemScheduledMeterReadingService( public EnergySystemScheduledMeterReadingService(
ILogger<EnergySystemScheduledMeterReadingService> logger, ILogger<EnergySystemScheduledMeterReadingService> logger,
IIoTDbProvider dbProvider, IIoTDbProvider dbProvider,
IOptions<KafkaOptionConfig> kafkaOptions, IMeterReadingRecordRepository meterReadingRecordRepository,
IoTDBRuntimeContext runtimeContext, IOptions<KafkaOptionConfig> kafkaOptions,
IProducerService producerService, IProducerService producerService,
IRedisDataCacheService redisDataCacheService) IRedisDataCacheService redisDataCacheService)
: base(logger, : base(logger,
meterReadingRecordRepository,
producerService, producerService,
redisDataCacheService, redisDataCacheService,
dbProvider, dbProvider,
runtimeContext,
kafkaOptions) kafkaOptions)
{ {
serverTagName = kafkaOptions.Value.ServerTagName; serverTagName = kafkaOptions.Value.ServerTagName;

View File

@ -14,7 +14,7 @@ namespace JiShe.CollectBus.Ammeters
/// 关系映射标识用于ZSet的Member字段和Set的Value字段具体值可以根据不同业务场景进行定义 /// 关系映射标识用于ZSet的Member字段和Set的Value字段具体值可以根据不同业务场景进行定义
/// </summary> /// </summary>
[Column(IsIgnore = true)] [Column(IsIgnore = true)]
public override string MemberId => $"{FocusAddress}:{MeteringCode}"; public override string MemberId => $"{FocusId}:{MeterId}";
/// <summary> /// <summary>
/// ZSet排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳 /// ZSet排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳

View File

@ -1,9 +1,5 @@
using JiShe.CollectBus.Common.Encrypt; using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IoTDB.Attribute;
using JiShe.CollectBus.IoTDB.Enums;
using JiShe.CollectBus.IoTDB.Model;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
@ -17,79 +13,78 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// <summary> /// <summary>
/// 抄读任务Redis缓存数据记录 /// 抄读任务Redis缓存数据记录
/// </summary> /// </summary>
[EntityType(EntityTypeEnum.TableModel)] public class MeterReadingTelemetryPacketInfo : DeviceCacheBasicModel
public class MeterReadingTelemetryPacketInfo : IoTEntity
{ {
/// <summary> /// <summary>
/// 排序索引分数值,具体值可以根据不同业务场景进行定义例如时间戳、或者某一个固定的标识1 /// 关系映射标识用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义
/// </summary> /// </summary>
[FIELDColumn] public override string MemberId => $"{FocusId}:{MeterId}:{ItemCode}";
public string ScoreValue
{ /// <summary>
get /// ZSet排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳
{ /// </summary>
return $"{DeviceId}.{TaskMark}".Md5Fun(); public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks;
}
}
/// <summary> /// <summary>
/// 是否手动操作 /// 是否手动操作
/// </summary> /// </summary>
[FIELDColumn]
public bool ManualOrNot { get; set; } public bool ManualOrNot { get; set; }
/// <summary> /// <summary>
/// 任务数据唯一标记 /// 任务数据唯一标记
/// </summary> /// </summary>
[FIELDColumn] public decimal TaskMark { get; set; }
public string TaskMark { get; set; }
/// <summary>
/// 时间戳标记IoTDB时间列处理上报通过构建标记获取唯一标记匹配时间戳。
/// </summary>
public long Timestamps { get; set; }
/// <summary> /// <summary>
/// 是否超时 /// 是否超时
/// </summary> /// </summary>
[FIELDColumn]
public bool IsTimeout { get; set; } = false; public bool IsTimeout { get; set; } = false;
/// <summary> /// <summary>
/// 待抄读时间 /// 待抄读时间
/// </summary> /// </summary>
[FIELDColumn]
public DateTime PendingCopyReadTime { get; set; } public DateTime PendingCopyReadTime { get; set; }
/// <summary> /// <summary>
/// 集中器Id /// 集中器地址
/// </summary> /// </summary>
[FIELDColumn] public string FocusAddress { get; set; }
public int FocusId { get; set; }
/// <summary>
/// 表Id
/// </summary>
[FIELDColumn]
public int MeterId { get; set; }
/// <summary> /// <summary>
/// 表地址 /// 表地址
/// </summary> /// </summary>
[FIELDColumn]
public string MeterAddress { get; set; } public string MeterAddress { get; set; }
/// <summary>
/// 表类型
/// </summary>
public MeterTypeEnum MeterType { get; set; }
/// <summary>
/// 项目ID
/// </summary>
public int ProjectID { get; set; }
/// <summary> /// <summary>
/// 数据库业务ID /// 数据库业务ID
/// </summary> /// </summary>
[FIELDColumn]
public int DatabaseBusiID { get; set; } public int DatabaseBusiID { get; set; }
/// <summary> /// <summary>
/// AFN功能码 /// AFN功能码
/// </summary> /// </summary>
[FIELDColumn] public AFN AFN { get; set; }
public int AFN { get; set; }
/// <summary> /// <summary>
/// 抄读功能码 /// 抄读功能码
/// </summary> /// </summary>
[FIELDColumn]
public int Fn { get; set; } public int Fn { get; set; }
/// <summary> /// <summary>
@ -100,73 +95,66 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// <summary> /// <summary>
/// 采集项编码 /// 采集项编码
/// </summary> /// </summary>
[FIELDColumn] public string ItemCode { get; set;}
public string ItemCode { get; set; }
///// <summary> /// <summary>
///// 帧序列域SEQ /// 帧序列域SEQ
///// </summary> /// </summary>
//public required Seq Seq { get; set; } public required Seq Seq { get; set; }
/// <summary> /// <summary>
/// 地址域A3的主站地址MSA /// 地址域A3的主站地址MSA
/// </summary> /// </summary>
[FIELDColumn]
public int MSA { get; set; } public int MSA { get; set; }
/// <summary> /// <summary>
/// 是否发送 /// 是否发送
/// </summary> /// </summary>
[FIELDColumn]
public bool IsSend { get; set; } public bool IsSend { get; set; }
/// <summary> /// <summary>
/// 创建时间 /// 创建时间
/// </summary> /// </summary>
[FIELDColumn]
public DateTime CreationTime { get; set; } public DateTime CreationTime { get; set; }
/// <summary> /// <summary>
/// 下发消息内容 /// 下发消息内容
/// </summary> /// </summary>
[FIELDColumn]
public string IssuedMessageHexString { get; set; } public string IssuedMessageHexString { get; set; }
/// <summary> /// <summary>
/// 下发消息Id /// 下发消息Id
/// </summary> /// </summary>
[FIELDColumn]
public string IssuedMessageId { get; set; } public string IssuedMessageId { get; set; }
/// <summary> /// <summary>
/// 消息上报内容 /// 消息上报内容
/// </summary> /// </summary>
[FIELDColumn]
public string? ReceivedMessageHexString { get; set; } public string? ReceivedMessageHexString { get; set; }
/// <summary> /// <summary>
/// 消息上报时间 /// 消息上报时间
/// </summary> /// </summary>
[FIELDColumn]
public DateTime? ReceivedTime { get; set; } public DateTime? ReceivedTime { get; set; }
/// <summary> /// <summary>
/// 上报消息Id /// 上报消息Id
/// </summary> /// </summary>
[FIELDColumn] public string ReceivedMessageId { get; set; }
public string ReceivedMessageId { get; set; }
/// <summary> /// <summary>
/// 上报报文解析备注,异常情况下才有 /// 上报报文解析备注,异常情况下才有
/// </summary> /// </summary>
[FIELDColumn]
public string ReceivedRemark { get; set; } public string ReceivedRemark { get; set; }
/// <summary> /// <summary>
/// 是否已上报 /// 是否已上报
/// </summary> /// </summary>
[FIELDColumn] public bool IsReceived { get; set; }
public bool IsReceived { get; set; }
//public void CreateDataId(Guid Id)
//{
// this.Id = Id;
//}
} }
} }

View File

@ -11,11 +11,6 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
/// </summary> /// </summary>
public class TasksToBeIssueModel public class TasksToBeIssueModel
{ {
/// <summary>
/// 上次下发任务的时间
/// </summary>
public DateTime? LastTaskTime { get; set; }
/// <summary> /// <summary>
/// 下个任务时间 /// 下个任务时间
/// </summary> /// </summary>

View File

@ -1,65 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Encrypt
{
/// <summary>
/// 各种加密辅助类
/// </summary>
public static class EncryptUtil
{
#region MD5加密
/// <summary>
/// MD5加密
/// </summary>
public static string Md5Fun(this string value)
{
if (value == null)
{
throw new ArgumentNullException("未将对象引用设置到对象的实例。");
}
var encoding = Encoding.UTF8;
MD5 md5 = MD5.Create();
return HashAlgorithmBase(md5, value, encoding);
}
/// <summary>
/// 加权MD5加密
/// </summary>
public static string Md5Fun(this string value, string salt)
{
return salt == null ? value.Md5Fun() : (value + "『" + salt + "』").Md5Fun();
}
#endregion
/// <summary>
/// HashAlgorithm 加密统一方法
/// </summary>
private static string HashAlgorithmBase(HashAlgorithm hashAlgorithmObj, string source, Encoding encoding)
{
byte[] btStr = encoding.GetBytes(source);
byte[] hashStr = hashAlgorithmObj.ComputeHash(btStr);
return hashStr.Bytes2Str();
}
/// <summary>
/// 转换成字符串
/// </summary>
private static string Bytes2Str(this IEnumerable<byte> source, string formatStr = "{0:X2}")
{
StringBuilder pwd = new StringBuilder();
foreach (byte btStr in source)
{
pwd.AppendFormat(formatStr, btStr);
}
return pwd.ToString();
}
}
}

View File

@ -1,16 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Enums
{
public enum TimestampUnit
{
Seconds, // 秒级Unix 时间戳)
Milliseconds, // 毫秒级(默认)
Microseconds, // 微秒级
Nanoseconds // 纳秒级
}
}

View File

@ -181,7 +181,25 @@ namespace JiShe.CollectBus.Common.Extensions
return $"{dateTime:yyyyMMddHH}"; return $"{dateTime:yyyyMMddHH}";
#endif #endif
} }
/// <summary>
/// 获取当前时间毫秒级时间戳
/// </summary>
/// <returns></returns>
public static long GetCurrentTimeMillis()
{
return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
}
/// <summary>
/// 将Unix时间戳转换为日期时间
/// </summary>
/// <param name="millis"></param>
/// <returns></returns>
public static DateTime FromUnixMillis(long millis)
{
return DateTimeOffset.FromUnixTimeMilliseconds(millis).DateTime;
}
/// <summary> /// <summary>
/// 采集时间节点计算 /// 采集时间节点计算
@ -218,40 +236,19 @@ namespace JiShe.CollectBus.Common.Extensions
/// <summary> /// <summary>
/// 格式化为微秒μs /// 将 DateTime 时间转换为 DateTimeOffset 时间
/// </summary> /// </summary>
/// <param name="dt"></param> /// <param name="rawDateTime"></param>
/// <returns></returns> /// <returns></returns>
public static string ToMicrosecondString(this DateTime dt) public static DateTimeOffset GetDateTimeOffset(this DateTime rawDateTime)
{ {
long microseconds = (dt.Ticks % TimeSpan.TicksPerSecond) / 10; // 1 Tick = 100ns → 0.1μs //确保 Kind 为 Local如果是 Unspecified
return $"{dt:yyyy-MM-dd HH:mm:ss.fffffff}".Remove(23) + $"{microseconds:D6}"; DateTime localDateTime = rawDateTime.Kind == DateTimeKind.Unspecified
} ? DateTime.SpecifyKind(rawDateTime, DateTimeKind.Local)
: rawDateTime;
/// <summary> // 转换为 DateTimeOffset自动应用本地时区偏移
/// 格式化为纳秒ns return new DateTimeOffset(localDateTime);
/// </summary>
/// <param name="dt"></param>
/// <returns></returns>
public static string ToNanosecondString(this DateTime dt)
{
long nanoseconds = (dt.Ticks % TimeSpan.TicksPerSecond) * 100; // 1 Tick = 100ns
return $"{dt:yyyy-MM-dd HH:mm:ss.fffffff}".Remove(23) + $"{nanoseconds:D9}";
}
/// <summary>
/// 毫米、微秒、纳秒时间戳转DateTime
/// </summary>
/// <param name="dateLong"></param>
/// <returns></returns>
/// <exception cref="ArgumentException"></exception>
public static DateTime ParseIntToDate(long dateLong)
{
if (dateLong < 10000101 || dateLong > 99991231)
{
throw new ArgumentException("Date must be between 10000101 and 99991231.");
}
return DateTime.TryParseExact(dateLong.ToString(), "yyyyMMdd HHmmssZZ", null, System.Globalization.DateTimeStyles.None, out DateTime date) ? date : throw new ArgumentException("Date must be between 10000101 and 99991231.");
} }
} }
} }

View File

@ -1,46 +0,0 @@
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
namespace JiShe.CollectBus.Common.Extensions
{
public static class DateTimeOffsetExtensions
{
/// <summary>
/// 获取当前时间毫秒级时间戳
/// </summary>
/// <returns></returns>
public static long GetCurrentTimeMillis()
{
return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
}
/// <summary>
/// 将Unix时间戳转换为日期时间
/// </summary>
/// <param name="millis"></param>
/// <returns></returns>
public static DateTime FromUnixMillis(long millis)
{
return DateTimeOffset.FromUnixTimeMilliseconds(millis).DateTime;
}
/// <summary>
/// 将 DateTime 时间转换为 DateTimeOffset 时间
/// </summary>
/// <param name="rawDateTime"></param>
/// <returns></returns>
public static DateTimeOffset GetDateTimeOffset(this DateTime rawDateTime)
{
//确保 Kind 为 Local如果是 Unspecified
DateTime localDateTime = rawDateTime.Kind == DateTimeKind.Unspecified
? DateTime.SpecifyKind(rawDateTime, DateTimeKind.Local)
: rawDateTime;
// 转换为 DateTimeOffset自动应用本地时区偏移
return new DateTimeOffset(localDateTime);
}
}
}

View File

@ -769,11 +769,11 @@ namespace JiShe.CollectBus.Common.Helpers
/// <param name="pn"></param> /// <param name="pn"></param>
/// <param name="msa"></param> /// <param name="msa"></param>
/// <returns></returns> /// <returns></returns>
public static string GetTaskMark(int afn, int fn, int pn, int msa) public static decimal GetTaskMark(int afn, int fn, int pn, int msa)
{ {
var makstr = $"{afn.ToString().PadLeft(2, '0')}{fn.ToString().PadLeft(2, '0')}{pn.ToString().PadLeft(2, '0')}"; var makstr = $"{afn.ToString().PadLeft(2, '0')}{fn.ToString().PadLeft(2, '0')}{pn.ToString().PadLeft(2, '0')}";
return makstr;// Convert.ToInt32(makstr) << 32 | msa; return Convert.ToInt32(makstr) << 32 | msa;
} }
} }
} }

View File

@ -1,68 +0,0 @@
using JiShe.CollectBus.Common.Enums;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Helpers
{
/// <summary>
/// 时间戳帮助类
/// </summary>
public static class TimestampHelper
{
private static readonly long UnixEpochTicks = new DateTimeOffset(1970, 1, 1, 0, 0, 0, TimeSpan.Zero).Ticks;
/// <summary>
/// 获取当前 DateTimeOffset 距离 Unix 纪元1970-01-01的微秒数
/// </summary>
public static long ToUnixTimeMicroseconds(this DateTimeOffset dateTimeOffset)
{
// Ticks 单位是 100 纳秒,转换为微秒需除以 10
long elapsedTicks = dateTimeOffset.Ticks - UnixEpochTicks;
return elapsedTicks / 10; // 1 微秒 = 1000 纳秒 = 10 Ticks
}
/// <summary>
/// 获取当前 DateTimeOffset 距离 Unix 纪元1970-01-01的纳秒数
/// </summary>
public static long ToUnixTimeNanoseconds(this DateTimeOffset dateTimeOffset)
{
long nanoseconds = (dateTimeOffset.Ticks - UnixEpochTicks) * 100;
return nanoseconds;
}
/// <summary>
/// 将 long 类型时间戳转换为 DateTimeUTC
/// </summary>
/// <param name="timestamp">时间戳数值</param>
/// <param name="unit">时间戳单位</param>
public static DateTime ConvertToDateTime(long timestamp, TimestampUnit unit = TimestampUnit.Milliseconds)
{
long ticks = unit switch
{
TimestampUnit.Seconds => checked(timestamp * TimeSpan.TicksPerSecond),
TimestampUnit.Milliseconds => checked(timestamp * TimeSpan.TicksPerMillisecond),
TimestampUnit.Microseconds => checked(timestamp * 10), // 1微秒 = 10 Ticks100纳秒
TimestampUnit.Nanoseconds => checked(timestamp / 100),// 1 Tick = 100纳秒
_ => throw new ArgumentException("无效的时间单位", nameof(unit))
};
try
{
DateTime result = new DateTime(UnixEpochTicks + ticks, DateTimeKind.Utc);
// 校验结果是否在 DateTime 合法范围内0001-01-01 至 9999-12-31
if (result < DateTime.MinValue || result > DateTime.MaxValue)
{
throw new ArgumentOutOfRangeException(nameof(timestamp), "时间戳超出 DateTime 范围");
}
return result;
}
catch (ArgumentOutOfRangeException ex)
{
throw new ArgumentOutOfRangeException("时间戳无效", ex);
}
}
}
}

View File

@ -18,7 +18,7 @@
<title>后端服务</title> <title>后端服务</title>
</head> </head>
<body> <body>
<div class="container projects"> <div class="container projects">
<div class="projects-header page-header"> <div class="projects-header page-header">

View File

@ -84,8 +84,7 @@
"SaslPassword": "lixiao1980", "SaslPassword": "lixiao1980",
"KafkaReplicationFactor": 3, "KafkaReplicationFactor": 3,
"NumPartitions": 30, "NumPartitions": 30,
"ServerTagName": "JiSheCollectBus4", "ServerTagName": "JiSheCollectBus99"
"FirstCollectionTime": "2025-04-22 16:07:00"
}, },
"IoTDBOptions": { "IoTDBOptions": {
"UserName": "root", "UserName": "root",
@ -93,7 +92,7 @@
"ClusterList": [ "192.168.1.9:6667" ], "ClusterList": [ "192.168.1.9:6667" ],
"PoolSize": 2, "PoolSize": 2,
"DataBaseName": "energy", "DataBaseName": "energy",
"OpenDebugMode": false, "OpenDebugMode": true,
"UseTableSessionPoolByDefault": false "UseTableSessionPoolByDefault": false
}, },
"Cassandra": { "Cassandra": {