diff --git a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs
index 9587549..6bcbc5c 100644
--- a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs
@@ -13,6 +13,12 @@ namespace JiShe.CollectBus.IoTDB.Interface
///
Task OpenAsync();
+ ///
+ /// 关闭连接池
+ ///
+ ///
+ Task CloseAsync();
+
///
/// 插入数据
///
diff --git a/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs
index 0d01f81..251e48b 100644
--- a/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs
@@ -47,5 +47,10 @@
/// 时区,默认为:"UTC+08:00"
///
public string ZoneId { get; set; } = "UTC+08:00";
+
+ ///
+ /// 请求超时时间,单位毫秒,默认为:50000
+ ///
+ public long Timeout { get; set; } = 50000;
}
}
diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs
index 7cfaf32..9c7c602 100644
--- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs
@@ -1,6 +1,8 @@
using System;
using System.Collections.Concurrent;
+using System.ComponentModel.DataAnnotations;
using System.Reflection;
+using System.Reflection.Metadata.Ecma335;
using System.Text;
using System.Threading.Tasks;
using Apache.IoTDB;
@@ -23,7 +25,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
///
/// IoTDB数据源
///
- public class IoTDbProvider : IIoTDbProvider, IScopedDependency
+ public class IoTDbProvider : IIoTDbProvider, ITransientDependency
{
private static readonly ConcurrentDictionary MetadataCache = new();
private readonly ILogger _logger;
@@ -195,7 +197,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
{
try
{
- var query =await BuildQuerySQL(options);
+ var query = await BuildQuerySQL(options);
var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query);
@@ -205,15 +207,16 @@ namespace JiShe.CollectBus.IoTDB.Provider
Items = await ParseResults(sessionDataSet, options.PageSize),
PageIndex = options.PageIndex,
PageSize = options.PageSize,
-
+
};
- result.HasNext = result.TotalCount > 0? result.TotalCount < result.PageSize : false;
+ result.HasNext = result.TotalCount > 0 ? result.TotalCount < result.PageSize : false;
return result;
}
catch (Exception ex)
{
+ CurrentSession.Dispose();
_logger.LogError(ex, $"{nameof(QueryAsync)} IoTDB查询数据时发生异常");
throw;
}
@@ -414,7 +417,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
private async Task BuildQuerySQL(IoTDBQueryOptions options) where T : IoTEntity
{
var metadata = await GetMetadata();
- var sb = new StringBuilder("SELECT ");
+ var sb = new StringBuilder("SELECT TIME as Timestamps,");
sb.AppendJoin(", ", metadata.ColumnNames);
sb.Append($" FROM {options.TableNameOrTreePath}");
@@ -471,7 +474,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
{
return condition.Operator switch
{
- ">" => condition.IsNumber ? $"{condition.Field} > {condition.Value}": $"{condition.Field} > '{condition.Value}'",
+ ">" => condition.IsNumber ? $"{condition.Field} > {condition.Value}" : $"{condition.Field} > '{condition.Value}'",
"<" => condition.IsNumber ? $"{condition.Field} < {condition.Value}" : $"{condition.Field} < '{condition.Value}'",
"=" => condition.IsNumber ? $"{condition.Field} = {condition.Value}" : $"{condition.Field} = '{condition.Value}'",
_ => throw new NotSupportedException($"{nameof(TranslateCondition)} 将查询条件转换为SQL语句时操作符 {condition.Operator} 属于异常情况")
@@ -493,7 +496,16 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
var result = await CurrentSession.ExecuteQueryStatementAsync(countQuery);
- return result.HasNext() ? Convert.ToInt32(result.Next().Values[0]) : 0;
+ if (result.HasNext())
+ {
+ await result.Close();
+ return 0;
+ }
+
+ var count = Convert.ToInt32(result.Next().Values[0]);
+ await result.Close();
+
+ return count;
}
///
@@ -510,6 +522,13 @@ namespace JiShe.CollectBus.IoTDB.Provider
var properties = typeof(T).GetProperties();
+ var columns = new List() { "Timestamps" };
+ var dataTypes = new List() { TSDataType.TIMESTAMP };
+ columns.AddRange(metadata.ColumnNames);
+ dataTypes.AddRange(metadata.DataTypes);
+ //metadata.ColumnNames.Insert(0, "Timestamps");
+ //metadata.DataTypes.Insert(0, TSDataType.TIMESTAMP);
+
while (dataSet.HasNext() && results.Count < pageSize)
{
var record = dataSet.Next();
@@ -518,30 +537,34 @@ namespace JiShe.CollectBus.IoTDB.Provider
Timestamps = record.Timestamps
};
- foreach (var measurement in metadata.ColumnNames)
+ foreach (var measurement in columns)
{
- int indexOf = metadata.ColumnNames.IndexOf(measurement);
+ int indexOf = columns.IndexOf(measurement);
var value = record.Values[indexOf];
+ TSDataType tSDataType = dataTypes[indexOf];
var prop = properties.FirstOrDefault(p =>
p.Name.Equals(measurement, StringComparison.OrdinalIgnoreCase));
- if (prop != null)
+ if (prop != null && !(value is System.DBNull))
{
- if (measurement.EndsWith("time"))
+ dynamic tempValue = GetTSDataValue(tSDataType, value);
+
+ if (measurement.ToLower().EndsWith("time"))
{
- var tempValue = TimestampHelper.ConvertToDateTime(Convert.ToInt64(value), TimestampUnit.Nanoseconds);
- typeof(T).GetProperty(measurement)?.SetValue(entity, value);
+ typeof(T).GetProperty(measurement)?.SetValue(entity, TimestampHelper.ConvertToDateTime(tempValue, TimestampUnit.Nanoseconds));
}
else
{
- typeof(T).GetProperty(measurement)?.SetValue(entity, value);
+ typeof(T).GetProperty(measurement)?.SetValue(entity, tempValue);
}
}
}
results.Add(entity);
+
}
+ await dataSet.Close();
return results;
}
@@ -759,5 +782,28 @@ namespace JiShe.CollectBus.IoTDB.Provider
["DECIMAL"] = "0.0",
["STRING"] = string.Empty
};
+
+ ///
+ /// IoTDB 数据类型与.net类型映射
+ ///
+ ///
+ ///
+ ///
+ private dynamic GetTSDataValue(TSDataType tSDataType, object value) =>
+ tSDataType switch
+ {
+ TSDataType.BOOLEAN => Convert.ToBoolean(value),
+ TSDataType.INT32 => Convert.ToInt32(value),
+ TSDataType.INT64 => Convert.ToInt64(value),
+ TSDataType.FLOAT => Convert.ToDouble(value),
+ TSDataType.DOUBLE => Convert.ToDouble(value),
+ TSDataType.TEXT => Convert.ToString(value),
+ TSDataType.NONE => null,
+ TSDataType.TIMESTAMP => Convert.ToInt64(value),
+ TSDataType.DATE => Convert.ToDateTime(value),
+ TSDataType.BLOB => Convert.ToByte(value),
+ TSDataType.STRING => Convert.ToString(value),
+ _ => Convert.ToString(value)
+ };
}
}
diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs
index eacf246..ee71cf1 100644
--- a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs
@@ -47,6 +47,19 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
}
+ ///
+ /// 关闭连接池
+ ///
+ ///
+ public async Task CloseAsync()
+ {
+ if (_sessionPool == null)
+ {
+ return;
+ }
+ await _sessionPool.Close();
+ }
+
///
/// 批量插入对齐时间序列数据
///
@@ -59,7 +72,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
{
throw new Exception($"{nameof(SessionPoolAdapter)} Tree模型数据入库没有成功,返回结果为:{result}");
}
-
+ //await CloseAsync();
return result;
}
@@ -70,7 +83,10 @@ namespace JiShe.CollectBus.IoTDB.Provider
///
public async Task ExecuteQueryStatementAsync(string sql)
{
- return await _sessionPool.ExecuteQueryStatementAsync(sql);
+ var result = await _sessionPool.ExecuteQueryStatementAsync(sql, _options.Timeout);
+ //await result.Close();
+ //await CloseAsync();
+ return result;
}
public void Dispose()
diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs
index 137f5a8..dc4f0ee 100644
--- a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs
@@ -45,6 +45,19 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
}
+ ///
+ /// 关闭连接池
+ ///
+ ///
+ public async Task CloseAsync()
+ {
+ if (_sessionPool == null)
+ {
+ return;
+ }
+ await _sessionPool.Close();
+ }
+
///
/// 批量插入
///
@@ -58,6 +71,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
throw new Exception($"{nameof(TableSessionPoolAdapter)} table模型数据入库没有成功,返回结果为:{result}");
}
+ //await CloseAsync();
return result;
}
@@ -68,7 +82,10 @@ namespace JiShe.CollectBus.IoTDB.Provider
///
public async Task ExecuteQueryStatementAsync(string sql)
{
- return await _sessionPool.ExecuteQueryStatementAsync(sql);
+ var result = await _sessionPool.ExecuteQueryStatementAsync(sql,_options.Timeout);
+ //await result.Close();
+ //await CloseAsync();
+ return result;
}
public void Dispose()
diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index 0d97c69..095f01b 100644
--- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -1,4 +1,5 @@
-using Confluent.Kafka;
+using Amazon.Runtime.Internal.Endpoints.StandardLibrary;
+using Confluent.Kafka;
using DnsClient.Protocol;
using FreeSql;
using JiShe.CollectBus.Ammeters;
@@ -6,10 +7,12 @@ using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl;
+using JiShe.CollectBus.Common.Encrypt;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
+using JiShe.CollectBus.EnergySystems.Entities;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
@@ -32,10 +35,13 @@ using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Diagnostics;
+using System.Diagnostics.Metrics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using static FreeSql.Internal.GlobalFilter;
+using static Microsoft.AspNetCore.Razor.Language.TagHelperMetadata;
+using static Thrift.Protocol.Utilities.TJSONProtocolConstants;
namespace JiShe.CollectBus.ScheduledMeterReading
{
@@ -65,6 +71,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_producerService = producerService;
_redisDataCacheService = redisDataCacheService;
_kafkaOptions = kafkaOptions.Value;
+
+ _runtimeContext.UseTableSessionPool = true;
}
///
@@ -111,7 +119,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
var currentTime = DateTime.Now;
-
foreach (var item in taskInfos)
{
var tasksToBeIssueModel = await FreeRedisProvider.Instance.GetAsync(item);
@@ -121,7 +128,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue;
}
- //item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBus,tempArryay[1]=>SystemTypeConst,tempArryay[2]=>TaskInfo,tempArryay[3]=>表计类别,tempArryay[4]=>采集频率
+ //item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBus,tempArryay[1]=>SystemTypeConst,tempArryay[2]=>ServerTagName,tempArryay[3]=>TaskInfo,tempArryay[4]=>表计类别,tempArryay[5]=>采集频率
var tempArryay = item.Split(":");
string meteryType = tempArryay[4];//表计类别
int timeDensity = Convert.ToInt32(tempArryay[5]);//采集频率
@@ -139,21 +146,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var meterTypes = EnumExtensions.ToEnumDictionary();
+ var currentTaskTime = tasksToBeIssueModel.NextTaskTime;//程序启动缓存电表的时候,NextTaskTime已经格式化到下一个采集点时间。
+
+ var metadata = await _dbProvider.GetMetadata();
if (meteryType == MeterTypeEnum.Ammeter.ToString())
{
- //List pushTaskInfos = new();
- _runtimeContext.UseTableSessionPool = true;
- var metadata = await _dbProvider.GetMetadata();
_ = CreateMeterPublishTask(
timeDensity: timeDensity,
- nextTaskTime: tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity),
+ nextTaskTime: currentTaskTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: (timeDensity, data, groupIndex, timestamps) =>
{
var tempTask = AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
- _logger.LogWarning($"{data.Name} 任务数据构建失败:{data.Serialize()}");
+ _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_dbProvider.BatchInsertAsync(metadata, tempTask);
@@ -161,16 +168,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
{
- //todo 水表任务创建待处理
- //await WatermeterScheduledMeterReadingIssued(timeDensity, meterInfos);
_ = CreateMeterPublishTask(
timeDensity: timeDensity,
- nextTaskTime: tasksToBeIssueModel.NextTaskTime,
+ nextTaskTime: currentTaskTime,
meterType: MeterTypeEnum.Ammeter,
- taskCreateAction: (timeDensity, data, groupIndex, taskBatch) =>
+ taskCreateAction: (timeDensity, data, groupIndex, timestamps) =>
{
- //AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch);
+ var tempTask = WatermeterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps);
+
+ if (tempTask == null || tempTask.Count <= 0)
+ {
+ _logger.LogWarning($"水表 {data.Name} 任务数据构建失败:{data.Serialize()}");
+ return;
+ }
+ _dbProvider.BatchInsertAsync(metadata, tempTask);
});
}
else
@@ -183,8 +195,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。
- tasksToBeIssueModel.LastTaskTime = tasksToBeIssueModel.NextTaskTime;
- tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity);
+ tasksToBeIssueModel.LastTaskTime = currentTaskTime;
+ tasksToBeIssueModel.NextTaskTime = currentTaskTime.CalculateNextCollectionTime(timeDensity);
await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
}
}
@@ -208,52 +220,53 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{
-#if DEBUG
- var timeDensity = "15";
- var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
- var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
- var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
+ //此处代码不要删除
+ //#if DEBUG
+ // var timeDensity = "15";
+ // var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
+ // var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
+ // var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
- List meterInfos = new List();
- List focusAddressDataLista = new List();
- var timer1 = Stopwatch.StartNew();
+ // List meterInfos = new List();
+ // List focusAddressDataLista = new List();
+ // var timer1 = Stopwatch.StartNew();
- var allIds = new HashSet();
- decimal? score = null;
- string member = null;
+ // var allIds = new HashSet();
+ // decimal? score = null;
+ // string member = null;
- while (true)
- {
- var page = await _redisDataCacheService.GetAllPagedData(
- redisCacheMeterInfoHashKeyTemp,
- redisCacheMeterInfoZSetScoresIndexKeyTemp,
- pageSize: 1000,
- lastScore: score,
- lastMember: member);
+ // while (true)
+ // {
+ // var page = await _redisDataCacheService.GetAllPagedData(
+ // redisCacheMeterInfoHashKeyTemp,
+ // redisCacheMeterInfoZSetScoresIndexKeyTemp,
+ // pageSize: 1000,
+ // lastScore: score,
+ // lastMember: member);
- meterInfos.AddRange(page.Items);
- focusAddressDataLista.AddRange(page.Items.Select(d => d.FocusAddress));
- foreach (var item in page.Items)
- {
- if (!allIds.Add(item.MemberId))
- {
- _logger.LogError($"{item.MemberId}Duplicate data found!");
- }
- }
- if (!page.HasNext) break;
- score = page.NextScore;
- member = page.NextMember;
- }
+ // meterInfos.AddRange(page.Items);
+ // focusAddressDataLista.AddRange(page.Items.Select(d => d.FocusAddress));
+ // foreach (var item in page.Items)
+ // {
+ // if (!allIds.Add(item.MemberId))
+ // {
+ // _logger.LogError($"{item.MemberId}Duplicate data found!");
+ // }
+ // }
+ // if (!page.HasNext) break;
+ // score = page.NextScore;
+ // member = page.NextMember;
+ // }
- timer1.Stop();
- _logger.LogError($"读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒");
- DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
- return;
-#else
+ // timer1.Stop();
+ // _logger.LogError($"读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒");
+ // DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
+ // return;
+ //#else
+ // var meterInfos = await GetAmmeterInfoList(gatherCode);
+ //#endif
var meterInfos = await GetAmmeterInfoList(gatherCode);
-#endif
-
if (meterInfos == null || meterInfos.Count <= 0)
{
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空");
@@ -310,17 +323,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
focusAddressDataList.Add(item.Key);
- // var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
-
-#if DEBUG
- //每次缓存时,删除缓存,避免缓存数据有不准确的问题
- //await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
-#else
- //每次缓存时,删除缓存,避免缓存数据有不准确的问题
- //await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
-#endif
-
- //Dictionary keyValuePairs = new Dictionary();
foreach (var ammeter in item)
{
//处理ItemCode
@@ -397,72 +399,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
public virtual async Task AmmeterScheduledMeterOneMinuteReading()
{
-
- //获取缓存中的电表信息
int timeDensity = 5;
- var currentTime = DateTime.Now;
-
- // 自动计算最佳并发度
- int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount();
-
- var options = new ParallelOptions
- {
- MaxDegreeOfParallelism = recommendedThreads,
- };
- var taskBatch = $"{currentTime:yyyyMMddHHmm00}";
-
- //Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex =>
- //{
- // var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
- // var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
-
- // _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey);
- //});
-
- await Task.CompletedTask;
-
- }
-
- ///
- /// 5分钟采集电表数据
- ///
- ///
- public virtual async Task AmmeterScheduledMeterFiveMinuteReading()
- {
- //获取缓存中的电表信息
- int timeDensity = 5;
- var currentTime = DateTime.Now;
-
- // 自动计算最佳并发度
- int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount();
-
- var options = new ParallelOptions
- {
- MaxDegreeOfParallelism = recommendedThreads,
- };
- var taskBatch = $"{currentTime:yyyyMMddHHmm00}";
-
- //Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex =>
- //{
- // var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
- // var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
-
- // _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey);
- //});
- }
-
- ///
- /// 15分钟采集电表数据
- ///
- ///
- public virtual async Task AmmeterScheduledMeterFifteenMinuteReading()
- {
- //获取缓存中的电表信息
- int timeDensity = 15;
- //var currentTime = DateTime.Now.CalculateNextCollectionTime(timeDensity);
- var currentTime = Convert.ToDateTime("2025-04-21 17:42:00").CalculateNextCollectionTime(timeDensity);
-
- var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName,MeterTypeEnum.Ammeter,timeDensity);
+ var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync(redisCacheKey);
if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
@@ -470,14 +408,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败,请检查Redis中是否有对应的任务下发信息");
return;
}
-
- // 自动计算最佳并发度
- int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount();
- var options = new ParallelOptions
- {
- MaxDegreeOfParallelism = recommendedThreads,
- };
var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();
var conditions = new List();
@@ -495,65 +426,81 @@ namespace JiShe.CollectBus.ScheduledMeterReading
PageIndex = 1,
PageSize = 3000,
Conditions = conditions,
- });
+ });
+
}
+ ///
+ /// 5分钟采集电表数据
+ ///
+ ///
+ public virtual async Task AmmeterScheduledMeterFiveMinuteReading()
+ {
+ int timeDensity = 5;
+ var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
+ var taskInfo = await FreeRedisProvider.Instance.GetAsync(redisCacheKey);
- /////
- ///// 创建电表待发送的任务数据
- /////
- ///// 采集频率
- ///// 时间格式的任务批次名称
- /////
- //private async Task AmmerterCreatePublishTask(int timeDensity, string taskBatch)
- //{
- // var timer = Stopwatch.StartNew();
+ if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
+ {
+ _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败,请检查Redis中是否有对应的任务下发信息");
+ return;
+ }
- // //获取对应频率中的所有电表信息
- // var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
- // var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
- // var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
+ var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();
- // List meterInfos = new List();
- // decimal? cursor = null;
- // string member = null;
- // bool hasNext;
- // do
- // {
- // var page = await _redisDataCacheService.GetAllPagedData(
- // redisCacheMeterInfoHashKeyTemp,
- // redisCacheMeterInfoZSetScoresIndexKeyTemp,
- // pageSize: 1000,
- // lastScore: cursor,
- // lastMember: member);
+ var conditions = new List();
+ conditions.Add(new QueryCondition()
+ {
+ Field = "PendingCopyReadTime",
+ Operator = "=",
+ IsNumber = true,
+ Value = pendingCopyReadTime
+ });
- // meterInfos.AddRange(page.Items);
- // cursor = page.HasNext ? page.NextScore : null;
- // member = page.HasNext ? page.NextMember : null;
- // hasNext = page.HasNext;
- // } while (hasNext);
+ _ = CreateMeterKafkaTaskMessage(timeDensity, new IoTDBQueryOptions()
+ {
+ TableNameOrTreePath = DevicePathBuilder.GetTableName(),
+ PageIndex = 1,
+ PageSize = 3000,
+ Conditions = conditions,
+ });
+ }
- // if (meterInfos == null || meterInfos.Count <= 0)
- // {
- // timer.Stop();
- // _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
- // return;
- // }
+ ///
+ /// 15分钟采集电表数据
+ ///
+ ///
+ public virtual async Task AmmeterScheduledMeterFifteenMinuteReading()
+ {
+ int timeDensity = 15;
+ var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
+ var taskInfo = await FreeRedisProvider.Instance.GetAsync(redisCacheKey);
+ if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
+ {
+ _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败,请检查Redis中是否有对应的任务下发信息");
+ return;
+ }
- // await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
- // items: meterInfos,
- // deviceIdSelector: data => data.FocusAddress,
- // processor: (data, groupIndex) =>
- // {
- // AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch);
- // }
- // );
+ var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();
- // timer.Stop();
- // _logger.LogInformation($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息");
- //}
+ var conditions = new List();
+ conditions.Add(new QueryCondition()
+ {
+ Field = "PendingCopyReadTime",
+ Operator = "=",
+ IsNumber = true,
+ Value = pendingCopyReadTime
+ });
+ _ = CreateMeterKafkaTaskMessage(timeDensity, new IoTDBQueryOptions()
+ {
+ TableNameOrTreePath = DevicePathBuilder.GetTableName(),
+ PageIndex = 1,
+ PageSize = 3000,
+ Conditions = conditions,
+ });
+ }
///
/// 创建电表待发送的任务数据
@@ -563,8 +510,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 集中器所在分组
/// 采集频率对应的时间戳
///
- private List AmmerterCreatePublishTaskAction(int timeDensity
- , AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
+ private List AmmerterCreatePublishTaskAction(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
var currentTime = DateTime.Now;
@@ -573,20 +519,20 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{
- // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
+ _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
return null;
}
//载波的不处理
if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
{
- //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102");
+ _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102");
return null;
}
if (ammeterInfo.State.Equals(2))
{
- //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理");
+ _logger.LogWarning($"{nameof(AmmerterCreatePublishTaskAction)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理");
return null;
}
@@ -599,22 +545,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode))
{
- // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空");
+ _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信区号为空");
return null;
}
if (string.IsNullOrWhiteSpace(ammeterInfo.Address))
{
- //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空");
+ _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信地址为空");
return null;
}
if (Convert.ToInt32(ammeterInfo.Address) > 65535)
{
- //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535");
+ _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信地址无效,确保大于65535");
return null;
}
if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33)
{
- //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})");
+ _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},非有效测量点号({ammeterInfo.MeteringCode})");
return null;
}
@@ -709,7 +655,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
-
+ string taskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA);
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
{
SystemName = SystemType,
@@ -726,13 +672,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//Seq = builderResponse.Seq,
MSA = builderResponse.MSA,
ItemCode = tempItem,
- TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA),
+ TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
Pn = ammeterInfo.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
IsReceived = false,
+ ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
};
taskList.Add(meterReadingRecords);
@@ -776,10 +723,35 @@ namespace JiShe.CollectBus.ScheduledMeterReading
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据时,采集项类型数据为空");
}
+ List focusAddressDataList = new List();//用于处理Kafka主题分区数据的分发和处理。
+
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
+ if (_kafkaOptions.FirstCollectionTime.HasValue == false)
+ {
+ _kafkaOptions.FirstCollectionTime = DateTime.Now;
+ }
+
+ //先处理采集频率任务缓存
+ foreach (var item in meterInfoGroupByTimeDensity)
+ {
+ TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
+ {
+ LastTaskTime = null,
+ TimeDensity = item.Key,
+ NextTaskTime = _kafkaOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间
+ };
+
+ //todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。
+
+ var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, item.Key);
+ await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
+ }
+
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
{
+ List watermeterInfo = new List();
+
//将表计信息根据集中器分组,获得集中器号
var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
foreach (var item in meterInfoGroup)
@@ -789,25 +761,33 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue;
}
- var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}{item.Key}";
- Dictionary keyValuePairs = new Dictionary();
+ focusAddressDataList.Add(item.Key);
+
+ var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
+ var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
+ var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
+
foreach (var subItem in item)
{
-
- keyValuePairs.TryAdd($"{subItem.MeterId}", subItem);
+ watermeterInfo.Add(subItem);
}
- await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
+
+ await _redisDataCacheService.BatchInsertDataAsync(
+ redisCacheMeterInfoHashKey,
+ redisCacheMeterInfoSetIndexKey,
+ redisCacheMeterInfoZSetScoresIndexKey, watermeterInfo);
}
+ }
- //在缓存表信息数据的时候,新增下一个时间的自动处理任务,1分钟后执行
- TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
- {
- TimeDensity = itemTimeDensity.Key,
- NextTaskTime = DateTime.Now.AddMinutes(1)
- };
+ //初始化设备组负载控制
+ if (focusAddressDataList == null || focusAddressDataList.Count <= 0)
+ {
+ _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
- var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key);
- await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
+ }
+ else
+ {
+ DeviceGroupBalanceControl.InitializeCache(focusAddressDataList, _kafkaOptions.NumPartitions);
}
@@ -821,107 +801,186 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public virtual async Task WatermeterScheduledMeterAutoReading()
{
//获取缓存中的水表信息
- int timeDensity = 1;
- var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.WaterMeter);
- var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
- if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
+ int timeDensity = 60;//水表目前只有一个采集频率 60分钟
+ var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, timeDensity);
+ var taskInfo = await FreeRedisProvider.Instance.GetAsync(redisCacheKey);
+
+ if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
{
- _logger.LogError($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101");
+ _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败,请检查Redis中是否有对应的任务下发信息");
return;
}
- //获取下发任务缓存数据
- Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(oneMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.WaterMeter);
- if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
+ var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();
+
+ var conditions = new List();
+ conditions.Add(new QueryCondition()
{
- _logger.LogError($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
- return;
- }
+ Field = "PendingCopyReadTime",
+ Operator = "=",
+ IsNumber = true,
+ Value = pendingCopyReadTime
+ });
- List meterTaskInfosList = new List();
-
- //将取出的缓存任务数据发送到Kafka消息队列中
- foreach (var focusItem in meterTaskInfos)
+ _ = CreateMeterKafkaTaskMessage(timeDensity, new IoTDBQueryOptions()
{
- foreach (var ammerterItem in focusItem.Value)
- {
- var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
- {
- MessageHexString = ammerterItem.Value.IssuedMessageHexString,
- MessageId = ammerterItem.Value.IssuedMessageId,
- FocusAddress = ammerterItem.Value.FocusAddress,
- TimeDensity = timeDensity.ToString(),
- };
-
- //await _producerBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempMsg);
-
- //_ = _producerBus.Publish(tempMsg);
-
-
- meterTaskInfosList.Add(ammerterItem.Value);
- }
- }
- if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
- {
- // await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
- }
-
- ////删除任务数据
- //await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList);
-
- ////缓存下一个时间的任务
- //await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter);
-
+ TableNameOrTreePath = DevicePathBuilder.GetTableName(),
+ PageIndex = 1,
+ PageSize = 3000,
+ Conditions = conditions,
+ });
_logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成");
}
- /////
- ///// 创建水表待发送的任务数据
- /////
- ///// 采集频率
- ///// 水表信息
- ///// 集中器所在分组
- ///// 时间格式的任务批次名称
- /////
- //private void WatermeterCreatePublishTaskAction(int timeDensity
- // , WatermeterInfo meterInfo, int groupIndex, string taskBatch)
- //{
- // var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
+ ///
+ /// 创建水表待发送的任务数据
+ ///
+ /// 采集频率
+ /// 水表信息
+ /// 集中器所在分组
+ /// 时间格式的任务批次名称
+ ///
+ private List WatermeterCreatePublishTaskAction(int timeDensity
+ , WatermeterInfo watermeter, int groupIndex, DateTime timestamps)
+ {
+ var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
+ var currentTime = DateTime.Now;
+
+ string typeName;
+ if (watermeter.MeterType == MeterTypeEnum.WaterMeter)
+ {
+ timeDensity = watermeter.TimeDensity;//水表默认为60分钟
+ typeName = watermeter.LinkType;
+ if (watermeter.MeterBrand.Contains("泉高阀门") || watermeter.MeterBrand.Equals("LXSY-山水翔"))
+ {
+ typeName = watermeter.MeterBrand;
+ }
+ }
+ else if (watermeter.MeterType == MeterTypeEnum.WaterMeterFlowmeter)
+ {
+ typeName = watermeter.MeterBrand;
+ }
+ else
+ {
+ _logger.LogError($"{nameof(WatermeterCreatePublishTaskAction)} 水表类型错误:{watermeter.Serialize()}");
+ return null;
+ }
+
+ List taskList = new List();
- // var currentTime = DateTime.Now;
- // var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
+ List tempCodes = new List() { "10_1" };
- // var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
- // var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
- // var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
+ //todo 后续从协议池获取
+ if (watermeter.MeterTypeName.Equals("水表") && (watermeter.Protocol.Equals((int)MeterLinkProtocol.CJT_188_2018) || watermeter.Protocol.Equals((int)MeterLinkProtocol.DLT_645_1997) || watermeter.Protocol.Equals((int)MeterLinkProtocol.DLT_645_2007)))//水表且(CJT_188_2018或DLT_645_1997)都采用0C_129
+ {
+ if (watermeter.MeterBrand.Contains("炬华有线"))
+ {
+ tempCodes = new List() { "0C_188" };
+ }
+ else
+ {
+ tempCodes = new List() { "0C_129" };
+ }
+ }
+
+ else if (typeName.Trim().Equals("西恩超声波流量计"))
+ {
+ tempCodes = new List() { "10_1" };
+ }
+ else if (typeName.Trim().Equals("江苏华海涡街流量计积算仪"))
+ {
+ tempCodes = new List() { "10_1" };
+ }
+ else if (typeName.Trim().Equals("V880BR涡街流量计"))
+ {
+ tempCodes = new List() { "10_1" };
+ }
+ else if (typeName.Trim().Equals("拓思特涡街流量计H880BR"))
+ {
+ tempCodes = new List() { "10_1" };
+ }
- // var taskInfo = new MeterReadingTelemetryPacketInfo()
- // {
- // Seq= null,
+ foreach (var tempItem in tempCodes)
+ {
+ //排除已发送日冻结和月冻结采集项配置
+ if (DayFreezeCodes.Contains(tempItem))
+ {
+ continue;
+ }
- // };
- // //
+ if (MonthFreezeCodes.Contains(tempItem))
+ {
+ continue;
+ }
- // Build188SendData.Build188WaterMeterReadingSendDataUnit(meterInfo.Address);
+ var itemCodeArr = tempItem.Split('_');
+ var aFNStr = itemCodeArr[0];
+ var aFN = (AFN)aFNStr.HexToDec();
+ var fn = int.Parse(itemCodeArr[1]);
+ TelemetryPacketResponse builderResponse = null;
- // using (var pipe = FreeRedisProvider.Instance.StartPipe())
- // {
- // // 主数据存储Hash
- // pipe.HSet(redisCacheTelemetryPacketInfoHashKey, taskInfo.MemberId, taskInfo.Serialize());
+ string methonCode = $"AFN{aFNStr}_Fn_Send";
+ //特殊表暂不处理
+ if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
+ , out var handler))
+ {
+ builderResponse = handler(new TelemetryPacketRequest()
+ {
+ FocusAddress = watermeter.FocusAddress,
+ Fn = fn,
+ Pn = watermeter.MeteringCode,
+ DataUnit = Build188SendData.Build188WaterMeterReadingSendDataUnit(watermeter.Address),
+ });
+ }
+ else
+ {
+ _logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的电表{watermeter.Name}采集项{tempItem}无效编码。");
+ continue;
+ }
- // // Set索引缓存
- // pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, taskInfo.MemberId);
+ if (builderResponse == null || builderResponse.Data.Length <= 0)
+ {
+ _logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的电表{watermeter.Name}采集项{tempItem}未能正确获取报文。");
+ continue;
+ }
- // // ZSET索引缓存Key
- // pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, taskInfo.ScoreValue, taskInfo.MemberId);
- // pipe.EndPipe();
- // }
+ string taskMark = CommonHelper.GetTaskMark((int)aFN, fn, watermeter.MeteringCode, builderResponse.MSA);
+ var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
+ {
+ SystemName = SystemType,
+ ProjectId = $"{watermeter.ProjectID}",
+ DeviceType = $"{MeterTypeEnum.Ammeter}",
+ DeviceId = $"{watermeter.FocusAddress}",
+ Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
+ DatabaseBusiID = watermeter.DatabaseBusiID,
+ PendingCopyReadTime = timestamps,
+ CreationTime = currentTime,
+ MeterAddress = watermeter.MeterAddress,
+ AFN = (int)aFN,
+ Fn = fn,
+ //Seq = builderResponse.Seq,
+ MSA = builderResponse.MSA,
+ ItemCode = tempItem,
+ TaskMark = taskMark,
+ IsSend = false,
+ ManualOrNot = false,
+ Pn = watermeter.MeteringCode,
+ IssuedMessageId = GuidGenerator.Create().ToString(),
+ IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
+ IsReceived = false,
+ ScoreValue = $"{watermeter.FocusAddress}.{taskMark}".Md5Fun(),
+ };
- //}
+ taskList.Add(meterReadingRecords);
+ }
+
+ return taskList;
+
+ }
#endregion
@@ -944,18 +1003,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return false;
}
- ///
- /// 获取缓存表计下发指令缓存key前缀
- ///
- ///
- ///
- ///
- private string GetTelemetryPacketCacheKeyPrefix(int timeDensity, MeterTypeEnum meterType)
- {
- return $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}*";
- }
-
-
///
/// 创建表的待发送的任务数据
///
@@ -977,34 +1024,34 @@ namespace JiShe.CollectBus.ScheduledMeterReading
decimal? cursor = null;
string member = null;
bool hasNext;
- //do
- //{
- // var page = await _redisDataCacheService.GetAllPagedData(
- // redisCacheMeterInfoHashKeyTemp,
- // redisCacheMeterInfoZSetScoresIndexKeyTemp,
- // pageSize: 1000,
- // lastScore: cursor,
- // lastMember: member);
+ do
+ {
+ var page = await _redisDataCacheService.GetAllPagedData(
+ redisCacheMeterInfoHashKeyTemp,
+ redisCacheMeterInfoZSetScoresIndexKeyTemp,
+ pageSize: 1000,
+ lastScore: cursor,
+ lastMember: member);
- // meterInfos.AddRange(page.Items);
- // cursor = page.HasNext ? page.NextScore : null;
- // member = page.HasNext ? page.NextMember : null;
- // hasNext = page.HasNext;
- //} while (hasNext);
+ meterInfos.AddRange(page.Items);
+ cursor = page.HasNext ? page.NextScore : null;
+ member = page.HasNext ? page.NextMember : null;
+ hasNext = page.HasNext;
+ } while (hasNext);
- var page = await _redisDataCacheService.GetAllPagedData(
- redisCacheMeterInfoHashKeyTemp,
- redisCacheMeterInfoZSetScoresIndexKeyTemp,
- pageSize: 10,
- lastScore: cursor,
- lastMember: member);
- meterInfos.AddRange(page.Items);
+ //var page = await _redisDataCacheService.GetAllPagedData(
+ // redisCacheMeterInfoHashKeyTemp,
+ // redisCacheMeterInfoZSetScoresIndexKeyTemp,
+ // pageSize: 10,
+ // lastScore: cursor,
+ // lastMember: member);
+ //meterInfos.AddRange(page.Items);
if (meterInfos == null || meterInfos.Count <= 0)
{
timer.Stop();
- _logger.LogError($"{nameof(CreateMeterPublishTask)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
+ _logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
return;
}
@@ -1031,6 +1078,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
int pageNumber = 0;
bool hasNext;
var stopwatch = Stopwatch.StartNew();
+
+ var ddd = _runtimeContext.UseTableSessionPool;
+
do
{
options.PageIndex = pageNumber++;
@@ -1054,54 +1104,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {options.TableNameOrTreePath} {timeDensity}分钟采集任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
}
-
- /////
- ///// 创建Kafka消息
- /////
- /////
- /////
- /////
- //private async Task CreateMeterKafkaTaskMessage(
- //string redisCacheTelemetryPacketInfoHashKey,
- //string redisCacheTelemetryPacketInfoZSetScoresIndexKey)
- //{
- // if (string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey))
- // {
- // throw new Exception($"{nameof(CreateMeterKafkaTaskMessage)} 创建Kafka消息失败,参数异常,-101");
- // }
-
- // decimal? cursor = null;
- // string member = null;
- // bool hasNext;
- // var stopwatch = Stopwatch.StartNew();
- // do
- // {
- // var page = await _redisDataCacheService.GetAllPagedData(
- // redisCacheTelemetryPacketInfoHashKey,
- // redisCacheTelemetryPacketInfoZSetScoresIndexKey,
- // pageSize: 1000,
- // lastScore: cursor,
- // lastMember: member);
-
- // cursor = page.HasNext ? page.NextScore : null;
- // member = page.HasNext ? page.NextMember : null;
- // hasNext = page.HasNext;
-
- // await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
- // items: page.Items,
- // deviceIdSelector: data => data.FocusAddress,
- // processor: (data, groupIndex) =>
- // {
- // _ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex);
- // }
- // );
-
- // } while (hasNext);
-
- // stopwatch.Stop();
- // _logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {redisCacheTelemetryPacketInfoHashKey}采集推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
- //}
-
///
/// Kafka 推送消息
///
diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
index fe0746f..90266ce 100644
--- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
+++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
@@ -101,6 +101,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// TypeName = 3,
// DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
// TimeDensity = 15,
+ // BrandType = "",
//});
//ammeterInfos.Add(new AmmeterInfo()
//{
@@ -115,6 +116,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// TypeName = 1,
// DataTypes = "581,589,592,597,601",
// TimeDensity = 15,
+ // BrandType = "",
//});
//return ammeterInfos;
@@ -127,10 +129,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
WHERE 1=1 and C.Special = 0 ";
//TODO 记得移除特殊表过滤
- //if (!string.IsNullOrWhiteSpace(gatherCode))
- //{
- // sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
- //}
+ if (!string.IsNullOrWhiteSpace(gatherCode))
+ {
+ sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
+ }
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
.Ado
.QueryAsync(sql);
@@ -186,30 +188,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
.Ado
.QueryAsync(sql);
- }
-
-
- ///
- /// 测试设备分组均衡控制算法
- ///
- ///
- ///
- [HttpGet]
- public async Task TestDeviceGroupBalanceControl(int deviceCount = 200000)
- {
- var deviceList = new List();
- for (int i = 0; i < deviceCount; i++)
- {
- deviceList.Add($"Device_{Guid.NewGuid()}");
- }
-
- // 初始化缓存
- DeviceGroupBalanceControl.InitializeCache(deviceList);
-
- // 打印分布统计
- DeviceGroupBalanceControl.PrintDistributionStats();
-
- await Task.CompletedTask;
- }
+ }
}
}
\ No newline at end of file
diff --git a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs
index 3ac4202..2033035 100644
--- a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs
+++ b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs
@@ -24,13 +24,7 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// 排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳、或者某一个固定的标识1
///
[FIELDColumn]
- public string ScoreValue
- {
- get
- {
- return $"{DeviceId}.{TaskMark}".Md5Fun();
- }
- }
+ public string ScoreValue { get; set; }
///
/// 是否手动操作
diff --git a/services/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs
index be97769..eac70a1 100644
--- a/services/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs
+++ b/services/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs
@@ -18,7 +18,7 @@ namespace JiShe.CollectBus.IotSystems.Watermeter
/// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义
///
[Column(IsIgnore = true)]
- public override string MemberId => $"{FocusId}:{MeterId}";
+ public override string MemberId => $"{FocusAddress}:{MeteringCode}";
///
/// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳
@@ -90,6 +90,7 @@ namespace JiShe.CollectBus.IotSystems.Watermeter
//// 电表= 1,水表= 2,燃气表= 3,热能表= 4,水表流量计=5,燃气表流量计=6,特殊电表=7
///
public MeterTypeEnum MeterType { get; set; }
+
///
/// 设备品牌;
/// (当 MeterType = 水表, 如 威铭、捷先 等)
@@ -138,12 +139,17 @@ namespace JiShe.CollectBus.IotSystems.Watermeter
/// 采集器编号
///
public string GatherCode { get; set; }
-
+
///
/// 项目ID
///
public int ProjectID { get; set; }
+ ///
+ /// 数据库业务ID
+ ///
+ public int DatabaseBusiID { get; set; }
+
///
/// 是否异常集中器 0:正常,1异常
///
diff --git a/shared/JiShe.CollectBus.Common/Enums/BrandTypeEnum.cs b/shared/JiShe.CollectBus.Common/Enums/BrandTypeEnum.cs
new file mode 100644
index 0000000..83429ae
--- /dev/null
+++ b/shared/JiShe.CollectBus.Common/Enums/BrandTypeEnum.cs
@@ -0,0 +1,593 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Common.Enums
+{
+ ///
+ /// 水表\流量计\特殊电表品牌
+ ///
+ public enum BrandTypeEnum
+ {
+ ///
+ /// 默认(OC_129)
+ ///
+ //None = 0,
+ ///
+ /// 冻结数据(0D_101)
+ ///
+ //Freeze = 1,
+ ///
+ /// 默认(OC_129)or 冻结数据(0D_101)
+ ///
+ NoneOrFreeze = 0,
+ ///
+ /// 188-良禾
+ ///
+ LiangHe188 = 1,
+ ///
+ /// 188-威铭
+ ///
+ WeiMing188 = 2,
+ ///
+ /// 188-宁波
+ ///
+ NingBo188 = 3,
+ ///
+ /// 西恩电磁流量计
+ ///
+ XEDC = 4,
+ ///
+ /// 西恩超声波流量计
+ ///
+ XECSB = 5,
+ ///
+ /// 电磁流量计(LDG-DN200)
+ ///
+ DCLDGDN200 = 6,
+ ///
+ /// 燃气表抄读
+ ///
+ Gasmeter = 7,
+ ///
+ /// 涡街流量计
+ ///
+ WJFlowmeter = 8,
+ ///
+ /// 流量计
+ ///
+ Flowmeter = 9,
+ ///
+ /// 涡街流量计(LUGBDN100)
+ ///
+ WJFlowmeterLUGBDN100 = 10,
+ ///
+ /// 涡街流量计(ZC-LUGB-232ZDNNY)
+ ///
+ WJFlowmeterZCLUGB232ZDNNY = 11,
+ ///
+ /// SB2100蒸汽表
+ ///
+ ZQBSB2100Flowmeter = 12,
+ ///
+ /// (HD)热式流量计
+ ///
+ RSHDFlowmeter = 13,
+ ///
+ /// (HDWMNDN300)热式流量计
+ ///
+ RSHDWMNDN300 = 14,
+ ///
+ /// 热式流量计(FLRS110-C100)
+ ///
+ RSFLRS110C100 = 15,
+ ///
+ /// 通用188协议
+ ///
+ Universal188 = 16,
+ #region 特殊电表
+ DTZ719 = 17,
+ AKKJMC800PY = 18,
+ HRKJ001 = 19,
+ THYB9D1 = 20,
+ DTSD342 = 21,
+ ///
+ /// 谐波
+ ///
+ AFN16_F109 = 22,
+ ///
+ /// DL/T 645—2007 规约时采用该类
+ ///
+ CustomItemCode_93 = 23,
+ ///
+ /// 电表组合有功示值透抄CustomItemCode_95
+ ///
+ AFN16_F95 = 24,
+ #endregion
+ ///
+ /// 非特殊表
+ ///
+ None = 25,
+ ///
+ /// SDM630MCT 导轨电表
+ ///
+ SDM630MCT = 26,
+ ///
+ /// 水表通过0C_129采集
+ ///
+ Watermeter0C_129 = 27,
+ ///
+ /// YYD电磁流量计
+ ///
+ YYDFlowmeter = 28,
+ ///
+ /// 透明转发 跳合闸(水表)
+ ///
+ AFN16_F99 = 29,
+ ///
+ /// 透明转发 跳合闸(气表)
+ ///
+ AFN16_F100 = 30,
+ ///
+ /// 温度压力补偿涡街流量计(TPC1001)涡街流量计
+ ///
+ WJTPC1001 = 31,
+ ///
+ /// (LDG-SP25)树普电磁流量计
+ ///
+ ShupuLDGSP25Flowmeter = 32,
+ ///
+ /// 西恩涡街流量计(LUGBC-100)
+ ///
+ XEWJLUGBC100 = 33,
+ ///
+ /// 智能涡街流量计(UG-1132A)
+ ///
+ WJUG1132A = 34,
+ ///
+ /// 水表通过0D_101采集
+ ///
+ Watermeter0D_101 = 35,
+ ///
+ /// 读取SIM卡号
+ ///
+ AFN16_F101 = 36,
+ ///
+ /// 恒瑞科技三相导轨式电能表 or 恒瑞科技嵌入式电表测试
+ ///
+ DTS600 = 37,
+ ///
+ /// 恒瑞科技单相导轨式电能表(DDS600)
+ ///
+ DDS600 = 38,
+ ///
+ /// 旋进漩涡流量计(LUXB) 天津凯隆仪表科技有限公司
+ ///
+ XJXWLUXB = 39,
+ ///
+ /// DDSD720-L科陆单相导轨表
+ ///
+ DDSD720L = 40,
+ ///
+ /// 东久电磁流量计DJLD
+ ///
+ DJLD = 41,
+ ///
+ /// DTSD720-L科陆三相导轨表
+ ///
+ DTSD720L = 42,
+ ///
+ /// 世成(社为表计)涡街流量计
+ ///
+ SCLUGB = 43,
+ ///
+ /// CL7339MN-ZY科陆三相表
+ ///
+ CL7339MNZY = 44,
+ ///
+ /// 江森智能SNY723MC数显表
+ ///
+ SNY723MC = 45,
+ ///
+ /// 珠海派诺科技PMAC770三相数显表
+ ///
+ PMAC770 = 46,
+ ///
+ /// 北京中科涡街流量计(ZKTD-LUCBY)
+ ///
+ ZKTD_LUGBY = 47,
+ ///
+ /// 夏仪股份蒸汽流量计(LUGB-DN)
+ ///
+ LUGB_DN = 48,
+ ///
+ /// LWQ-D2型气体涡轮流量计
+ ///
+ LWQ_D2 = 49,
+ ///
+ /// 西恩涡街流量计分体式(流量积算仪32FC系列)
+ ///
+ XEJSY32FC = 50,
+ ///
+ /// 寺崎科技PD652E-9S4电表
+ ///
+ PD652E9S4 = 51,
+ ///
+ /// 液体涡轮流量计(LWGY)
+ ///
+ LWGY = 52,
+ ///
+ /// 多功能积算仪(RW-A)
+ ///
+ DGNRWA = 53,
+ ///
+ /// 杭梅电气DTS804导轨表
+ ///
+ DTS804 = 54,
+ ///
+ /// 杭梅电气HG194-D93数显表
+ ///
+ HG194D93 = 55,
+ ///
+ /// 连水超声波水表188
+ ///
+ Lianshui188 = 56,
+ ///
+ /// 湖北回盛生物科技有限公司EZT96Y数显表
+ ///
+ EZT96Y,
+ ///
+ /// 上海肖月智能流量积算仪
+ ///
+ ZNLLJ,
+ ///
+ /// 西安诚通电磁流量计
+ ///
+ CTLDE250SC31GM8FB,
+ ///
+ /// 雅达YD2040
+ ///
+ YD2040,
+ ///
+ /// EVC智能体积修正仪
+ ///
+ EVC,
+ ///
+ /// 气体超声流量计IGSM-TS
+ ///
+ IGSMTS,
+ ///
+ /// YX-9SYE三相多功能表
+ ///
+ YX9SYE,
+ ///
+ /// 世成液体涡轮流量计(SCLWGY-DN50)
+ ///
+ SCLWGYDN50,
+ ///
+ /// 杭州盘古积算仪(FX6000F)
+ ///
+ FX6000F,
+ ///
+ /// "盘古电磁流量计(PMF-GM4.0A1-50M11K1F1T0C3)
+ ///
+ PFMGM40A150M11K1F1T0C3,
+ ///
+ /// 西恩液体涡轮流量计(SEAN LWGY-50)
+ ///
+ SeanLWGY50,
+ ///
+ /// 雷泰电磁流量计LD-LDE-DN50
+ ///
+ LDLDEDN50,
+ ///
+ /// 雷泰涡街流量计(LT-LUGB-DN50)
+ ///
+ LTLUGBDN50,
+ ///
+ /// 珠海派诺科技股份有限公司SPM33电力仪表
+ ///
+ SPM33,
+ ///
+ /// 株洲斯瑞克电气有限公司三相数显多功能电力仪表PD369E-AS4
+ ///
+ PD369EAS4,
+ ///
+ /// 湖北回盛生物科技有限公司-涡街流量计(10VTEAD03A200C1A2HOAG)
+ ///
+ WJ10VTEAD03A200C1A2HOAG,
+ ///
+ /// 世成旋进旋涡流量计SCLUX-DN25
+ ///
+ SCLUXDN25,
+ ///
+ /// 世成气体涡轮流量计(SCLWGQ-DN50)
+ ///
+ SCLWGQDN50,
+ ///
+ /// 智能电磁流量计(MDL210)
+ ///
+ MDL210,
+ ///
+ /// 江苏华海涡街流量计Focvor4202
+ ///
+ Focvor4202,
+ ///
+ /// 华凯电力HK194E-9S4
+ ///
+ HK194E9S4,
+ ///
+ /// 威胜测试-DTSD342_9N
+ ///
+ DTSD342Test,
+ ///
+ ///科迈捷涡街流量计VFM-60
+ ///
+ VFM60,
+ ///
+ ///江苏华海涡街流量计积算仪
+ ///
+ HHJSY,
+ ///
+ ///宏江4G水表
+ ///
+ HJDN15,
+ ///
+ ///世成4G涡街流量计
+ ///
+ LPV2,
+ ///
+ ///浙江正泰DTSU666
+ ///
+ DTSU666,
+ ///
+ /// 浙江启唯电气-数码三相多功能电表QV194E-9S4
+ ///
+ QV194E9S4,
+ ///
+ /// 施耐德PM2100
+ ///
+ PM2100,
+
+ ///
+ /// 天康电磁流量计
+ ///
+ TK1100FT,
+
+ ///
+ /// 西恩气体涡轮流量计(SEANLWQ)
+ ///
+ SEANLWQ,
+
+ ///
+ /// V880BR涡街流量计
+ ///
+ V880BR,
+
+ ///
+ /// 大导SDD194E-9
+ ///
+ SDD194E_9,
+
+ ///
+ ///泉高阀门科技有限公司-超声波水表
+ ///
+ QGFMCSB,
+
+ #region 传感器型号
+ SensorMeter,
+ #endregion
+
+ ///
+ /// 分体式超声波明渠流量计SC-6000F
+ ///
+ SC6000F,
+
+ ///
+ /// 江苏京仪JLWQ型气体流量计JLWQ型气体流量计
+ ///
+ JLWQ,
+
+ ///
+ /// 广州智光SMC200
+ ///
+ SMC200,
+
+ ///
+ /// 左拓ZFM2-621
+ ///
+ ZFM2621,
+
+ ///
+ /// 江苏华尔威涡街旋进流量计
+ ///
+ HRW520,
+
+ ///
+ /// 施耐德PM5350P
+ ///
+ PM5350P,
+
+ ///
+ /// 施耐德PM810MG
+ ///
+ PM810MG,
+
+ ///
+ /// 浙江之高ZL96-3E
+ ///
+ ZL96_3E,
+ ///
+ /// 拓普电子PD284Z-9S4
+ ///
+ PD284Z_9S4,
+ ///
+ /// 上海普川DTSU5886
+ ///
+ DTSU5886,
+ ///
+ /// 安德利SC194E-9S4
+ ///
+ SC194E9S4,
+ ///
+ /// 浙江天电电气TD700E-AS3
+ ///
+ TD700EAS3,
+ ///
+ /// 世成分体式涡街流量计SW-SCLUGB-DN
+ ///
+ SWSCLUGBDN,
+ ///
+ /// 东久电磁冷热量计SW-DJLD
+ ///
+ SWDJLD,
+
+ ///
+ /// 北京中科拓达ZKTD-LUGB
+ ///
+ ZKTD_LUGB,
+
+ ///
+ /// 江苏英美迪自动化有限公司三相液晶多功能仪表YMD96A-E4
+ ///
+ YMD96A_E4,
+
+ ///
+ /// 金湖盛元LWQ气体涡轮流量计
+ ///
+ JHSYLWQ,
+
+ ///
+ /// 天康涡街流量计TK2000
+ ///
+ TK2000,
+
+ ///
+ /// 浙江迈拓三相导轨电表DTSF1709
+ ///
+ DTSF1709,
+
+ ///
+ /// 杭州逸控科技超声波流量计ECUL30B-L2C1NSVC
+ ///
+ ECUL30BL2C1NSVC,
+ ///
+ /// 数字电测表HTD288-DM44/R
+ ///
+ HTD288,
+ ///
+ /// 杭州逸控科技有限公司ECLUGB2305W3C2N
+ ///
+ ECLUGB2305W3C2N,
+
+ ///
+ /// 江苏华海测控科技有限公司温压补偿流量积算仪
+ ///
+ XMJA9000,
+
+ ///
+ /// 湖南佳一机电设备有限公司精致型蒸汽热能积算仪
+ ///
+ F3200H,
+
+ ///
+ /// 合兴加能电梯能量回馈装置
+ ///
+ IPCPFE04MNDC,
+
+ ///
+ /// 宁波创盛旋涡流量计
+ ///
+ CX25,
+ ///
+ /// 群力电气QDCZY1N
+ ///
+ QDCZY1N,
+
+ ///
+ ///深圳中电PMCS963C
+ ///
+ PMCS963C,
+ ///
+ /// 迅尔燃气表D2SD2ET3D4S
+ ///
+ D2SD2ET3D4S,
+ ///
+ /// INTELLIGENT积算仪F2000X
+ ///
+ F2000X,
+
+ ///
+ ///多盟-DM194Z-9SY
+ ///
+ DM194Z9SY,
+ ///
+ /// 纳宇PD760
+ ///
+ PD760,
+ ///
+ /// 纳宇DTS90031LPD
+ ///
+ DTS90031LPD,
+
+ ///
+ /// 上海施易克SEIK680数显表
+ ///
+ SEIK680,
+
+ ///
+ /// 中灵电气ZL125SC
+ ///
+ ZL125SC,
+ ///
+ /// 江苏京仪气体涡轮流量计JLWQE
+ ///
+ JLWQE,
+ ///
+ /// HART智能转换器
+ ///
+ SM100,
+
+ ///
+ /// 拓思特涡街流量计H880BR
+ ///
+ H880BR,
+ ///
+ /// DDSD720-L-单相电子式导轨表
+ ///
+ DDSD720L2,
+
+ ///
+ /// 浙江智电三相三线大功率有功电能表
+ ///
+ ZJZDSXSX,
+
+ ///
+ /// 山水翔水表LXSY
+ ///
+ LXSY,
+
+ ///
+ /// 衡水多元仪表有限公司气体涡轮流量计DYWQ
+ ///
+ DYWQ,
+
+ ///
+ /// 安徽聚积电子
+ ///
+ DDS2052,
+
+ ///
+ /// 湖南中麦
+ ///
+ ZMDTSD3429N,
+
+ ///
+ ///DTS2377三相导轨式多功能智能电表
+ ///
+ DTS2377
+
+ }
+}
diff --git a/shared/JiShe.CollectBus.Common/Enums/MeterLinkProtocol.cs b/shared/JiShe.CollectBus.Common/Enums/MeterLinkProtocol.cs
new file mode 100644
index 0000000..6c06c5c
--- /dev/null
+++ b/shared/JiShe.CollectBus.Common/Enums/MeterLinkProtocol.cs
@@ -0,0 +1,54 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Common.Enums
+{
+ ///
+ /// 表计连接通讯协议--表计与集中器的通讯协议
+ ///
+ public enum MeterLinkProtocol
+ {
+ ///
+ /// 无
+ ///
+ None = 0,
+
+ ///
+ /// DL/T 645—1997
+ ///
+ DLT_645_1997 = 1,
+
+ ///
+ /// 交流采样装置通信协议(电表)
+ ///
+ ACSamplingDevice = 2,
+
+ ///
+ /// DL/T 645—2007
+ ///
+ DLT_645_2007 = 30,
+
+ ///
+ /// 载波通信
+ ///
+ Carrierwave = 31,
+
+ ///
+ /// CJ/T 188—2018协议(水表)
+ ///
+ CJT_188_2018 = 32,
+
+ ///
+ /// CJ/T 188—2004协议
+ ///
+ CJT_188_2004 = 33,
+
+ ///
+ /// MODBUS-RTU
+ ///
+ MODBUS_RTU = 34,
+ }
+}
diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml
index 88209c2..b25f9f0 100644
--- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml
+++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml
@@ -17,7 +17,7 @@
后端服务
-
+