合并代码

This commit is contained in:
ChenYi 2025-04-23 09:45:21 +08:00
commit 9c6e8c0de8
13 changed files with 1142 additions and 425 deletions

View File

@ -13,6 +13,12 @@ namespace JiShe.CollectBus.IoTDB.Interface
/// <returns></returns> /// <returns></returns>
Task OpenAsync(); Task OpenAsync();
/// <summary>
/// 关闭连接池
/// </summary>
/// <returns></returns>
Task CloseAsync();
/// <summary> /// <summary>
/// 插入数据 /// 插入数据
/// </summary> /// </summary>

View File

@ -47,5 +47,10 @@
/// 时区,默认为:"UTC+08:00" /// 时区,默认为:"UTC+08:00"
/// </summary> /// </summary>
public string ZoneId { get; set; } = "UTC+08:00"; public string ZoneId { get; set; } = "UTC+08:00";
/// <summary>
/// 请求超时时间单位毫秒默认为50000
/// </summary>
public long Timeout { get; set; } = 50000;
} }
} }

View File

@ -1,6 +1,8 @@
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.ComponentModel.DataAnnotations;
using System.Reflection; using System.Reflection;
using System.Reflection.Metadata.Ecma335;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using Apache.IoTDB; using Apache.IoTDB;
@ -23,7 +25,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <summary> /// <summary>
/// IoTDB数据源 /// IoTDB数据源
/// </summary> /// </summary>
public class IoTDbProvider : IIoTDbProvider, IScopedDependency public class IoTDbProvider : IIoTDbProvider, ITransientDependency
{ {
private static readonly ConcurrentDictionary<Type, DeviceMetadata> MetadataCache = new(); private static readonly ConcurrentDictionary<Type, DeviceMetadata> MetadataCache = new();
private readonly ILogger<IoTDbProvider> _logger; private readonly ILogger<IoTDbProvider> _logger;
@ -195,7 +197,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
{ {
try try
{ {
var query =await BuildQuerySQL<T>(options); var query = await BuildQuerySQL<T>(options);
var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query);
@ -208,12 +210,13 @@ namespace JiShe.CollectBus.IoTDB.Provider
}; };
result.HasNext = result.TotalCount > 0? result.TotalCount < result.PageSize : false; result.HasNext = result.TotalCount > 0 ? result.TotalCount < result.PageSize : false;
return result; return result;
} }
catch (Exception ex) catch (Exception ex)
{ {
CurrentSession.Dispose();
_logger.LogError(ex, $"{nameof(QueryAsync)} IoTDB查询数据时发生异常"); _logger.LogError(ex, $"{nameof(QueryAsync)} IoTDB查询数据时发生异常");
throw; throw;
} }
@ -414,7 +417,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
private async Task<string> BuildQuerySQL<T>(IoTDBQueryOptions options) where T : IoTEntity private async Task<string> BuildQuerySQL<T>(IoTDBQueryOptions options) where T : IoTEntity
{ {
var metadata = await GetMetadata<T>(); var metadata = await GetMetadata<T>();
var sb = new StringBuilder("SELECT "); var sb = new StringBuilder("SELECT TIME as Timestamps,");
sb.AppendJoin(", ", metadata.ColumnNames); sb.AppendJoin(", ", metadata.ColumnNames);
sb.Append($" FROM {options.TableNameOrTreePath}"); sb.Append($" FROM {options.TableNameOrTreePath}");
@ -471,7 +474,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
{ {
return condition.Operator switch return condition.Operator switch
{ {
">" => condition.IsNumber ? $"{condition.Field} > {condition.Value}": $"{condition.Field} > '{condition.Value}'", ">" => condition.IsNumber ? $"{condition.Field} > {condition.Value}" : $"{condition.Field} > '{condition.Value}'",
"<" => condition.IsNumber ? $"{condition.Field} < {condition.Value}" : $"{condition.Field} < '{condition.Value}'", "<" => condition.IsNumber ? $"{condition.Field} < {condition.Value}" : $"{condition.Field} < '{condition.Value}'",
"=" => condition.IsNumber ? $"{condition.Field} = {condition.Value}" : $"{condition.Field} = '{condition.Value}'", "=" => condition.IsNumber ? $"{condition.Field} = {condition.Value}" : $"{condition.Field} = '{condition.Value}'",
_ => throw new NotSupportedException($"{nameof(TranslateCondition)} 将查询条件转换为SQL语句时操作符 {condition.Operator} 属于异常情况") _ => throw new NotSupportedException($"{nameof(TranslateCondition)} 将查询条件转换为SQL语句时操作符 {condition.Operator} 属于异常情况")
@ -493,7 +496,16 @@ namespace JiShe.CollectBus.IoTDB.Provider
} }
var result = await CurrentSession.ExecuteQueryStatementAsync(countQuery); var result = await CurrentSession.ExecuteQueryStatementAsync(countQuery);
return result.HasNext() ? Convert.ToInt32(result.Next().Values[0]) : 0; if (result.HasNext())
{
await result.Close();
return 0;
}
var count = Convert.ToInt32(result.Next().Values[0]);
await result.Close();
return count;
} }
/// <summary> /// <summary>
@ -510,6 +522,13 @@ namespace JiShe.CollectBus.IoTDB.Provider
var properties = typeof(T).GetProperties(); var properties = typeof(T).GetProperties();
var columns = new List<string>() { "Timestamps" };
var dataTypes = new List<TSDataType>() { TSDataType.TIMESTAMP };
columns.AddRange(metadata.ColumnNames);
dataTypes.AddRange(metadata.DataTypes);
//metadata.ColumnNames.Insert(0, "Timestamps");
//metadata.DataTypes.Insert(0, TSDataType.TIMESTAMP);
while (dataSet.HasNext() && results.Count < pageSize) while (dataSet.HasNext() && results.Count < pageSize)
{ {
var record = dataSet.Next(); var record = dataSet.Next();
@ -518,30 +537,34 @@ namespace JiShe.CollectBus.IoTDB.Provider
Timestamps = record.Timestamps Timestamps = record.Timestamps
}; };
foreach (var measurement in metadata.ColumnNames) foreach (var measurement in columns)
{ {
int indexOf = metadata.ColumnNames.IndexOf(measurement); int indexOf = columns.IndexOf(measurement);
var value = record.Values[indexOf]; var value = record.Values[indexOf];
TSDataType tSDataType = dataTypes[indexOf];
var prop = properties.FirstOrDefault(p => var prop = properties.FirstOrDefault(p =>
p.Name.Equals(measurement, StringComparison.OrdinalIgnoreCase)); p.Name.Equals(measurement, StringComparison.OrdinalIgnoreCase));
if (prop != null) if (prop != null && !(value is System.DBNull))
{ {
if (measurement.EndsWith("time")) dynamic tempValue = GetTSDataValue(tSDataType, value);
if (measurement.ToLower().EndsWith("time"))
{ {
var tempValue = TimestampHelper.ConvertToDateTime(Convert.ToInt64(value), TimestampUnit.Nanoseconds); typeof(T).GetProperty(measurement)?.SetValue(entity, TimestampHelper.ConvertToDateTime(tempValue, TimestampUnit.Nanoseconds));
typeof(T).GetProperty(measurement)?.SetValue(entity, value);
} }
else else
{ {
typeof(T).GetProperty(measurement)?.SetValue(entity, value); typeof(T).GetProperty(measurement)?.SetValue(entity, tempValue);
} }
} }
} }
results.Add(entity); results.Add(entity);
} }
await dataSet.Close();
return results; return results;
} }
@ -759,5 +782,28 @@ namespace JiShe.CollectBus.IoTDB.Provider
["DECIMAL"] = "0.0", ["DECIMAL"] = "0.0",
["STRING"] = string.Empty ["STRING"] = string.Empty
}; };
/// <summary>
/// IoTDB 数据类型与.net类型映射
/// </summary>
/// <param name="tSDataType"></param>
/// <param name="value"></param>
/// <returns></returns>
private dynamic GetTSDataValue(TSDataType tSDataType, object value) =>
tSDataType switch
{
TSDataType.BOOLEAN => Convert.ToBoolean(value),
TSDataType.INT32 => Convert.ToInt32(value),
TSDataType.INT64 => Convert.ToInt64(value),
TSDataType.FLOAT => Convert.ToDouble(value),
TSDataType.DOUBLE => Convert.ToDouble(value),
TSDataType.TEXT => Convert.ToString(value),
TSDataType.NONE => null,
TSDataType.TIMESTAMP => Convert.ToInt64(value),
TSDataType.DATE => Convert.ToDateTime(value),
TSDataType.BLOB => Convert.ToByte(value),
TSDataType.STRING => Convert.ToString(value),
_ => Convert.ToString(value)
};
} }
} }

View File

@ -47,6 +47,19 @@ namespace JiShe.CollectBus.IoTDB.Provider
} }
} }
/// <summary>
/// 关闭连接池
/// </summary>
/// <returns></returns>
public async Task CloseAsync()
{
if (_sessionPool == null)
{
return;
}
await _sessionPool.Close();
}
/// <summary> /// <summary>
/// 批量插入对齐时间序列数据 /// 批量插入对齐时间序列数据
/// </summary> /// </summary>
@ -59,7 +72,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
{ {
throw new Exception($"{nameof(SessionPoolAdapter)} Tree模型数据入库没有成功返回结果为{result}"); throw new Exception($"{nameof(SessionPoolAdapter)} Tree模型数据入库没有成功返回结果为{result}");
} }
//await CloseAsync();
return result; return result;
} }
@ -70,7 +83,10 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <returns></returns> /// <returns></returns>
public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql) public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql)
{ {
return await _sessionPool.ExecuteQueryStatementAsync(sql); var result = await _sessionPool.ExecuteQueryStatementAsync(sql, _options.Timeout);
//await result.Close();
//await CloseAsync();
return result;
} }
public void Dispose() public void Dispose()

View File

@ -45,6 +45,19 @@ namespace JiShe.CollectBus.IoTDB.Provider
} }
} }
/// <summary>
/// 关闭连接池
/// </summary>
/// <returns></returns>
public async Task CloseAsync()
{
if (_sessionPool == null)
{
return;
}
await _sessionPool.Close();
}
/// <summary> /// <summary>
/// 批量插入 /// 批量插入
/// </summary> /// </summary>
@ -58,6 +71,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
throw new Exception($"{nameof(TableSessionPoolAdapter)} table模型数据入库没有成功返回结果为{result}"); throw new Exception($"{nameof(TableSessionPoolAdapter)} table模型数据入库没有成功返回结果为{result}");
} }
//await CloseAsync();
return result; return result;
} }
@ -68,7 +82,10 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <returns></returns> /// <returns></returns>
public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql) public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql)
{ {
return await _sessionPool.ExecuteQueryStatementAsync(sql); var result = await _sessionPool.ExecuteQueryStatementAsync(sql,_options.Timeout);
//await result.Close();
//await CloseAsync();
return result;
} }
public void Dispose() public void Dispose()

View File

@ -1,4 +1,5 @@
using Confluent.Kafka; using Amazon.Runtime.Internal.Endpoints.StandardLibrary;
using Confluent.Kafka;
using DnsClient.Protocol; using DnsClient.Protocol;
using FreeSql; using FreeSql;
using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Ammeters;
@ -6,10 +7,12 @@ using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Common.Encrypt;
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.EnergySystems.Entities;
using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Interface;
@ -32,10 +35,13 @@ using Microsoft.Extensions.Options;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using static FreeSql.Internal.GlobalFilter; using static FreeSql.Internal.GlobalFilter;
using static Microsoft.AspNetCore.Razor.Language.TagHelperMetadata;
using static Thrift.Protocol.Utilities.TJSONProtocolConstants;
namespace JiShe.CollectBus.ScheduledMeterReading namespace JiShe.CollectBus.ScheduledMeterReading
{ {
@ -65,6 +71,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_producerService = producerService; _producerService = producerService;
_redisDataCacheService = redisDataCacheService; _redisDataCacheService = redisDataCacheService;
_kafkaOptions = kafkaOptions.Value; _kafkaOptions = kafkaOptions.Value;
_runtimeContext.UseTableSessionPool = true;
} }
/// <summary> /// <summary>
@ -111,7 +119,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
foreach (var item in taskInfos) foreach (var item in taskInfos)
{ {
var tasksToBeIssueModel = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(item); var tasksToBeIssueModel = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(item);
@ -121,7 +128,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue; continue;
} }
//item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBustempArryay[1]=>SystemTypeConsttempArryay[2]=>TaskInfotempArryay[3]=>表计类别tempArryay[4]=>采集频率 //item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBustempArryay[1]=>SystemTypeConsttempArryay[2]=>ServerTagNametempArryay[3]=>TaskInfotempArryay[4]=>表计类别tempArryay[5]=>采集频率
var tempArryay = item.Split(":"); var tempArryay = item.Split(":");
string meteryType = tempArryay[4];//表计类别 string meteryType = tempArryay[4];//表计类别
int timeDensity = Convert.ToInt32(tempArryay[5]);//采集频率 int timeDensity = Convert.ToInt32(tempArryay[5]);//采集频率
@ -139,21 +146,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var meterTypes = EnumExtensions.ToEnumDictionary<MeterTypeEnum>(); var meterTypes = EnumExtensions.ToEnumDictionary<MeterTypeEnum>();
var currentTaskTime = tasksToBeIssueModel.NextTaskTime;//程序启动缓存电表的时候NextTaskTime已经格式化到下一个采集点时间。
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
if (meteryType == MeterTypeEnum.Ammeter.ToString()) if (meteryType == MeterTypeEnum.Ammeter.ToString())
{ {
//List<MeterReadingTelemetryPacketInfo> pushTaskInfos = new();
_runtimeContext.UseTableSessionPool = true;
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
_ = CreateMeterPublishTask<AmmeterInfo>( _ = CreateMeterPublishTask<AmmeterInfo>(
timeDensity: timeDensity, timeDensity: timeDensity,
nextTaskTime: tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity), nextTaskTime: currentTaskTime,
meterType: MeterTypeEnum.Ammeter, meterType: MeterTypeEnum.Ammeter,
taskCreateAction: (timeDensity, data, groupIndex, timestamps) => taskCreateAction: (timeDensity, data, groupIndex, timestamps) =>
{ {
var tempTask = AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps); var tempTask = AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0) if (tempTask == null || tempTask.Count <= 0)
{ {
_logger.LogWarning($"{data.Name} 任务数据构建失败:{data.Serialize()}"); _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return; return;
} }
_dbProvider.BatchInsertAsync(metadata, tempTask); _dbProvider.BatchInsertAsync(metadata, tempTask);
@ -161,16 +168,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
{ {
//todo 水表任务创建待处理
//await WatermeterScheduledMeterReadingIssued(timeDensity, meterInfos);
_ = CreateMeterPublishTask<WatermeterInfo>( _ = CreateMeterPublishTask<WatermeterInfo>(
timeDensity: timeDensity, timeDensity: timeDensity,
nextTaskTime: tasksToBeIssueModel.NextTaskTime, nextTaskTime: currentTaskTime,
meterType: MeterTypeEnum.Ammeter, meterType: MeterTypeEnum.Ammeter,
taskCreateAction: (timeDensity, data, groupIndex, taskBatch) => taskCreateAction: (timeDensity, data, groupIndex, timestamps) =>
{ {
//AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch); var tempTask = WatermeterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"水表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_dbProvider.BatchInsertAsync(metadata, tempTask);
}); });
} }
else else
@ -183,8 +195,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。 //根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。
tasksToBeIssueModel.LastTaskTime = tasksToBeIssueModel.NextTaskTime; tasksToBeIssueModel.LastTaskTime = currentTaskTime;
tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity); tasksToBeIssueModel.NextTaskTime = currentTaskTime.CalculateNextCollectionTime(timeDensity);
await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel); await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
} }
} }
@ -208,52 +220,53 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns> /// <returns></returns>
public virtual async Task InitAmmeterCacheData(string gatherCode = "") public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{ {
#if DEBUG //此处代码不要删除
var timeDensity = "15"; //#if DEBUG
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; // var timeDensity = "15";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; // var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; // var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
// var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>(); // List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
List<string> focusAddressDataLista = new List<string>(); // List<string> focusAddressDataLista = new List<string>();
var timer1 = Stopwatch.StartNew(); // var timer1 = Stopwatch.StartNew();
var allIds = new HashSet<string>(); // var allIds = new HashSet<string>();
decimal? score = null; // decimal? score = null;
string member = null; // string member = null;
while (true) // while (true)
{ // {
var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>( // var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
redisCacheMeterInfoHashKeyTemp, // redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp, // redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000, // pageSize: 1000,
lastScore: score, // lastScore: score,
lastMember: member); // lastMember: member);
meterInfos.AddRange(page.Items); // meterInfos.AddRange(page.Items);
focusAddressDataLista.AddRange(page.Items.Select(d => d.FocusAddress)); // focusAddressDataLista.AddRange(page.Items.Select(d => d.FocusAddress));
foreach (var item in page.Items) // foreach (var item in page.Items)
{ // {
if (!allIds.Add(item.MemberId)) // if (!allIds.Add(item.MemberId))
{ // {
_logger.LogError($"{item.MemberId}Duplicate data found!"); // _logger.LogError($"{item.MemberId}Duplicate data found!");
} // }
} // }
if (!page.HasNext) break; // if (!page.HasNext) break;
score = page.NextScore; // score = page.NextScore;
member = page.NextMember; // member = page.NextMember;
} // }
timer1.Stop(); // timer1.Stop();
_logger.LogError($"读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒"); // _logger.LogError($"读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒");
DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); // DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
return; // return;
#else //#else
// var meterInfos = await GetAmmeterInfoList(gatherCode);
//#endif
var meterInfos = await GetAmmeterInfoList(gatherCode); var meterInfos = await GetAmmeterInfoList(gatherCode);
#endif
if (meterInfos == null || meterInfos.Count <= 0) if (meterInfos == null || meterInfos.Count <= 0)
{ {
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空"); throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空");
@ -310,17 +323,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
focusAddressDataList.Add(item.Key); focusAddressDataList.Add(item.Key);
// var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
#if DEBUG
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
//await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
#else
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
//await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
#endif
//Dictionary<string, AmmeterInfo> keyValuePairs = new Dictionary<string, AmmeterInfo>();
foreach (var ammeter in item) foreach (var ammeter in item)
{ {
//处理ItemCode //处理ItemCode
@ -397,29 +399,34 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns> /// <returns></returns>
public virtual async Task AmmeterScheduledMeterOneMinuteReading() public virtual async Task AmmeterScheduledMeterOneMinuteReading()
{ {
//获取缓存中的电表信息
int timeDensity = 5; int timeDensity = 5;
var currentTime = DateTime.Now; var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
// 自动计算最佳并发度 if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount();
var options = new ParallelOptions
{ {
MaxDegreeOfParallelism = recommendedThreads, _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
}; return;
var taskBatch = $"{currentTime:yyyyMMddHHmm00}"; }
//Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();
//{
// var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
// var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
// _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); var conditions = new List<QueryCondition>();
//}); conditions.Add(new QueryCondition()
{
Field = "PendingCopyReadTime",
Operator = "=",
IsNumber = true,
Value = pendingCopyReadTime
});
await Task.CompletedTask; _ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(timeDensity, new IoTDBQueryOptions()
{
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
PageIndex = 1,
PageSize = 3000,
Conditions = conditions,
});
} }
@ -429,40 +436,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns> /// <returns></returns>
public virtual async Task AmmeterScheduledMeterFiveMinuteReading() public virtual async Task AmmeterScheduledMeterFiveMinuteReading()
{ {
//获取缓存中的电表信息
int timeDensity = 5; int timeDensity = 5;
var currentTime = DateTime.Now; var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
// 自动计算最佳并发度
int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount();
var options = new ParallelOptions
{
MaxDegreeOfParallelism = recommendedThreads,
};
var taskBatch = $"{currentTime:yyyyMMddHHmm00}";
//Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex =>
//{
// var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
// var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
// _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey);
//});
}
/// <summary>
/// 15分钟采集电表数据
/// </summary>
/// <returns></returns>
public virtual async Task AmmeterScheduledMeterFifteenMinuteReading()
{
//获取缓存中的电表信息
int timeDensity = 15;
//var currentTime = DateTime.Now.CalculateNextCollectionTime(timeDensity);
var currentTime = Convert.ToDateTime("2025-04-21 17:42:00").CalculateNextCollectionTime(timeDensity);
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName,MeterTypeEnum.Ammeter,timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey); var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false) if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
@ -471,13 +446,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return; return;
} }
// 自动计算最佳并发度
int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount();
var options = new ParallelOptions
{
MaxDegreeOfParallelism = recommendedThreads,
};
var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds(); var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();
var conditions = new List<QueryCondition>(); var conditions = new List<QueryCondition>();
@ -498,62 +466,41 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}); });
} }
/// <summary>
/// 15分钟采集电表数据
/// </summary>
/// <returns></returns>
public virtual async Task AmmeterScheduledMeterFifteenMinuteReading()
{
int timeDensity = 15;
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
///// <summary> if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
///// 创建电表待发送的任务数据 {
///// </summary> _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
///// <param name="timeDensity">采集频率</param> return;
///// <param name="taskBatch">时间格式的任务批次名称</param> }
///// <returns></returns>
//private async Task AmmerterCreatePublishTask(int timeDensity, string taskBatch)
//{
// var timer = Stopwatch.StartNew();
// //获取对应频率中的所有电表信息 var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();
// var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
// var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
// var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
// List<AmmeterInfo> meterInfos = new List<AmmeterInfo>(); var conditions = new List<QueryCondition>();
// decimal? cursor = null; conditions.Add(new QueryCondition()
// string member = null; {
// bool hasNext; Field = "PendingCopyReadTime",
// do Operator = "=",
// { IsNumber = true,
// var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>( Value = pendingCopyReadTime
// redisCacheMeterInfoHashKeyTemp, });
// redisCacheMeterInfoZSetScoresIndexKeyTemp,
// pageSize: 1000,
// lastScore: cursor,
// lastMember: member);
// meterInfos.AddRange(page.Items);
// cursor = page.HasNext ? page.NextScore : null;
// member = page.HasNext ? page.NextMember : null;
// hasNext = page.HasNext;
// } while (hasNext);
// if (meterInfos == null || meterInfos.Count <= 0)
// {
// timer.Stop();
// _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
// return;
// }
// await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
// items: meterInfos,
// deviceIdSelector: data => data.FocusAddress,
// processor: (data, groupIndex) =>
// {
// AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch);
// }
// );
// timer.Stop();
// _logger.LogInformation($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息");
//}
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(timeDensity, new IoTDBQueryOptions()
{
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
PageIndex = 1,
PageSize = 3000,
Conditions = conditions,
});
}
/// <summary> /// <summary>
/// 创建电表待发送的任务数据 /// 创建电表待发送的任务数据
@ -563,8 +510,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
private List<MeterReadingTelemetryPacketInfo> AmmerterCreatePublishTaskAction(int timeDensity private List<MeterReadingTelemetryPacketInfo> AmmerterCreatePublishTaskAction(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
{ {
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
@ -573,20 +519,20 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{ {
// _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
return null; return null;
} }
//载波的不处理 //载波的不处理
if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave) if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
{ {
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102"); _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102");
return null; return null;
} }
if (ammeterInfo.State.Equals(2)) if (ammeterInfo.State.Equals(2))
{ {
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理"); _logger.LogWarning($"{nameof(AmmerterCreatePublishTaskAction)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理");
return null; return null;
} }
@ -599,22 +545,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode)) if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode))
{ {
// _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空"); _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信区号为空");
return null; return null;
} }
if (string.IsNullOrWhiteSpace(ammeterInfo.Address)) if (string.IsNullOrWhiteSpace(ammeterInfo.Address))
{ {
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空"); _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信地址为空");
return null; return null;
} }
if (Convert.ToInt32(ammeterInfo.Address) > 65535) if (Convert.ToInt32(ammeterInfo.Address) > 65535)
{ {
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535"); _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信地址无效,确保大于65535");
return null; return null;
} }
if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33) if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33)
{ {
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})"); _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},非有效测量点号({ammeterInfo.MeteringCode})");
return null; return null;
} }
@ -709,7 +655,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
string taskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA);
var meterReadingRecords = new MeterReadingTelemetryPacketInfo() var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
{ {
SystemName = SystemType, SystemName = SystemType,
@ -726,13 +672,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//Seq = builderResponse.Seq, //Seq = builderResponse.Seq,
MSA = builderResponse.MSA, MSA = builderResponse.MSA,
ItemCode = tempItem, ItemCode = tempItem,
TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA), TaskMark = taskMark,
IsSend = false, IsSend = false,
ManualOrNot = false, ManualOrNot = false,
Pn = ammeterInfo.MeteringCode, Pn = ammeterInfo.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(), IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data), IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
IsReceived = false, IsReceived = false,
ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
}; };
taskList.Add(meterReadingRecords); taskList.Add(meterReadingRecords);
@ -776,10 +723,35 @@ namespace JiShe.CollectBus.ScheduledMeterReading
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据时,采集项类型数据为空"); throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据时,采集项类型数据为空");
} }
List<string> focusAddressDataList = new List<string>();//用于处理Kafka主题分区数据的分发和处理。
//根据采集频率分组,获得采集频率分组 //根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
if (_kafkaOptions.FirstCollectionTime.HasValue == false)
{
_kafkaOptions.FirstCollectionTime = DateTime.Now;
}
//先处理采集频率任务缓存
foreach (var item in meterInfoGroupByTimeDensity)
{
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{
LastTaskTime = null,
TimeDensity = item.Key,
NextTaskTime = _kafkaOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间
};
//todo 首次采集时间节点到目前运行时间中漏采的时间点可以考虑使用IoTDB的存储利用时间序列处理。
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, item.Key);
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
}
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity) foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
{ {
List<WatermeterInfo> watermeterInfo = new List<WatermeterInfo>();
//将表计信息根据集中器分组,获得集中器号 //将表计信息根据集中器分组,获得集中器号
var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList(); var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
foreach (var item in meterInfoGroup) foreach (var item in meterInfoGroup)
@ -789,25 +761,33 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue; continue;
} }
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}{item.Key}"; focusAddressDataList.Add(item.Key);
Dictionary<string, WatermeterInfo> keyValuePairs = new Dictionary<string, WatermeterInfo>();
var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
foreach (var subItem in item) foreach (var subItem in item)
{ {
watermeterInfo.Add(subItem);
keyValuePairs.TryAdd($"{subItem.MeterId}", subItem);
} }
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
await _redisDataCacheService.BatchInsertDataAsync<WatermeterInfo>(
redisCacheMeterInfoHashKey,
redisCacheMeterInfoSetIndexKey,
redisCacheMeterInfoZSetScoresIndexKey, watermeterInfo);
} }
}
//在缓存表信息数据的时候新增下一个时间的自动处理任务1分钟后执行 //初始化设备组负载控制
TasksToBeIssueModel nextTask = new TasksToBeIssueModel() if (focusAddressDataList == null || focusAddressDataList.Count <= 0)
{ {
TimeDensity = itemTimeDensity.Key, _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
NextTaskTime = DateTime.Now.AddMinutes(1)
};
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key); }
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask); else
{
DeviceGroupBalanceControl.InitializeCache(focusAddressDataList, _kafkaOptions.NumPartitions);
} }
@ -821,107 +801,186 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public virtual async Task WatermeterScheduledMeterAutoReading() public virtual async Task WatermeterScheduledMeterAutoReading()
{ {
//获取缓存中的水表信息 //获取缓存中的水表信息
int timeDensity = 1; int timeDensity = 60;//水表目前只有一个采集频率 60分钟
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.WaterMeter); var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, timeDensity);
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
{ {
_logger.LogError($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101"); _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
return; return;
} }
//获取下发任务缓存数据 var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(oneMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.WaterMeter);
if (meterTaskInfos == null || meterTaskInfos.Count <= 0) var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
{ {
_logger.LogError($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102"); Field = "PendingCopyReadTime",
return; Operator = "=",
} IsNumber = true,
Value = pendingCopyReadTime
});
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>(); _ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(timeDensity, new IoTDBQueryOptions()
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
{ {
foreach (var ammerterItem in focusItem.Value) TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
{ PageIndex = 1,
var tempMsg = new ScheduledMeterReadingIssuedEventMessage() PageSize = 3000,
{ Conditions = conditions,
MessageHexString = ammerterItem.Value.IssuedMessageHexString, });
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
//await _producerBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempMsg);
//_ = _producerBus.Publish(tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
// await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
}
////删除任务数据
//await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList);
////缓存下一个时间的任务
//await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter);
_logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成"); _logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成");
} }
///// <summary> /// <summary>
///// 创建水表待发送的任务数据 /// 创建水表待发送的任务数据
///// </summary> /// </summary>
///// <param name="timeDensity">采集频率</param> /// <param name="timeDensity">采集频率</param>
///// <param name="meterInfo">水表信息</param> /// <param name="watermeter">水表信息</param>
///// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
///// <param name="taskBatch">时间格式的任务批次名称</param> /// <param name="timestamps">时间格式的任务批次名称</param>
///// <returns></returns> /// <returns></returns>
//private void WatermeterCreatePublishTaskAction(int timeDensity private List<MeterReadingTelemetryPacketInfo> WatermeterCreatePublishTaskAction(int timeDensity
// , WatermeterInfo meterInfo, int groupIndex, string taskBatch) , WatermeterInfo watermeter, int groupIndex, DateTime timestamps)
//{ {
// var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
var currentTime = DateTime.Now;
string typeName;
if (watermeter.MeterType == MeterTypeEnum.WaterMeter)
{
timeDensity = watermeter.TimeDensity;//水表默认为60分钟
typeName = watermeter.LinkType;
if (watermeter.MeterBrand.Contains("泉高阀门") || watermeter.MeterBrand.Equals("LXSY-山水翔"))
{
typeName = watermeter.MeterBrand;
}
}
else if (watermeter.MeterType == MeterTypeEnum.WaterMeterFlowmeter)
{
typeName = watermeter.MeterBrand;
}
else
{
_logger.LogError($"{nameof(WatermeterCreatePublishTaskAction)} 水表类型错误:{watermeter.Serialize()}");
return null;
}
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
// var currentTime = DateTime.Now; List<string> tempCodes = new List<string>() { "10_1" };
// var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
// var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; //todo 后续从协议池获取
// var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; if (watermeter.MeterTypeName.Equals("水表") && (watermeter.Protocol.Equals((int)MeterLinkProtocol.CJT_188_2018) || watermeter.Protocol.Equals((int)MeterLinkProtocol.DLT_645_1997) || watermeter.Protocol.Equals((int)MeterLinkProtocol.DLT_645_2007)))//水表且CJT_188_2018或DLT_645_1997都采用0C_129
// var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; {
if (watermeter.MeterBrand.Contains("炬华有线"))
{
tempCodes = new List<string>() { "0C_188" };
}
else
{
tempCodes = new List<string>() { "0C_129" };
}
}
else if (typeName.Trim().Equals("西恩超声波流量计"))
{
tempCodes = new List<string>() { "10_1" };
}
else if (typeName.Trim().Equals("江苏华海涡街流量计积算仪"))
{
tempCodes = new List<string>() { "10_1" };
}
else if (typeName.Trim().Equals("V880BR涡街流量计"))
{
tempCodes = new List<string>() { "10_1" };
}
else if (typeName.Trim().Equals("拓思特涡街流量计H880BR"))
{
tempCodes = new List<string>() { "10_1" };
}
// var taskInfo = new MeterReadingTelemetryPacketInfo() foreach (var tempItem in tempCodes)
// { {
// Seq= null, //排除已发送日冻结和月冻结采集项配置
if (DayFreezeCodes.Contains(tempItem))
{
continue;
}
// }; if (MonthFreezeCodes.Contains(tempItem))
// // {
continue;
}
// Build188SendData.Build188WaterMeterReadingSendDataUnit(meterInfo.Address); var itemCodeArr = tempItem.Split('_');
var aFNStr = itemCodeArr[0];
var aFN = (AFN)aFNStr.HexToDec();
var fn = int.Parse(itemCodeArr[1]);
TelemetryPacketResponse builderResponse = null;
// using (var pipe = FreeRedisProvider.Instance.StartPipe()) string methonCode = $"AFN{aFNStr}_Fn_Send";
// { //特殊表暂不处理
// // 主数据存储Hash if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
// pipe.HSet(redisCacheTelemetryPacketInfoHashKey, taskInfo.MemberId, taskInfo.Serialize()); , out var handler))
{
builderResponse = handler(new TelemetryPacketRequest()
{
FocusAddress = watermeter.FocusAddress,
Fn = fn,
Pn = watermeter.MeteringCode,
DataUnit = Build188SendData.Build188WaterMeterReadingSendDataUnit(watermeter.Address),
});
}
else
{
_logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的电表{watermeter.Name}采集项{tempItem}无效编码。");
continue;
}
// // Set索引缓存 if (builderResponse == null || builderResponse.Data.Length <= 0)
// pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, taskInfo.MemberId); {
_logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的电表{watermeter.Name}采集项{tempItem}未能正确获取报文。");
continue;
}
// // ZSET索引缓存Key
// pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, taskInfo.ScoreValue, taskInfo.MemberId);
// pipe.EndPipe(); string taskMark = CommonHelper.GetTaskMark((int)aFN, fn, watermeter.MeteringCode, builderResponse.MSA);
// } var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
{
SystemName = SystemType,
ProjectId = $"{watermeter.ProjectID}",
DeviceType = $"{MeterTypeEnum.Ammeter}",
DeviceId = $"{watermeter.FocusAddress}",
Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
DatabaseBusiID = watermeter.DatabaseBusiID,
PendingCopyReadTime = timestamps,
CreationTime = currentTime,
MeterAddress = watermeter.MeterAddress,
AFN = (int)aFN,
Fn = fn,
//Seq = builderResponse.Seq,
MSA = builderResponse.MSA,
ItemCode = tempItem,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
Pn = watermeter.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
IsReceived = false,
ScoreValue = $"{watermeter.FocusAddress}.{taskMark}".Md5Fun(),
};
//} taskList.Add(meterReadingRecords);
}
return taskList;
}
#endregion #endregion
@ -944,18 +1003,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return false; return false;
} }
/// <summary>
/// 获取缓存表计下发指令缓存key前缀
/// </summary>
/// <param name="timeDensity"></param>
/// <param name="meterType"></param>
/// <returns></returns>
private string GetTelemetryPacketCacheKeyPrefix(int timeDensity, MeterTypeEnum meterType)
{
return $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}*";
}
/// <summary> /// <summary>
/// 创建表的待发送的任务数据 /// 创建表的待发送的任务数据
/// </summary> /// </summary>
@ -977,34 +1024,34 @@ namespace JiShe.CollectBus.ScheduledMeterReading
decimal? cursor = null; decimal? cursor = null;
string member = null; string member = null;
bool hasNext; bool hasNext;
//do do
//{ {
// var page = await _redisDataCacheService.GetAllPagedData<T>( var page = await _redisDataCacheService.GetAllPagedData<T>(
// redisCacheMeterInfoHashKeyTemp, redisCacheMeterInfoHashKeyTemp,
// redisCacheMeterInfoZSetScoresIndexKeyTemp, redisCacheMeterInfoZSetScoresIndexKeyTemp,
// pageSize: 1000, pageSize: 1000,
// lastScore: cursor, lastScore: cursor,
// lastMember: member); lastMember: member);
// meterInfos.AddRange(page.Items); meterInfos.AddRange(page.Items);
// cursor = page.HasNext ? page.NextScore : null; cursor = page.HasNext ? page.NextScore : null;
// member = page.HasNext ? page.NextMember : null; member = page.HasNext ? page.NextMember : null;
// hasNext = page.HasNext; hasNext = page.HasNext;
//} while (hasNext); } while (hasNext);
var page = await _redisDataCacheService.GetAllPagedData<T>( //var page = await _redisDataCacheService.GetAllPagedData<T>(
redisCacheMeterInfoHashKeyTemp, // redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp, // redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 10, // pageSize: 10,
lastScore: cursor, // lastScore: cursor,
lastMember: member); // lastMember: member);
meterInfos.AddRange(page.Items); //meterInfos.AddRange(page.Items);
if (meterInfos == null || meterInfos.Count <= 0) if (meterInfos == null || meterInfos.Count <= 0)
{ {
timer.Stop(); timer.Stop();
_logger.LogError($"{nameof(CreateMeterPublishTask)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); _logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
return; return;
} }
@ -1031,6 +1078,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
int pageNumber = 0; int pageNumber = 0;
bool hasNext; bool hasNext;
var stopwatch = Stopwatch.StartNew(); var stopwatch = Stopwatch.StartNew();
var ddd = _runtimeContext.UseTableSessionPool;
do do
{ {
options.PageIndex = pageNumber++; options.PageIndex = pageNumber++;
@ -1054,54 +1104,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {options.TableNameOrTreePath} {timeDensity}分钟采集任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); _logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {options.TableNameOrTreePath} {timeDensity}分钟采集任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
} }
///// <summary>
///// 创建Kafka消息
///// </summary>
///// <param name="redisCacheTelemetryPacketInfoHashKey"></param>
///// <param name="redisCacheTelemetryPacketInfoZSetScoresIndexKey"></param>
///// <returns></returns>
//private async Task CreateMeterKafkaTaskMessage(
//string redisCacheTelemetryPacketInfoHashKey,
//string redisCacheTelemetryPacketInfoZSetScoresIndexKey)
//{
// if (string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey))
// {
// throw new Exception($"{nameof(CreateMeterKafkaTaskMessage)} 创建Kafka消息失败参数异常,-101");
// }
// decimal? cursor = null;
// string member = null;
// bool hasNext;
// var stopwatch = Stopwatch.StartNew();
// do
// {
// var page = await _redisDataCacheService.GetAllPagedData<MeterReadingTelemetryPacketInfo>(
// redisCacheTelemetryPacketInfoHashKey,
// redisCacheTelemetryPacketInfoZSetScoresIndexKey,
// pageSize: 1000,
// lastScore: cursor,
// lastMember: member);
// cursor = page.HasNext ? page.NextScore : null;
// member = page.HasNext ? page.NextMember : null;
// hasNext = page.HasNext;
// await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
// items: page.Items,
// deviceIdSelector: data => data.FocusAddress,
// processor: (data, groupIndex) =>
// {
// _ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex);
// }
// );
// } while (hasNext);
// stopwatch.Stop();
// _logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {redisCacheTelemetryPacketInfoHashKey}采集推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
//}
/// <summary> /// <summary>
/// Kafka 推送消息 /// Kafka 推送消息
/// </summary> /// </summary>

View File

@ -101,6 +101,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// TypeName = 3, // TypeName = 3,
// DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679", // DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
// TimeDensity = 15, // TimeDensity = 15,
// BrandType = "",
//}); //});
//ammeterInfos.Add(new AmmeterInfo() //ammeterInfos.Add(new AmmeterInfo()
//{ //{
@ -115,6 +116,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// TypeName = 1, // TypeName = 1,
// DataTypes = "581,589,592,597,601", // DataTypes = "581,589,592,597,601",
// TimeDensity = 15, // TimeDensity = 15,
// BrandType = "",
//}); //});
//return ammeterInfos; //return ammeterInfos;
@ -127,10 +129,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
WHERE 1=1 and C.Special = 0 "; WHERE 1=1 and C.Special = 0 ";
//TODO 记得移除特殊表过滤 //TODO 记得移除特殊表过滤
//if (!string.IsNullOrWhiteSpace(gatherCode)) if (!string.IsNullOrWhiteSpace(gatherCode))
//{ {
// sql = $@"{sql} AND A.GatherCode = '{gatherCode}'"; sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
//} }
return await SqlProvider.Instance.Change(DbEnum.EnergyDB) return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
.Ado .Ado
.QueryAsync<AmmeterInfo>(sql); .QueryAsync<AmmeterInfo>(sql);
@ -187,29 +189,5 @@ namespace JiShe.CollectBus.ScheduledMeterReading
.Ado .Ado
.QueryAsync<WatermeterInfo>(sql); .QueryAsync<WatermeterInfo>(sql);
} }
/// <summary>
/// 测试设备分组均衡控制算法
/// </summary>
/// <param name="deviceCount"></param>
/// <returns></returns>
[HttpGet]
public async Task TestDeviceGroupBalanceControl(int deviceCount = 200000)
{
var deviceList = new List<string>();
for (int i = 0; i < deviceCount; i++)
{
deviceList.Add($"Device_{Guid.NewGuid()}");
}
// 初始化缓存
DeviceGroupBalanceControl.InitializeCache(deviceList);
// 打印分布统计
DeviceGroupBalanceControl.PrintDistributionStats();
await Task.CompletedTask;
}
} }
} }

View File

@ -24,13 +24,7 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// 排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳、或者某一个固定的标识1 /// 排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳、或者某一个固定的标识1
/// </summary> /// </summary>
[FIELDColumn] [FIELDColumn]
public string ScoreValue public string ScoreValue { get; set; }
{
get
{
return $"{DeviceId}.{TaskMark}".Md5Fun();
}
}
/// <summary> /// <summary>
/// 是否手动操作 /// 是否手动操作

View File

@ -18,7 +18,7 @@ namespace JiShe.CollectBus.IotSystems.Watermeter
/// 关系映射标识用于ZSet的Member字段和Set的Value字段具体值可以根据不同业务场景进行定义 /// 关系映射标识用于ZSet的Member字段和Set的Value字段具体值可以根据不同业务场景进行定义
/// </summary> /// </summary>
[Column(IsIgnore = true)] [Column(IsIgnore = true)]
public override string MemberId => $"{FocusId}:{MeterId}"; public override string MemberId => $"{FocusAddress}:{MeteringCode}";
/// <summary> /// <summary>
/// ZSet排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳 /// ZSet排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳
@ -90,6 +90,7 @@ namespace JiShe.CollectBus.IotSystems.Watermeter
//// 电表= 1,水表= 2,燃气表= 3,热能表= 4,水表流量计=5燃气表流量计=6,特殊电表=7 //// 电表= 1,水表= 2,燃气表= 3,热能表= 4,水表流量计=5燃气表流量计=6,特殊电表=7
/// </summary> /// </summary>
public MeterTypeEnum MeterType { get; set; } public MeterTypeEnum MeterType { get; set; }
/// <summary> /// <summary>
/// 设备品牌; /// 设备品牌;
/// (当 MeterType = 水表, 如 威铭、捷先 等) /// (当 MeterType = 水表, 如 威铭、捷先 等)
@ -144,6 +145,11 @@ namespace JiShe.CollectBus.IotSystems.Watermeter
/// </summary> /// </summary>
public int ProjectID { get; set; } public int ProjectID { get; set; }
/// <summary>
/// 数据库业务ID
/// </summary>
public int DatabaseBusiID { get; set; }
/// <summary> /// <summary>
/// 是否异常集中器 0:正常1异常 /// 是否异常集中器 0:正常1异常
/// </summary> /// </summary>

View File

@ -0,0 +1,593 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Enums
{
/// <summary>
/// 水表\流量计\特殊电表品牌
/// </summary>
public enum BrandTypeEnum
{
/// <summary>
/// 默认OC_129
/// </summary>
//None = 0,
/// <summary>
/// 冻结数据0D_101
/// </summary>
//Freeze = 1,
/// <summary>
/// 默认OC_129or 冻结数据0D_101
/// </summary>
NoneOrFreeze = 0,
/// <summary>
/// 188-良禾
/// </summary>
LiangHe188 = 1,
/// <summary>
/// 188-威铭
/// </summary>
WeiMing188 = 2,
/// <summary>
/// 188-宁波
/// </summary>
NingBo188 = 3,
/// <summary>
/// 西恩电磁流量计
/// </summary>
XEDC = 4,
/// <summary>
/// 西恩超声波流量计
/// </summary>
XECSB = 5,
/// <summary>
/// 电磁流量计(LDG-DN200)
/// </summary>
DCLDGDN200 = 6,
/// <summary>
/// 燃气表抄读
/// </summary>
Gasmeter = 7,
/// <summary>
/// 涡街流量计
/// </summary>
WJFlowmeter = 8,
/// <summary>
/// 流量计
/// </summary>
Flowmeter = 9,
/// <summary>
/// 涡街流量计(LUGBDN100)
/// </summary>
WJFlowmeterLUGBDN100 = 10,
/// <summary>
/// 涡街流量计(ZC-LUGB-232ZDNNY)
/// </summary>
WJFlowmeterZCLUGB232ZDNNY = 11,
/// <summary>
/// SB2100蒸汽表
/// </summary>
ZQBSB2100Flowmeter = 12,
/// <summary>
/// (HD)热式流量计
/// </summary>
RSHDFlowmeter = 13,
/// <summary>
/// (HDWMNDN300)热式流量计
/// </summary>
RSHDWMNDN300 = 14,
/// <summary>
/// 热式流量计(FLRS110-C100)
/// </summary>
RSFLRS110C100 = 15,
/// <summary>
/// 通用188协议
/// </summary>
Universal188 = 16,
#region
DTZ719 = 17,
AKKJMC800PY = 18,
HRKJ001 = 19,
THYB9D1 = 20,
DTSD342 = 21,
/// <summary>
/// 谐波
/// </summary>
AFN16_F109 = 22,
/// <summary>
/// DL/T 645—2007 规约时采用该类
/// </summary>
CustomItemCode_93 = 23,
/// <summary>
/// 电表组合有功示值透抄CustomItemCode_95
/// </summary>
AFN16_F95 = 24,
#endregion
/// <summary>
/// 非特殊表
/// </summary>
None = 25,
/// <summary>
/// SDM630MCT 导轨电表
/// </summary>
SDM630MCT = 26,
/// <summary>
/// 水表通过0C_129采集
/// </summary>
Watermeter0C_129 = 27,
/// <summary>
/// YYD电磁流量计
/// </summary>
YYDFlowmeter = 28,
/// <summary>
/// 透明转发 跳合闸(水表)
/// </summary>
AFN16_F99 = 29,
/// <summary>
/// 透明转发 跳合闸(气表)
/// </summary>
AFN16_F100 = 30,
/// <summary>
/// 温度压力补偿涡街流量计(TPC1001)涡街流量计
/// </summary>
WJTPC1001 = 31,
/// <summary>
/// (LDG-SP25)树普电磁流量计
/// </summary>
ShupuLDGSP25Flowmeter = 32,
/// <summary>
/// 西恩涡街流量计(LUGBC-100)
/// </summary>
XEWJLUGBC100 = 33,
/// <summary>
/// 智能涡街流量计(UG-1132A)
/// </summary>
WJUG1132A = 34,
/// <summary>
/// 水表通过0D_101采集
/// </summary>
Watermeter0D_101 = 35,
/// <summary>
/// 读取SIM卡号
/// </summary>
AFN16_F101 = 36,
/// <summary>
/// 恒瑞科技三相导轨式电能表 or 恒瑞科技嵌入式电表测试
/// </summary>
DTS600 = 37,
/// <summary>
/// 恒瑞科技单相导轨式电能表DDS600
/// </summary>
DDS600 = 38,
/// <summary>
/// 旋进漩涡流量计(LUXB) 天津凯隆仪表科技有限公司
/// </summary>
XJXWLUXB = 39,
/// <summary>
/// DDSD720-L科陆单相导轨表
/// </summary>
DDSD720L = 40,
/// <summary>
/// 东久电磁流量计DJLD
/// </summary>
DJLD = 41,
/// <summary>
/// DTSD720-L科陆三相导轨表
/// </summary>
DTSD720L = 42,
/// <summary>
/// 世成(社为表计)涡街流量计
/// </summary>
SCLUGB = 43,
/// <summary>
/// CL7339MN-ZY科陆三相表
/// </summary>
CL7339MNZY = 44,
/// <summary>
/// 江森智能SNY723MC数显表
/// </summary>
SNY723MC = 45,
/// <summary>
/// 珠海派诺科技PMAC770三相数显表
/// </summary>
PMAC770 = 46,
/// <summary>
/// 北京中科涡街流量计ZKTD-LUCBY
/// </summary>
ZKTD_LUGBY = 47,
/// <summary>
/// 夏仪股份蒸汽流量计(LUGB-DN
/// </summary>
LUGB_DN = 48,
/// <summary>
/// LWQ-D2型气体涡轮流量计
/// </summary>
LWQ_D2 = 49,
/// <summary>
/// 西恩涡街流量计分体式流量积算仪32FC系列
/// </summary>
XEJSY32FC = 50,
/// <summary>
/// 寺崎科技PD652E-9S4电表
/// </summary>
PD652E9S4 = 51,
/// <summary>
/// 液体涡轮流量计(LWGY)
/// </summary>
LWGY = 52,
/// <summary>
/// 多功能积算仪(RW-A)
/// </summary>
DGNRWA = 53,
/// <summary>
/// 杭梅电气DTS804导轨表
/// </summary>
DTS804 = 54,
/// <summary>
/// 杭梅电气HG194-D93数显表
/// </summary>
HG194D93 = 55,
/// <summary>
/// 连水超声波水表188
/// </summary>
Lianshui188 = 56,
/// <summary>
/// 湖北回盛生物科技有限公司EZT96Y数显表
/// </summary>
EZT96Y,
/// <summary>
/// 上海肖月智能流量积算仪
/// </summary>
ZNLLJ,
/// <summary>
/// 西安诚通电磁流量计
/// </summary>
CTLDE250SC31GM8FB,
/// <summary>
/// 雅达YD2040
/// </summary>
YD2040,
/// <summary>
/// EVC智能体积修正仪
/// </summary>
EVC,
/// <summary>
/// 气体超声流量计IGSM-TS
/// </summary>
IGSMTS,
/// <summary>
/// YX-9SYE三相多功能表
/// </summary>
YX9SYE,
/// <summary>
/// 世成液体涡轮流量计(SCLWGY-DN50)
/// </summary>
SCLWGYDN50,
/// <summary>
/// 杭州盘古积算仪(FX6000F)
/// </summary>
FX6000F,
/// <summary>
/// "盘古电磁流量计(PMF-GM4.0A1-50M11K1F1T0C3)
/// </summary>
PFMGM40A150M11K1F1T0C3,
/// <summary>
/// 西恩液体涡轮流量计(SEAN LWGY-50)
/// </summary>
SeanLWGY50,
/// <summary>
/// 雷泰电磁流量计LD-LDE-DN50
/// </summary>
LDLDEDN50,
/// <summary>
/// 雷泰涡街流量计(LT-LUGB-DN50)
/// </summary>
LTLUGBDN50,
/// <summary>
/// 珠海派诺科技股份有限公司SPM33电力仪表
/// </summary>
SPM33,
/// <summary>
/// 株洲斯瑞克电气有限公司三相数显多功能电力仪表PD369E-AS4
/// </summary>
PD369EAS4,
/// <summary>
/// 湖北回盛生物科技有限公司-涡街流量计(10VTEAD03A200C1A2HOAG)
/// </summary>
WJ10VTEAD03A200C1A2HOAG,
/// <summary>
/// 世成旋进旋涡流量计SCLUX-DN25
/// </summary>
SCLUXDN25,
/// <summary>
/// 世成气体涡轮流量计(SCLWGQ-DN50)
/// </summary>
SCLWGQDN50,
/// <summary>
/// 智能电磁流量计(MDL210)
/// </summary>
MDL210,
/// <summary>
/// 江苏华海涡街流量计Focvor4202
/// </summary>
Focvor4202,
/// <summary>
/// 华凯电力HK194E-9S4
/// </summary>
HK194E9S4,
/// <summary>
/// 威胜测试-DTSD342_9N
/// </summary>
DTSD342Test,
/// <summary>
///科迈捷涡街流量计VFM-60
/// </summary>
VFM60,
/// <summary>
///江苏华海涡街流量计积算仪
/// </summary>
HHJSY,
/// <summary>
///宏江4G水表
/// </summary>
HJDN15,
/// <summary>
///世成4G涡街流量计
/// </summary>
LPV2,
/// <summary>
///浙江正泰DTSU666
/// </summary>
DTSU666,
/// <summary>
/// 浙江启唯电气-数码三相多功能电表QV194E-9S4
/// </summary>
QV194E9S4,
/// <summary>
/// 施耐德PM2100
/// </summary>
PM2100,
/// <summary>
/// 天康电磁流量计
/// </summary>
TK1100FT,
/// <summary>
/// 西恩气体涡轮流量计(SEANLWQ)
/// </summary>
SEANLWQ,
/// <summary>
/// V880BR涡街流量计
/// </summary>
V880BR,
/// <summary>
/// 大导SDD194E-9
/// </summary>
SDD194E_9,
/// <summary>
///泉高阀门科技有限公司-超声波水表
/// </summary>
QGFMCSB,
#region
SensorMeter,
#endregion
/// <summary>
/// 分体式超声波明渠流量计SC-6000F
/// </summary>
SC6000F,
/// <summary>
/// 江苏京仪JLWQ型气体流量计JLWQ型气体流量计
/// </summary>
JLWQ,
/// <summary>
/// 广州智光SMC200
/// </summary>
SMC200,
/// <summary>
/// 左拓ZFM2-621
/// </summary>
ZFM2621,
/// <summary>
/// 江苏华尔威涡街旋进流量计
/// </summary>
HRW520,
/// <summary>
/// 施耐德PM5350P
/// </summary>
PM5350P,
/// <summary>
/// 施耐德PM810MG
/// </summary>
PM810MG,
/// <summary>
/// 浙江之高ZL96-3E
/// </summary>
ZL96_3E,
/// <summary>
/// 拓普电子PD284Z-9S4
/// </summary>
PD284Z_9S4,
/// <summary>
/// 上海普川DTSU5886
/// </summary>
DTSU5886,
/// <summary>
/// 安德利SC194E-9S4
/// </summary>
SC194E9S4,
/// <summary>
/// 浙江天电电气TD700E-AS3
/// </summary>
TD700EAS3,
/// <summary>
/// 世成分体式涡街流量计SW-SCLUGB-DN
/// </summary>
SWSCLUGBDN,
/// <summary>
/// 东久电磁冷热量计SW-DJLD
/// </summary>
SWDJLD,
/// <summary>
/// 北京中科拓达ZKTD-LUGB
/// </summary>
ZKTD_LUGB,
/// <summary>
/// 江苏英美迪自动化有限公司三相液晶多功能仪表YMD96A-E4
/// </summary>
YMD96A_E4,
/// <summary>
/// 金湖盛元LWQ气体涡轮流量计
/// </summary>
JHSYLWQ,
/// <summary>
/// 天康涡街流量计TK2000
/// </summary>
TK2000,
/// <summary>
/// 浙江迈拓三相导轨电表DTSF1709
/// </summary>
DTSF1709,
/// <summary>
/// 杭州逸控科技超声波流量计ECUL30B-L2C1NSVC
/// </summary>
ECUL30BL2C1NSVC,
/// <summary>
/// 数字电测表HTD288-DM44/R
/// </summary>
HTD288,
/// <summary>
/// 杭州逸控科技有限公司ECLUGB2305W3C2N
/// </summary>
ECLUGB2305W3C2N,
/// <summary>
/// 江苏华海测控科技有限公司温压补偿流量积算仪
/// </summary>
XMJA9000,
/// <summary>
/// 湖南佳一机电设备有限公司精致型蒸汽热能积算仪
/// </summary>
F3200H,
/// <summary>
/// 合兴加能电梯能量回馈装置
/// </summary>
IPCPFE04MNDC,
/// <summary>
/// 宁波创盛旋涡流量计
/// </summary>
CX25,
/// <summary>
/// 群力电气QDCZY1N
/// </summary>
QDCZY1N,
/// <summary>
///深圳中电PMCS963C
/// </summary>
PMCS963C,
/// <summary>
/// 迅尔燃气表D2SD2ET3D4S
/// </summary>
D2SD2ET3D4S,
/// <summary>
/// INTELLIGENT积算仪F2000X
/// </summary>
F2000X,
/// <summary>
///多盟-DM194Z-9SY
/// </summary>
DM194Z9SY,
/// <summary>
/// 纳宇PD760
/// </summary>
PD760,
/// <summary>
/// 纳宇DTS90031LPD
/// </summary>
DTS90031LPD,
/// <summary>
/// 上海施易克SEIK680数显表
/// </summary>
SEIK680,
/// <summary>
/// 中灵电气ZL125SC
/// </summary>
ZL125SC,
/// <summary>
/// 江苏京仪气体涡轮流量计JLWQE
/// </summary>
JLWQE,
/// <summary>
/// HART智能转换器
/// </summary>
SM100,
/// <summary>
/// 拓思特涡街流量计H880BR
/// </summary>
H880BR,
/// <summary>
/// DDSD720-L-单相电子式导轨表
/// </summary>
DDSD720L2,
/// <summary>
/// 浙江智电三相三线大功率有功电能表
/// </summary>
ZJZDSXSX,
/// <summary>
/// 山水翔水表LXSY
/// </summary>
LXSY,
/// <summary>
/// 衡水多元仪表有限公司气体涡轮流量计DYWQ
/// </summary>
DYWQ,
/// <summary>
/// 安徽聚积电子
/// </summary>
DDS2052,
/// <summary>
/// 湖南中麦
/// </summary>
ZMDTSD3429N,
/// <summary>
///DTS2377三相导轨式多功能智能电表
/// </summary>
DTS2377
}
}

View File

@ -0,0 +1,54 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Enums
{
/// <summary>
/// 表计连接通讯协议--表计与集中器的通讯协议
/// </summary>
public enum MeterLinkProtocol
{
/// <summary>
/// 无
/// </summary>
None = 0,
/// <summary>
/// DL/T 645—1997
/// </summary>
DLT_645_1997 = 1,
/// <summary>
/// 交流采样装置通信协议(电表)
/// </summary>
ACSamplingDevice = 2,
/// <summary>
/// DL/T 645—2007
/// </summary>
DLT_645_2007 = 30,
/// <summary>
/// 载波通信
/// </summary>
Carrierwave = 31,
/// <summary>
/// CJ/T 188—2018协议(水表)
/// </summary>
CJT_188_2018 = 32,
/// <summary>
/// CJ/T 188—2004协议
/// </summary>
CJT_188_2004 = 33,
/// <summary>
/// MODBUS-RTU
/// </summary>
MODBUS_RTU = 34,
}
}

View File

@ -93,7 +93,7 @@
"ClusterList": [ "192.168.1.9:6667" ], "ClusterList": [ "192.168.1.9:6667" ],
"PoolSize": 2, "PoolSize": 2,
"DataBaseName": "energy", "DataBaseName": "energy",
"OpenDebugMode": false, "OpenDebugMode": true,
"UseTableSessionPoolByDefault": false "UseTableSessionPoolByDefault": false
}, },
"Cassandra": { "Cassandra": {