diff --git a/modules/JiShe.CollectBus.FreeRedis/FreeRedisProvider.cs b/modules/JiShe.CollectBus.FreeRedis/FreeRedisProvider.cs
index 4c81d02..b2cf9e5 100644
--- a/modules/JiShe.CollectBus.FreeRedis/FreeRedisProvider.cs
+++ b/modules/JiShe.CollectBus.FreeRedis/FreeRedisProvider.cs
@@ -37,466 +37,6 @@ namespace JiShe.CollectBus.FreeRedis
Instance.Deserialize = (json, type) => BusJsonSerializer.Deserialize(json, type);
Instance.Notice += (s, e) => Trace.WriteLine(e.Log);
return Instance;
- }
-
- /////
- ///// 单个添加数据
- /////
- /////
- ///// 主数据存储Hash缓存Key
- ///// 集中器索引Set缓存Key
- ///// 集中器排序索引ZSET缓存Key
- ///// 集中器采集频率分组全局索引ZSet缓存Key
- ///// 表计信息
- ///// 可选时间戳
- /////
- //public async Task AddMeterCacheData(
- //string redisCacheKey,
- //string redisCacheFocusIndexKey,
- //string redisCacheScoresIndexKey,
- //string redisCacheGlobalIndexKey,
- //T data,
- //DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
- //{
- // // 参数校验增强
- // if (data == null || string.IsNullOrWhiteSpace(redisCacheKey)
- // || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
- // || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
- // || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
- // {
- // throw new ArgumentException($"{nameof(AddMeterCacheData)} 参数异常,-101");
- // }
-
- // // 计算组合score(分类ID + 时间戳)
- // var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
-
- // long scoreValue = ((long)data.FocusId << 32) | (uint)actualTimestamp.Ticks;
-
- // //全局索引写入
- // long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
-
- // // 使用事务保证原子性
- // using (var trans = Instance.Multi())
- // {
- // // 主数据存储Hash
- // trans.HSet(redisCacheKey, data.MemberID, data.Serialize());
-
- // // 分类索引
- // trans.SAdd(redisCacheFocusIndexKey, data.MemberID);
-
- // // 排序索引使用ZSET
- // trans.ZAdd(redisCacheScoresIndexKey, scoreValue, data.MemberID);
-
- // //全局索引
- // trans.ZAdd(redisCacheGlobalIndexKey, globalScore, data.MemberID);
-
- // var results = trans.Exec();
-
- // if (results == null || results.Length <= 0)
- // throw new Exception($"{nameof(AddMeterCacheData)} 事务提交失败,-102");
- // }
-
- // await Task.CompletedTask;
- //}
-
- /////
- ///// 批量添加数据
- /////
- /////
- ///// 主数据存储Hash缓存Key
- ///// 集中器索引Set缓存Key
- ///// 集中器排序索引ZSET缓存Key
- ///// 集中器采集频率分组全局索引ZSet缓存Key
- ///// 数据集合
- ///// 可选时间戳
- /////
- //public async Task BatchAddMeterData(
- //string redisCacheKey,
- //string redisCacheFocusIndexKey,
- //string redisCacheScoresIndexKey,
- //string redisCacheGlobalIndexKey,
- //IEnumerable items,
- //DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
- //{
- // if (items == null
- // || items.Count() <=0
- // || string.IsNullOrWhiteSpace(redisCacheKey)
- // || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
- // || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
- // || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
- // {
- // throw new ArgumentException($"{nameof(BatchAddMeterData)} 参数异常,-101");
- // }
-
- // const int BATCH_SIZE = 1000; // 每批1000条
- // var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2);
-
- // foreach (var batch in items.Batch(BATCH_SIZE))
- // {
- // await semaphore.WaitAsync();
-
- // _ = Task.Run(() =>
- // {
- // using (var pipe = Instance.StartPipe())
- // {
- // foreach (var item in batch)
- // {
- // // 计算组合score(分类ID + 时间戳)
- // var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
-
- // long scoreValue = ((long)item.FocusId << 32) | (uint)actualTimestamp.Ticks;
-
- // //全局索引写入
- // long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
-
- // // 主数据存储Hash
- // pipe.HSet(redisCacheKey, item.MemberID, item.Serialize());
-
- // // 分类索引Set
- // pipe.SAdd(redisCacheFocusIndexKey, item.MemberID);
-
- // // 排序索引使用ZSET
- // pipe.ZAdd(redisCacheScoresIndexKey, scoreValue, item.MemberID);
-
- // //全局索引
- // pipe.ZAdd(redisCacheGlobalIndexKey, globalScore, item.MemberID);
- // }
- // pipe.EndPipe();
- // }
- // semaphore.Release();
- // });
- // }
-
- // await Task.CompletedTask;
- //}
-
- /////
- ///// 删除指定redis缓存key的缓存数据
- /////
- /////
- ///// 主数据存储Hash缓存Key
- ///// 集中器索引Set缓存Key
- ///// 集中器排序索引ZSET缓存Key
- ///// 集中器采集频率分组全局索引ZSet缓存Key
- ///// 表计信息
- /////
- //public async Task RemoveMeterData(
- //string redisCacheKey,
- //string redisCacheFocusIndexKey,
- //string redisCacheScoresIndexKey,
- //string redisCacheGlobalIndexKey,
- //T data) where T : DeviceCacheBasicModel
- //{
-
- // if (data == null
- // || string.IsNullOrWhiteSpace(redisCacheKey)
- // || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
- // || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
- // || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
- // {
- // throw new ArgumentException($"{nameof(RemoveMeterData)} 参数异常,-101");
- // }
-
- // const string luaScript = @"
- // local mainKey = KEYS[1]
- // local focusIndexKey = KEYS[2]
- // local scoresIndexKey = KEYS[3]
- // local globalIndexKey = KEYS[4]
- // local member = ARGV[1]
-
- // local deleted = 0
- // if redis.call('HDEL', mainKey, member) > 0 then
- // deleted = 1
- // end
-
- // redis.call('SREM', focusIndexKey, member)
- // redis.call('ZREM', scoresIndexKey, member)
- // redis.call('ZREM', globalIndexKey, member)
- // return deleted
- // ";
-
- // var keys = new[]
- // {
- // redisCacheKey,
- // redisCacheFocusIndexKey,
- // redisCacheScoresIndexKey,
- // redisCacheGlobalIndexKey
- // };
-
- // var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberID });
-
- // if ((int)result == 0)
- // throw new KeyNotFoundException("指定数据不存在");
- //}
-
- /////
- ///// 修改表计缓存信息
- /////
- /////
- ///// 主数据存储Hash缓存Key
- ///// 旧集中器索引Set缓存Key
- ///// 新集中器索引Set缓存Key
- ///// 集中器排序索引ZSET缓存Key
- ///// 集中器采集频率分组全局索引ZSet缓存Key
- ///// 表计信息
- ///// 可选时间戳
- /////
- //public async Task UpdateMeterData(
- //string redisCacheKey,
- //string oldRedisCacheFocusIndexKey,
- //string newRedisCacheFocusIndexKey,
- //string redisCacheScoresIndexKey,
- //string redisCacheGlobalIndexKey,
- //T newData,
- //DateTimeOffset? newTimestamp = null) where T : DeviceCacheBasicModel
- //{
- // if (newData == null
- // || string.IsNullOrWhiteSpace(redisCacheKey)
- // || string.IsNullOrWhiteSpace(oldRedisCacheFocusIndexKey)
- // || string.IsNullOrWhiteSpace(newRedisCacheFocusIndexKey)
- // || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
- // || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
- // {
- // throw new ArgumentException($"{nameof(UpdateMeterData)} 参数异常,-101");
- // }
-
- // var luaScript = @"
- // local mainKey = KEYS[1]
- // local oldFocusIndexKey = KEYS[2]
- // local newFocusIndexKey = KEYS[3]
- // local scoresIndexKey = KEYS[4]
- // local globalIndexKey = KEYS[5]
- // local member = ARGV[1]
- // local newData = ARGV[2]
- // local newScore = ARGV[3]
- // local newGlobalScore = ARGV[4]
-
- // -- 校验存在性
- // if redis.call('HEXISTS', mainKey, member) == 0 then
- // return 0
- // end
-
- // -- 更新主数据
- // redis.call('HSET', mainKey, member, newData)
-
- // -- 处理变更
- // if newScore ~= '' then
- // -- 删除旧索引
- // redis.call('SREM', oldFocusIndexKey, member)
- // redis.call('ZREM', scoresIndexKey, member)
-
- // -- 添加新索引
- // redis.call('SADD', newFocusIndexKey, member)
- // redis.call('ZADD', scoresIndexKey, newScore, member)
- // end
-
- // -- 更新全局索引
- // if newGlobalScore ~= '' then
- // -- 删除旧索引
- // redis.call('ZREM', globalIndexKey, member)
-
- // -- 添加新索引
- // redis.call('ZADD', globalIndexKey, newGlobalScore, member)
- // end
-
- // return 1
- // ";
-
- // var actualTimestamp = newTimestamp ?? DateTimeOffset.UtcNow;
- // var newGlobalScore = actualTimestamp.ToUnixTimeMilliseconds();
- // var newScoreValue = ((long)newData.FocusId << 32) | (uint)actualTimestamp.Ticks;
-
- // var result = await Instance.EvalAsync(luaScript,
- // new[]
- // {
- // redisCacheKey,
- // oldRedisCacheFocusIndexKey,
- // newRedisCacheFocusIndexKey,
- // redisCacheScoresIndexKey,
- // redisCacheGlobalIndexKey
- // },
- // new object[]
- // {
- // newData.MemberID,
- // newData.Serialize(),
- // newScoreValue.ToString() ?? "",
- // newGlobalScore.ToString() ?? ""
- // });
-
- // if ((int)result == 0)
- // {
- // throw new KeyNotFoundException($"{nameof(UpdateMeterData)}指定Key{redisCacheKey}的数据不存在");
- // }
- //}
-
- //public async Task> SingleGetMeterPagedData(
- //string redisCacheKey,
- //string redisCacheScoresIndexKey,
- //int focusId,
- //int pageSize = 10,
- //int pageIndex = 1,
- //bool descending = true)
- //{
- // // 计算score范围
- // long minScore = (long)focusId << 32;
- // long maxScore = ((long)focusId + 1) << 32;
-
- // // 分页参数计算
- // int start = (pageIndex - 1) * pageSize;
-
- // // 获取排序后的member列表
- // var members = descending
- // ? await Instance.ZRevRangeByScoreAsync(
- // redisCacheScoresIndexKey,
- // maxScore,
- // minScore,
- // start,
- // pageSize)
- // : await Instance.ZRangeByScoreAsync(
- // redisCacheScoresIndexKey,
- // minScore,
- // maxScore,
- // start,
- // pageSize);
-
- // // 批量获取实际数据
- // var dataTasks = members.Select(m =>
- // Instance.HGetAsync(redisCacheKey, m)).ToArray();
- // await Task.WhenAll(dataTasks);
-
- // // 总数统计优化
- // var total = await Instance.ZCountAsync(
- // redisCacheScoresIndexKey,
- // minScore,
- // maxScore);
-
- // return new BusPagedResult
- // {
- // Items = dataTasks.Select(t => t.Result).ToList(),
- // TotalCount = total,
- // PageIndex = pageIndex,
- // PageSize = pageSize
- // };
- //}
-
-
- //public async Task> GetFocusPagedData(
- //string redisCacheKey,
- //string redisCacheScoresIndexKey,
- //int focusId,
- //int pageSize = 10,
- //long? lastScore = null,
- //string lastMember = null,
- //bool descending = true) where T : DeviceCacheBasicModel
- //{
- // // 计算分数范围
- // long minScore = (long)focusId << 32;
- // long maxScore = ((long)focusId + 1) << 32;
-
- // // 获取成员列表
- // var members = await GetSortedMembers(
- // redisCacheScoresIndexKey,
- // minScore,
- // maxScore,
- // pageSize,
- // lastScore,
- // lastMember,
- // descending);
-
- // // 批量获取数据
- // var dataDict = await Instance.HMGetAsync(redisCacheKey, members.CurrentItems);
-
- // return new BusPagedResult
- // {
- // Items = dataDict,
- // TotalCount = await GetTotalCount(redisCacheScoresIndexKey, minScore, maxScore),
- // HasNext = members.HasNext,
- // NextScore = members.NextScore,
- // NextMember = members.NextMember
- // };
- //}
-
- //private async Task<(string[] CurrentItems, bool HasNext, decimal? NextScore, string NextMember)>
- // GetSortedMembers(
- // string zsetKey,
- // long minScore,
- // long maxScore,
- // int pageSize,
- // long? lastScore,
- // string lastMember,
- // bool descending)
- //{
- // var querySize = pageSize + 1;
- // var (startScore, exclude) = descending
- // ? (lastScore ?? maxScore, lastMember)
- // : (lastScore ?? minScore, lastMember);
-
- // var members = descending
- // ? await Instance.ZRevRangeByScoreAsync(
- // zsetKey,
- // max: startScore,
- // min: minScore,
- // offset: 0,
- // count: querySize)
- // : await Instance.ZRangeByScoreAsync(
- // zsetKey,
- // min: startScore,
- // max: maxScore,
- // offset: 0,
- // count: querySize);
-
- // var hasNext = members.Length > pageSize;
- // var currentItems = members.Take(pageSize).ToArray();
-
- // var nextCursor = currentItems.Any()
- // ? await GetNextCursor(zsetKey, currentItems.Last(), descending)
- // : (null, null);
-
- // return (currentItems, hasNext, nextCursor.score, nextCursor.member);
- //}
-
- //private async Task GetTotalCount(string zsetKey, long min, long max)
- //{
- // // 缓存计数优化
- // var cacheKey = $"{zsetKey}_count_{min}_{max}";
- // var cached = await Instance.GetAsync(cacheKey);
-
- // if (cached.HasValue)
- // return cached.Value;
-
- // var count = await Instance.ZCountAsync(zsetKey, min, max);
- // await Instance.SetExAsync(cacheKey, 60, count); // 缓存60秒
- // return count;
- //}
-
-
- //public async Task>> BatchGetMeterPagedData(
- //string redisCacheKey,
- //string redisCacheScoresIndexKey,
- //IEnumerable focusIds,
- //int pageSizePerFocus = 10) where T : DeviceCacheBasicModel
- //{
- // var results = new ConcurrentDictionary>();
- // var parallelOptions = new ParallelOptions
- // {
- // MaxDegreeOfParallelism = Environment.ProcessorCount * 2
- // };
-
- // await Parallel.ForEachAsync(focusIds, parallelOptions, async (focusId, _) =>
- // {
- // var data = await SingleGetMeterPagedData(
- // redisCacheKey,
- // redisCacheScoresIndexKey,
- // focusId,
- // pageSizePerFocus);
-
- // results.TryAdd(focusId, data);
- // });
-
- // return new Dictionary>(results);
- //}
-
-
-
+ }
}
}
\ No newline at end of file
diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs
index 963d45f..791240d 100644
--- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs
@@ -132,9 +132,8 @@ namespace JiShe.CollectBus.IoTDB.Provider
public async Task BatchInsertAsync(DeviceMetadata deviceMetadata, IEnumerable entities) where T : IoTEntity
{
try
- {
-
- var batchSize = 2000;
+ {
+ var batchSize = 1000;
var batches = entities.Chunk(batchSize);
foreach (var batch in batches)
diff --git a/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs b/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs
index 2183ca2..f4a8371 100644
--- a/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs
+++ b/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs
@@ -72,7 +72,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
///
public async Task GetMeterInfoAsync(string meterType,string timeDensity="15")
{
- var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, _applicationOptions.SystemType, _applicationOptions.ServerTagName, meterType, timeDensity)}";
+ var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheDeviceInfoHashKey, _applicationOptions.SystemType, _applicationOptions.ServerTagName, meterType, timeDensity)}";
// TODO:临时写死,等确认后如何取再调整
return await Task.FromResult(new AmmeterInfo() {
ProjectID = 10000,
diff --git a/services/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs b/services/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs
index d870202..5ed2924 100644
--- a/services/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs
+++ b/services/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs
@@ -28,33 +28,16 @@ namespace JiShe.CollectBus.Application.Contracts
string redisZSetScoresIndexCacheKey,
T data) where T : DeviceCacheBasicModel;
+
///
/// 批量添加数据
///
///
- /// 主数据存储Hash缓存Key
/// Set索引缓存Key
- /// ZSET索引缓存Key
- /// 设备缓存信息
+ /// hash缓存Key
/// 待缓存数据集合
///
Task BatchInsertDataAsync(
- string redisHashCacheKey,
- string redisSetIndexCacheKey,
- string redisZSetScoresIndexCacheKey,
- string redisDeviceInfoHashCacheKey,
- IEnumerable items) where T : DeviceCacheBasicModel;
-
-
- ///
- /// 批量添加数据
- ///
- ///
- /// Set索引缓存Key
- /// 设备缓存信息
- /// 待缓存数据集合
- ///
- Task BatchInsertDataAsync2(
string redisSetIndexCacheKey,
string redisDeviceInfoHashCacheKey,
Dictionary> items) where T : DeviceCacheBasicModel;
@@ -172,45 +155,24 @@ namespace JiShe.CollectBus.Application.Contracts
bool descending = true)
where T : DeviceCacheBasicModel;
-
- /////
- ///// 游标分页查询
- /////
- ///// 排序索引ZSET缓存Key
- ///// 分页数量
- ///// 开始索引
- ///// 开始唯一标识
- ///// 排序方式
- /////
- //Task<(List Members, bool HasNext)> GetPagedMembers(
- // string redisZSetScoresIndexCacheKey,
- // int pageSize,
- // decimal? startScore,
- // string excludeMember,
- // bool descending);
-
- /////
- ///// 批量获取指定分页的数据
- /////
- /////
- ///// Hash表缓存key
- ///// Hash表字段集合
- /////
- //Task> BatchGetData(
- // string redisHashCacheKey,
- // IEnumerable members)
- // where T : DeviceCacheBasicModel;
-
- /////
- ///// 获取下一页游标
- /////
- ///// 排序索引ZSET缓存Key
- ///// 最后一个唯一标识
- ///// 排序方式
- /////
- //Task GetNextScore(
- // string redisZSetScoresIndexCacheKey,
- // string lastMember,
- // bool descending);
+ ///
+ /// 通过ZSET索引获取数据,支持10万级别数据处理,控制在13秒以内。
+ ///
+ ///
+ /// ZSET索引缓存Key
+ /// 主数据存储Hash缓存Key
+ /// 分页尺寸
+ /// 最后一个索引
+ /// 最后一个唯一标识
+ /// 排序方式
+ ///
+ Task> GetAllPagedData2(
+ string redisCacheDeviceGroupSetIndexKey,
+ string redisCacheDeviceInfoHashKey,
+ int pageSize = 1000,
+ decimal? lastScore = null,
+ string lastMember = null,
+ bool descending = true)
+ where T : DeviceCacheBasicModel;
}
}
diff --git a/services/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs
index 66cbf2f..b82feab 100644
--- a/services/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs
+++ b/services/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs
@@ -4,6 +4,7 @@ using System.Threading.Tasks;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IotSystems.Ammeters;
+using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter;
using Volo.Abp.Application.Services;
@@ -34,7 +35,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
/// 采集端Code
///
- Task> GetAmmeterInfoList(string gatherCode = "");
+ Task> GetAmmeterInfoList(string gatherCode = "");
///
/// 初始化电表缓存数据
@@ -82,7 +83,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 集中器所在分组
/// 采集频率对应的时间戳
///
- Task> AmmeterScheduledAutomaticVerificationTime(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps);
+ Task> AmmeterScheduledAutomaticVerificationTime(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps);
///
/// 日冻结抄读
@@ -92,7 +93,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 集中器所在分组
/// 采集频率对应的时间戳
///
- Task> AmmeterScheduledGetAutomaticDayFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps);
+ Task> AmmeterScheduledGetAutomaticDayFreezeData(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps);
///
/// 月冻结数据抄读
@@ -102,7 +103,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 集中器所在分组
/// 采集频率对应的时间戳
///
- Task> AmmeterScheduledGetAutomaticMonthFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps);
+ Task> AmmeterScheduledGetAutomaticMonthFreezeData(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps);
#endregion
@@ -112,7 +113,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
/// 采集端Code
///
- Task> GetWatermeterInfoList(string gatherCode = "");
+ Task> GetWatermeterInfoList(string gatherCode = "");
///
/// 初始化水表缓存数据,只获取任务数据下发,不构建任务
@@ -139,7 +140,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 集中器所在分组
/// 采集频率对应的时间戳
///
- Task> ConcentratorScheduledAutomaticGetTerminalVersion(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps);
+ Task> ConcentratorScheduledAutomaticGetTerminalVersion(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps);
///
/// 自动获取远程通信模块(SIM)版本信息
@@ -149,7 +150,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 集中器所在分组
/// 采集频率对应的时间戳
///
- Task> ConcentratorScheduledAutomaticGetTelematicsModule(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps);
+ Task> ConcentratorScheduledAutomaticGetTelematicsModule(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps);
#endregion
diff --git a/services/JiShe.CollectBus.Application/CollectBusAppService.cs b/services/JiShe.CollectBus.Application/CollectBusAppService.cs
index 7c66b95..bd4a232 100644
--- a/services/JiShe.CollectBus.Application/CollectBusAppService.cs
+++ b/services/JiShe.CollectBus.Application/CollectBusAppService.cs
@@ -25,200 +25,5 @@ public abstract class CollectBusAppService : ApplicationService
{
LocalizationResource = typeof(CollectBusResource);
ObjectMapperContext = typeof(CollectBusApplicationModule);
- }
-
- ///
- /// Lua脚本批量获取缓存的表计信息
- ///
- /// 表信息数据对象
- /// 采集频率对应的缓存Key集合
- /// 系统类型
- /// 服务器标识
- /// 采集频率,1分钟、5分钟、15分钟
- /// 表计类型
- ///
- protected async Task>> GetMeterRedisCacheDictionaryData(string[] redisKeys, string systemType, string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class
- {
- if (redisKeys == null || redisKeys.Length <= 0 || string.IsNullOrWhiteSpace(systemType) || string.IsNullOrWhiteSpace(serverTagName) || string.IsNullOrWhiteSpace(timeDensity))
- {
- throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,参数异常,-101");
- }
-
- var meterInfos = new Dictionary>();
- var luaScript = @"
- local results = {}
- for i, key in ipairs(KEYS) do
- local data = redis.call('HGETALL', key)
- results[i] = {key, data}
- end
- return results";
-
- // 分页参数:每页处理10000个键
- int pageSize = 10000;
- int totalPages = (int)Math.Ceiling(redisKeys.Length / (double)pageSize);
-
- for (int page = 0; page < totalPages; page++)
- {
- // 分页获取当前批次的键
- var batchKeys = redisKeys
- .Skip(page * pageSize)
- .Take(pageSize)
- .ToArray();
-
- // 执行Lua脚本获取当前批次数据
- var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, batchKeys);
- if (merterResult == null)
- {
- throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,第 {page + 1} 页数据未返回,-102");
- }
-
- // 解析当前批次的结果
- if (merterResult is object[] arr)
- {
- foreach (object[] item in arr)
- {
- string key = (string)item[0];
- object[] fieldsAndValues = (object[])item[1];
- var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, systemType, serverTagName, meterType, timeDensity)}";
- string focusAddress = key.Replace(redisCacheKey, "");
-
- var meterHashs = new Dictionary();
- for (int i = 0; i < fieldsAndValues.Length; i += 2)
- {
- string meterId = (string)fieldsAndValues[i];
- string meterStr = (string)fieldsAndValues[i + 1];
-
- T meterInfo = default!;
- if (!string.IsNullOrWhiteSpace(meterStr))
- {
- meterInfo = meterStr.Deserialize()!;
- }
- if (meterInfo != null)
- {
- meterHashs[meterId] = meterInfo;
- }
- else
- {
- throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 缓存表计数据异常,集中器 {key} 的表计 {meterId} 解析失败,-103");
- }
- }
-
- // 合并到总结果,若存在重复key则覆盖
- if (meterInfos.ContainsKey(focusAddress))
- {
- foreach (var kvp in meterHashs)
- {
- meterInfos[focusAddress][kvp.Key] = kvp.Value;
- }
- }
- else
- {
- meterInfos[focusAddress] = meterHashs;
- }
- }
- }
- else
- {
- throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 第 {page + 1} 页数据解析失败,返回类型不符,-104");
- }
- }
-
- return meterInfos;
- }
-
- ///
- /// Lua脚本批量获取缓存的表计信息
- ///
- /// 表信息数据对象
- /// 采集频率对应的缓存Key集合
- /// 系统类型
- /// 服务器标识
- /// 采集频率,1分钟、5分钟、15分钟
- /// 表计类型
- ///
- protected async Task> GetMeterRedisCacheListData(string[] redisKeys, string systemType, string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class
- {
- if (redisKeys == null || redisKeys.Length <= 0 ||
- string.IsNullOrWhiteSpace(systemType) ||
- string.IsNullOrWhiteSpace(serverTagName) ||
- string.IsNullOrWhiteSpace(timeDensity))
- {
- throw new Exception($"{nameof(GetMeterRedisCacheListData)} 参数异常,-101");
- }
-
- var meterInfos = new List();
- var luaScript = @"
- local results = {}
- for i, key in ipairs(KEYS) do
- local data = redis.call('HGETALL', key)
- results[i] = {key, data}
- end
- return results";
-
- // 分页参数:每页10000个键
- int pageSize = 10000;
- int totalPages = (int)Math.Ceiling(redisKeys.Length / (double)pageSize);
-
- for (int page = 0; page < totalPages; page++)
- {
- // 分页获取当前批次键
- var batchKeys = redisKeys
- .Skip(page * pageSize)
- .Take(pageSize)
- .ToArray();
-
- // 执行Lua脚本获取当前页数据
- var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, batchKeys);
- if (merterResult == null)
- {
- throw new Exception($"{nameof(GetMeterRedisCacheListData)} 第 {page + 1} 页数据未返回,-102");
- }
-
- // 解析当前页结果
- if (merterResult is object[] arr)
- {
- foreach (object[] item in arr)
- {
- string key = (string)item[0];
- object[] fieldsAndValues = (object[])item[1];
- var redisCacheKey = string.Format(
- RedisConst.CacheMeterInfoHashKey,
- systemType,
- serverTagName,
- meterType,
- timeDensity
- );
- string focusAddress = key.Replace(redisCacheKey, "");
-
- for (int i = 0; i < fieldsAndValues.Length; i += 2)
- {
- string meterId = (string)fieldsAndValues[i];
- string meterStr = (string)fieldsAndValues[i + 1];
-
- T meterInfo = default!;
- if (!string.IsNullOrWhiteSpace(meterStr))
- {
- meterInfo = meterStr.Deserialize()!;
- }
- if (meterInfo != null)
- {
- meterInfos.Add(meterInfo);
- }
- else
- {
- throw new Exception(
- $"{nameof(GetMeterRedisCacheListData)} 表计 {meterId} 解析失败(页 {page + 1}),-103"
- );
- }
- }
- }
- }
- else
- {
- throw new Exception($"{nameof(GetMeterRedisCacheListData)} 第 {page + 1} 页数据格式错误,-104");
- }
- }
-
- return meterInfos;
}
}
diff --git a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs
index 1feb894..69c986a 100644
--- a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs
+++ b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs
@@ -94,6 +94,6 @@ public class CollectBusApplicationModule : AbpModule
//默认初始化表计信息
var dbContext = context.ServiceProvider.GetRequiredService();
- _= dbContext.InitAmmeterCacheData();
+ await dbContext.InitAmmeterCacheData();
}
}
\ No newline at end of file
diff --git a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs
index 5aec2e1..d73a174 100644
--- a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs
+++ b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs
@@ -72,7 +72,7 @@ namespace JiShe.CollectBus.DataChannels
public async Task ScheduledMeterTaskReadingAsync(
ChannelReader>> telemetryPacketInfoReader)
{
- const int BatchSize = 100000;
+ const int BatchSize = 50000;
const int EmptyWaitMilliseconds = 50;
var timeout = TimeSpan.FromSeconds(5);
var timer = Stopwatch.StartNew();
@@ -89,7 +89,7 @@ namespace JiShe.CollectBus.DataChannels
{
if (timeoutMilliseconds > 0)
{
- _logger.LogError($"{nameof(ScheduledMeterTaskReadingAsync)} 通道处理数据耗时{timeoutMilliseconds}毫秒");
+ _logger.LogError($"{nameof(ScheduledMeterTaskReadingAsync)} 任务数据通道处理数据耗时{timeoutMilliseconds}毫秒");
}
timeoutMilliseconds = 0;
//无消息时短等待50毫秒
diff --git a/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs b/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs
index d648c89..cc64378 100644
--- a/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs
+++ b/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs
@@ -94,73 +94,11 @@ namespace JiShe.CollectBus.RedisDataCache
/// 批量添加数据
///
///
- /// 主数据存储Hash缓存Key
/// Set索引缓存Key
- /// ZSET索引缓存Key
+ /// hash缓存Key
/// 待缓存数据集合
///
public async Task BatchInsertDataAsync(
- string redisHashCacheKey,
- string redisSetIndexCacheKey,
- string redisZSetScoresIndexCacheKey,
- string redisDeviceInfoHashCacheKey,
- IEnumerable items) where T : DeviceCacheBasicModel
- {
- if (items == null
- || items.Count() <= 0
- || string.IsNullOrWhiteSpace(redisHashCacheKey)
- || string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
- || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)
- || string.IsNullOrWhiteSpace(redisDeviceInfoHashCacheKey)
- )
- {
- _logger.LogError($"{nameof(BatchInsertDataAsync)} 参数异常,-101");
- return;
- }
-
- const int BATCH_SIZE = 1000; // 每批1000条
- var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2);
-
- foreach (var batch in items.Batch(BATCH_SIZE))
- {
- await semaphore.WaitAsync();
-
- _ = Task.Run(() =>
- {
- using (var pipe = Instance.StartPipe())
- {
- foreach (var item in batch)
- {
- // 主数据存储Hash
- pipe.HSet(redisHashCacheKey, item.MemberId, item.Serialize());
-
- // Set索引缓存
- pipe.SAdd(redisSetIndexCacheKey, $"{item.TimeDensity.ToString().PadLeft(2, '0')}:{item.FocusAddress}");
-
- // ZSET索引缓存Key
- pipe.ZAdd(redisZSetScoresIndexCacheKey, item.ScoreValue, item.MemberId);
-
- //设备信息缓存
- pipe.HSet(redisDeviceInfoHashCacheKey, item.FocusAddress, item.Serialize());
-
- }
- pipe.EndPipe();
- }
- semaphore.Release();
- });
- }
-
- await Task.CompletedTask;
- }
-
- ///
- /// 批量添加数据
- ///
- ///
- /// Set索引缓存Key
- /// 待缓存数据集合
- ///
- public async Task BatchInsertDataAsync2(
string redisSetIndexCacheKey,
string redisDeviceInfoHashCacheKey,
Dictionary> items) where T : DeviceCacheBasicModel
@@ -697,7 +635,167 @@ namespace JiShe.CollectBus.RedisDataCache
PageSize = pageSize,
};
}
-
+
+
+ ///
+ /// 通过ZSET索引获取数据,支持10万级别数据处理,控制在13秒以内。
+ ///
+ ///
+ /// ZSET索引缓存Key
+ /// 主数据存储Hash缓存Key
+ /// 分页尺寸
+ /// 最后一个索引
+ /// 最后一个唯一标识
+ /// 排序方式
+ ///
+ public async Task> GetAllPagedData2(
+ string redisCacheDeviceGroupSetIndexKey,
+ string redisCacheDeviceInfoHashKey,
+ int pageSize = 1000,
+ decimal? lastScore = null,
+ string lastMember = null,
+ bool descending = true)
+ where T : DeviceCacheBasicModel
+ {
+ // 参数校验增强
+ if (string.IsNullOrWhiteSpace(redisCacheDeviceInfoHashKey) ||
+ string.IsNullOrWhiteSpace(redisCacheDeviceGroupSetIndexKey))
+ {
+ _logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101");
+ return new BusCacheGlobalPagedResult { Items = new List() };
+ }
+
+ pageSize = Math.Clamp(pageSize, 1, 10000);
+
+ var luaScript = @"
+ local command = ARGV[1]
+ local range_start = ARGV[2]
+ local range_end = ARGV[3]
+ local limit = tonumber(ARGV[4])
+ local last_score = ARGV[5]
+ local last_member = ARGV[6]
+
+ -- 获取原始数据
+ local members
+ if command == 'ZRANGEBYSCORE' then
+ members = redis.call(command, KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2)
+ else
+ members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2)
+ end
+
+ -- 过滤数据
+ local filtered_members = {}
+ local count = 0
+ for i = 1, #members, 2 do
+ local member = members[i]
+ local score = members[i+1]
+ local include = true
+ if last_score ~= '' and last_member ~= '' then
+ if command == 'ZRANGEBYSCORE' then
+ -- 升序:score > last_score 或 (score == last_score 且 member > last_member)
+ if score == last_score then
+ include = member > last_member
+ else
+ include = tonumber(score) > tonumber(last_score)
+ end
+ else
+ -- 降序:score < last_score 或 (score == last_score 且 member < last_member)
+ if score == last_score then
+ include = member < last_member
+ else
+ include = tonumber(score) < tonumber(last_score)
+ end
+ end
+ end
+ if include then
+ table.insert(filtered_members, member)
+ table.insert(filtered_members, score)
+ count = count + 1
+ if count >= limit then
+ break
+ end
+ end
+ end
+
+ -- 提取有效数据
+ local result_members, result_scores = {}, {}
+ for i=1,#filtered_members,2 do
+ table.insert(result_members, filtered_members[i])
+ table.insert(result_scores, filtered_members[i+1])
+ end
+
+ if #result_members == 0 then return {0,{},{},{}} end
+
+ -- 获取Hash数据
+ local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
+ return {#result_members, result_members, result_scores, hash_data}";
+
+ // 调整范围构造逻辑(移除排他符号)
+ string rangeStart, rangeEnd;
+ if (descending)
+ {
+ rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "+inf";
+ rangeEnd = "-inf";
+ }
+ else
+ {
+ rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "-inf";
+ rangeEnd = "+inf";
+ }
+
+ var scriptResult = (object[])await Instance.EvalAsync(luaScript,
+ new[] { redisCacheDeviceGroupSetIndexKey, redisCacheDeviceInfoHashKey },
+ new object[]
+ {
+ descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE",
+ rangeStart,
+ rangeEnd,
+ (pageSize + 1).ToString(), // 获取pageSize+1条以判断是否有下一页
+ lastScore?.ToString() ?? "",
+ lastMember ?? ""
+ });
+
+ if ((long)scriptResult[0] == 0)
+ return new BusCacheGlobalPagedResult { Items = new List() };
+
+ // 处理结果集
+ var members = ((object[])scriptResult[1]).Cast().ToList();
+ var scores = ((object[])scriptResult[2]).Cast().Select(decimal.Parse).ToList();
+ var hashData = ((object[])scriptResult[3]).Cast().ToList();
+
+ var validItems = members.AsParallel()
+ .Select((m, i) =>
+ {
+ try { return BusJsonSerializer.Deserialize(hashData[i]); }
+ catch { return null; }
+ })
+ .Where(x => x != null)
+ .ToList();
+
+ var hasNext = validItems.Count > pageSize;
+ var actualItems = hasNext ? validItems.Take(pageSize) : validItems;
+
+ //分页锚点索引
+ decimal? nextScore = null;
+ string nextMember = null;
+ if (hasNext && actualItems.Any())
+ {
+ var lastIndex = actualItems.Count() - 1; // 使用actualItems的最后一个索引
+ nextScore = scores[lastIndex];
+ nextMember = members[lastIndex];
+ }
+
+ return new BusCacheGlobalPagedResult
+ {
+ Items = actualItems.ToList(),
+ HasNext = hasNext,
+ NextScore = nextScore,
+ NextMember = nextMember,
+ TotalCount = await GetTotalCount(redisCacheDeviceGroupSetIndexKey),
+ PageSize = pageSize,
+ };
+ }
+
///
/// 游标分页查询
diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs
index e4c61d6..8d9ac21 100644
--- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs
+++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs
@@ -350,22 +350,22 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
//// 初始化缓存
//DeviceGroupBalanceControl.InitializeCache(deviceList);
- var timeDensity = "15";
- //获取缓存中的电表信息
- var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoHashKey, "Energy", "JiSheCollectBus", MeterTypeEnum.Ammeter.ToString(), timeDensity)}*";
+ //var timeDensity = "15";
+ ////获取缓存中的电表信息
+ //var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoHashKey, "Energy", "JiSheCollectBus", MeterTypeEnum.Ammeter.ToString(), timeDensity)}*";
- var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
- var meterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, "Energy", "JiSheCollectBus", timeDensity, MeterTypeEnum.Ammeter);
- List focusAddressDataLista = new List();
- foreach (var item in meterInfos)
- {
- focusAddressDataLista.Add(item.FocusAddress);
- }
+ //var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
+ //var meterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, "Energy", "JiSheCollectBus", timeDensity, MeterTypeEnum.Ammeter);
+ //List focusAddressDataLista = new List();
+ //foreach (var item in meterInfos)
+ //{
+ // focusAddressDataLista.Add(item.FocusAddress);
+ //}
- DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
+ //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
- // 打印分布统计
- DeviceGroupBalanceControl.PrintDistributionStats();
+ //// 打印分布统计
+ //DeviceGroupBalanceControl.PrintDistributionStats();
await Task.CompletedTask;
}
@@ -392,46 +392,46 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
[HttpGet]
public async Task TestRedisCacheGetAllPagedData()
{
- var timeDensity = "15";
- string SystemType = "Energy";
- string ServerTagName = "JiSheCollectBus2";
- var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
- var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
- var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
+ //var timeDensity = "15";
+ //string SystemType = "Energy";
+ //string ServerTagName = "JiSheCollectBus2";
+ //var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
+ //var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
+ //var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
- var timer1 = Stopwatch.StartNew();
- decimal? cursor = null;
- string member = null;
- bool hasNext;
- List meterInfos = new List();
- do
- {
- var page = await _redisDataCacheService.GetAllPagedData(
- redisCacheMeterInfoHashKeyTemp,
- redisCacheMeterInfoZSetScoresIndexKeyTemp,
- pageSize: 1000,
- lastScore: cursor,
- lastMember: member);
+ //var timer1 = Stopwatch.StartNew();
+ //decimal? cursor = null;
+ //string member = null;
+ //bool hasNext;
+ //List meterInfos = new List();
+ //do
+ //{
+ // var page = await _redisDataCacheService.GetAllPagedData(
+ // redisCacheMeterInfoHashKeyTemp,
+ // redisCacheMeterInfoZSetScoresIndexKeyTemp,
+ // pageSize: 1000,
+ // lastScore: cursor,
+ // lastMember: member);
- meterInfos.AddRange(page.Items);
- cursor = page.HasNext ? page.NextScore : null;
- member = page.HasNext ? page.NextMember : null;
- hasNext = page.HasNext;
- } while (hasNext);
+ // meterInfos.AddRange(page.Items);
+ // cursor = page.HasNext ? page.NextScore : null;
+ // member = page.HasNext ? page.NextMember : null;
+ // hasNext = page.HasNext;
+ //} while (hasNext);
- timer1.Stop();
- _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
+ //timer1.Stop();
+ //_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
- List focusAddressDataLista = new List();
- foreach (var item in meterInfos)
- {
- focusAddressDataLista.Add(item.FocusAddress);
- }
+ //List focusAddressDataLista = new List();
+ //foreach (var item in meterInfos)
+ //{
+ // focusAddressDataLista.Add(item.FocusAddress);
+ //}
- DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
+ //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
- // 打印分布统计
- DeviceGroupBalanceControl.PrintDistributionStats();
+ //// 打印分布统计
+ //DeviceGroupBalanceControl.PrintDistributionStats();
await Task.CompletedTask;
}
@@ -541,22 +541,22 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
[HttpGet]
public async Task TestRedisCacheGetData(string scores)
{
- var timeDensity = "15";
- string SystemType = "Energy";
- string ServerTagName = "JiSheCollectBus5";
- var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
- var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
- var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
+ //var timeDensity = "15";
+ //string SystemType = "Energy";
+ //string ServerTagName = "JiSheCollectBus5";
+ //var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
+ //var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
+ //var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
- var page = await _redisDataCacheService.GetSingleData(
- redisCacheMeterInfoHashKeyTemp,
- redisCacheMeterInfoZSetScoresIndexKeyTemp,
- "973219481:17",
- pageSize: 1000,
- lastScore: 100,
- lastMember: "memberId",
- descending: true
- );
+ //var page = await _redisDataCacheService.GetSingleData(
+ // redisCacheMeterInfoHashKeyTemp,
+ // redisCacheMeterInfoZSetScoresIndexKeyTemp,
+ // "973219481:17",
+ // pageSize: 1000,
+ // lastScore: 100,
+ // lastMember: "memberId",
+ // descending: true
+ // );
await Task.CompletedTask;
}
diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index 8e02627..2991fe8 100644
--- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -15,11 +15,12 @@ using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
using JiShe.CollectBus.IotSystems.Ammeters;
+using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
-using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol.Models;
+using Mapster;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
@@ -140,7 +141,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
//return;
- _ = CreateMeterPublishTask(
+ _ = CreateMeterPublishTask(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
@@ -158,7 +159,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取
{
- _ = CreateMeterPublishTask(
+ _ = CreateMeterPublishTask(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
@@ -175,7 +176,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取
{
- _ = CreateMeterPublishTask(
+ _ = CreateMeterPublishTask(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
@@ -192,7 +193,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//月冻结
{
- _ = CreateMeterPublishTask(
+ _ = CreateMeterPublishTask(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
@@ -209,7 +210,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结
{
- _ = CreateMeterPublishTask(
+ _ = CreateMeterPublishTask(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
@@ -243,7 +244,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (meteryType == MeterTypeEnum.Ammeter.ToString())
{
- _ = CreateMeterPublishTask(
+ _ = CreateMeterPublishTask(
timeDensity: timeDensity,
nextTaskTime: currentTaskTime,
meterType: MeterTypeEnum.Ammeter,
@@ -261,7 +262,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
{
- _ = CreateMeterPublishTask(
+ _ = CreateMeterPublishTask(
timeDensity: timeDensity,
nextTaskTime: currentTaskTime,
meterType: MeterTypeEnum.WaterMeter,
@@ -310,7 +311,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
/// 采集端Code
///
- public virtual Task> GetAmmeterInfoList(string gatherCode = "")
+ public virtual Task> GetAmmeterInfoList(string gatherCode = "")
{
throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现");
}
@@ -322,201 +323,175 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{
- //return;
+ //return;
- // 创建取消令牌源
- //var cts = new CancellationTokenSource();
-
- _ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader );
-
- //此处代码不要删除
-#if DEBUG
- var timeDensity = "15";
- var serverTagName = "JiSheCollectBus2";
- var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}";
- var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}";
- var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}";
-
- List meterInfos = new List();
- List focusAddressDataLista = new List();
- var timer1 = Stopwatch.StartNew();
-
- var allIds = new HashSet();
- decimal? score = null;
- string member = null;
-
- while (true)
+ try
{
- var page = await _redisDataCacheService.GetAllPagedData(
- redisCacheMeterInfoHashKeyTemp,
- redisCacheMeterInfoZSetScoresIndexKeyTemp,
- pageSize: 1000,
- lastScore: score,
- lastMember: member);
+ // 创建取消令牌源
+ //var cts = new CancellationTokenSource();
- meterInfos.AddRange(page.Items);
- focusAddressDataLista.AddRange(page.Items.Select(d => $"{d.MeterId}"));
- foreach (var item in page.Items)
+ _ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader);
+
+ //此处代码不要删除
+#if DEBUG
+ var redisCacheDeviceInfoHashKeyTemp = $"CollectBus:Energy:JiSheCollectBus2:DeviceInfo";
+
+ var timer1 = Stopwatch.StartNew();
+ Dictionary> keyValuePairsTemps = FreeRedisProvider.Instance.HGetAll>(redisCacheDeviceInfoHashKeyTemp);
+ List meterInfos = new List();
+ List focusAddressDataLista = new List();
+ foreach (var item in keyValuePairsTemps)
{
- if (!allIds.Add(item.MemberId))
+ foreach (var subItem in item.Value)
{
- _logger.LogError($"{item.MemberId}Duplicate data found!");
+ if (subItem.MeterType == MeterTypeEnum.Ammeter && subItem.TimeDensity == 15)
+ {
+ meterInfos.Add(subItem);
+ focusAddressDataLista.Add(subItem.MeterId.ToString());
+ }
}
}
- if (!page.HasNext) break;
- score = page.NextScore;
- member = page.NextMember;
- }
- timer1.Stop();
- _logger.LogError($"电表初始化读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒");
- //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
- //return;
+ timer1.Stop();
+ _logger.LogError($"电表初始化读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒");
+ DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
+ return;
#else
var meterInfos = await GetAmmeterInfoList(gatherCode);
-#endif
- if (meterInfos == null || meterInfos.Count <= 0)
- {
- throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空");
- }
-
- //获取采集项类型数据
- var gatherItemInfos = await GetGatherItemByDataTypes();
- if (gatherItemInfos == null || gatherItemInfos.Count <= 0)
- {
- throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空");
- }
- var timer = Stopwatch.StartNew();
-
- List deviceIds = new List();//用于处理Kafka主题分区数据的分发和处理。
-
- //根据采集频率分组,获得采集频率分组
- var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
- var currentTaskTime = DateTime.Now;
- if (_applicationOptions.FirstCollectionTime.HasValue == false)
- {
- _applicationOptions.FirstCollectionTime = currentTaskTime;
- }
- //先处理采集频率任务缓存
- foreach (var item in meterInfoGroupByTimeDensity)
- {
- TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
+#endif
+ if (meterInfos == null || meterInfos.Count <= 0)
{
- LastTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),
- TimeDensity = item.Key,
- };
- nextTask.NextTaskTime = nextTask.LastTaskTime.CalculateNextCollectionTime(item.Key);
- //todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。
+ _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空");
+ return;
+ }
- var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key);
- await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
- }
- string redisDeviceInfoHashCacheKey = $"{string.Format(RedisConst.DeviceInfoHashCacheKey, SystemType, ServerTagName)}";
- Dictionary> keyValuePairs = new Dictionary>();
+ _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,读取数据成功");
- foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
- {
- var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
- var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
- var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
- List ammeterInfos = new List();
- //将表计信息根据集中器分组,获得集中器号
- var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
- foreach (var item in meterInfoGroup)
+ //获取采集项类型数据
+ var gatherItemInfos = await GetGatherItemByDataTypes();
+ if (gatherItemInfos == null || gatherItemInfos.Count <= 0)
{
- if (string.IsNullOrWhiteSpace(item.Key))//集中器号为空,跳过
- {
- continue;
- }
+ _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空");
+ return;
+ }
+ var timer = Stopwatch.StartNew();
- foreach (var ammeter in item)
- {
- deviceIds.Add(ammeter.MeterId.ToString());
+ List deviceIds = new List();//用于处理Kafka主题分区数据的分发和处理。
- //处理ItemCode
- if (string.IsNullOrWhiteSpace(ammeter.ItemCodes) && !string.IsNullOrWhiteSpace(ammeter.DataTypes))
+ //根据采集频率分组,获得采集频率分组
+ var meterInfoGroupByTimeDensity = meterInfos.Select(d => d.TimeDensity).GroupBy(d => d);
+ var currentTaskTime = DateTime.Now;
+ if (_applicationOptions.FirstCollectionTime.HasValue == false)
+ {
+ _applicationOptions.FirstCollectionTime = currentTaskTime;
+ }
+ //先处理采集频率任务缓存
+ foreach (var item in meterInfoGroupByTimeDensity)
+ {
+ TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
+ {
+ LastTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),
+ TimeDensity = item.Key,
+ };
+ nextTask.NextTaskTime = nextTask.LastTaskTime.CalculateNextCollectionTime(item.Key);
+ //todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。
+
+ var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key);
+ await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
+ }
+
+ //设备hash缓存key
+ string redisCacheDeviceInfoHashKey = $"{string.Format(RedisConst.CacheDeviceInfoHashKey, SystemType, ServerTagName)}";
+
+ //设备分组集合key
+ string redisCacheDeviceGroupSetIndexKey = $"{string.Format(RedisConst.CacheDeviceGroupSetIndexKey, SystemType, ServerTagName)}";
+
+ Dictionary> keyValuePairs = new Dictionary>();
+
+ //处理设备缓存信息
+ foreach (var ammeter in meterInfos)
+ {
+ deviceIds.Add(ammeter.MeterId.ToString());
+
+ //处理ItemCode
+ if (string.IsNullOrWhiteSpace(ammeter.ItemCodes) && !string.IsNullOrWhiteSpace(ammeter.DataTypes))
+ {
+ var itemArr = ammeter.DataTypes.Split(',').ToList();
+
+ #region 拼接采集项
+ List itemCodeList = new List();
+ foreach (var dataType in itemArr)
{
- var itemArr = ammeter.DataTypes.Split(',').ToList();
-
- #region 拼接采集项
- List itemCodeList = new List();
- foreach (var dataType in itemArr)
+ var excludeItemCode = "10_98,10_94";//TODO 排除透明转发:尖峰平谷时段、跳合闸,特殊电表
+ var gatherItem = gatherItemInfos.FirstOrDefault(f => f.DataType.Equals(dataType));
+ if (gatherItem != null)
{
- var excludeItemCode = "10_98,10_94";//TODO 排除透明转发:尖峰平谷时段、跳合闸,特殊电表
- var gatherItem = gatherItemInfos.FirstOrDefault(f => f.DataType.Equals(dataType));
- if (gatherItem != null)
+ if (!excludeItemCode.Contains(gatherItem.ItemCode))
{
- if (!excludeItemCode.Contains(gatherItem.ItemCode))
- {
- itemCodeList.Add(gatherItem.ItemCode);
- }
+ itemCodeList.Add(gatherItem.ItemCode);
}
+ }
- #region 特殊电表采集项编号处理
- if (itemArr.Exists(e => e.Equals("95"))) //德力西DTS
- {
- itemCodeList.Add("10_95");
- }
- if (itemArr.Exists(e => e.Equals("109")))//WAVE_109
- {
- itemCodeList.Add("10_109");
- }
- #endregion
+ #region 特殊电表采集项编号处理
+ if (itemArr.Exists(e => e.Equals("95"))) //德力西DTS
+ {
+ itemCodeList.Add("10_95");
+ }
+ if (itemArr.Exists(e => e.Equals("109")))//WAVE_109
+ {
+ itemCodeList.Add("10_109");
}
#endregion
-
-
-
- ammeter.ItemCodes = itemCodeList.Serialize();//转换成JSON字符串
-
- if (!string.IsNullOrWhiteSpace(ammeter.ItemCodes))
- {
- ammeter.ItemCodes = ammeter.ItemCodes.Replace("WAVE_109", "10_109");
- }
}
+ #endregion
- ammeterInfos.Add(ammeter);
- if (!keyValuePairs.ContainsKey(ammeter.FocusAddress))
+
+ ammeter.ItemCodes = itemCodeList.Serialize();//转换成JSON字符串
+
+ if (!string.IsNullOrWhiteSpace(ammeter.ItemCodes))
{
- keyValuePairs[ammeter.FocusAddress] = new List() {ammeter };
- }
- else
- {
- keyValuePairs[ammeter.FocusAddress].Add(ammeter);
+ ammeter.ItemCodes = ammeter.ItemCodes.Replace("WAVE_109", "10_109");
}
}
+
+ if (!keyValuePairs.ContainsKey(ammeter.FocusAddress))
+ {
+ keyValuePairs[ammeter.FocusAddress] = new List() { ammeter.Adapt() };
+ }
+ else
+ {
+ keyValuePairs[ammeter.FocusAddress].Add(ammeter.Adapt());
+ }
}
- //await _redisDataCacheService.BatchInsertDataAsync2(
- // redisCacheMeterInfoSetIndexKey,
- // redisDeviceInfoHashCacheKey,
- // keyValuePairs);
+ await _redisDataCacheService.BatchInsertDataAsync(
+ redisCacheDeviceGroupSetIndexKey,
+ redisCacheDeviceInfoHashKey,
+ keyValuePairs);
- await _redisDataCacheService.BatchInsertDataAsync(
- redisCacheMeterInfoHashKey,
- redisCacheMeterInfoSetIndexKey,
- redisCacheMeterInfoZSetScoresIndexKey,
- redisDeviceInfoHashCacheKey,
- ammeterInfos);
+ //初始化设备组负载控制
+ if (deviceIds == null || deviceIds.Count <= 0)
+ {
+ _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
+
+ }
+ else
+ {
+ DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions);
+ }
+
+ timer.Stop();
+
+ _logger.LogWarning($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成,耗时{timer.ElapsedMilliseconds}毫秒");
}
-
- //初始化设备组负载控制
- if (deviceIds == null || deviceIds.Count <= 0)
+ catch (Exception ex)
{
- _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
-
+ _logger.LogError(ex, $"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据异常");
+ throw ex;
}
- else
- {
- DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions);
- }
-
- timer.Stop();
-
- _logger.LogWarning($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成,耗时{timer.ElapsedMilliseconds}毫秒");
}
///
@@ -636,7 +611,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 集中器所在分组
/// 采集频率对应的时间戳
///
- private async Task> AmmerterCreatePublishTaskAction(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
+ private async Task> AmmerterCreatePublishTaskAction(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
var currentTime = DateTime.Now;
@@ -808,7 +783,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 集中器所在分组
/// 采集频率对应的时间戳
///
- public virtual async Task> AmmeterScheduledAutomaticVerificationTime(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
+ public virtual async Task> AmmeterScheduledAutomaticVerificationTime(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}";
@@ -842,7 +817,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
ItemCode = itemCode,
SubProtocolRequest = new SubProtocolBuildRequest()
{
- MeterAddress = ammeterInfo.AmmerterAddress,
+ MeterAddress = ammeterInfo.MeterAddress,
Password = ammeterInfo.Password,
ItemCode = subItemCode,
}
@@ -884,7 +859,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 集中器所在分组
/// 采集频率对应的时间戳
///
- public virtual async Task> AmmeterScheduledGetAutomaticDayFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
+ public virtual async Task> AmmeterScheduledGetAutomaticDayFreezeData(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}";
@@ -954,7 +929,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 集中器所在分组
/// 采集频率对应的时间戳
///
- public virtual async Task> AmmeterScheduledGetAutomaticMonthFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
+ public virtual async Task> AmmeterScheduledGetAutomaticMonthFreezeData(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}";
@@ -1093,7 +1068,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
/// 采集端Code
///
- public virtual Task> GetWatermeterInfoList(string gatherCode = "")
+ public virtual Task> GetWatermeterInfoList(string gatherCode = "")
{
throw new NotImplementedException($"{nameof(GetWatermeterInfoList)}请根据不同系统类型进行实现");
}
@@ -1121,7 +1096,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
List deviceIds = new List();//用于处理Kafka主题分区数据的分发和处理。
//根据采集频率分组,获得采集频率分组
- var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
+ var meterInfoGroupByTimeDensity = meterInfos.Select(d=>d.TimeDensity).GroupBy(d => d);
var currentTime = DateTime.Now;
if (_applicationOptions.FirstCollectionTime.HasValue == false)
{
@@ -1144,40 +1119,35 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, item.Key);
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
}
- string redisDeviceInfoHashCacheKey = $"{string.Format(RedisConst.DeviceInfoHashCacheKey, SystemType, ServerTagName)}";
- foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
+
+ //设备hash缓存key
+ string redisCacheDeviceInfoHashKey = $"{string.Format(RedisConst.CacheDeviceInfoHashKey, SystemType, ServerTagName)}";
+
+ //设备分组集合key
+ string redisCacheDeviceGroupSetIndexKey = $"{string.Format(RedisConst.CacheDeviceGroupSetIndexKey, SystemType, ServerTagName)}";
+
+ Dictionary> keyValuePairs = new Dictionary>();
+ foreach (var subItem in meterInfos)
{
- List watermeterInfo = new List();
+ deviceIds.Add(subItem.MeterId.ToString());
- //将表计信息根据集中器分组,获得集中器号
- var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
- foreach (var item in meterInfoGroup)
+ if (!keyValuePairs.ContainsKey(subItem.FocusAddress))
{
- if (string.IsNullOrWhiteSpace(item.Key))
- {
- continue;
- }
-
-
- var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
- var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
- var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
-
- foreach (var subItem in item)
- {
- deviceIds.Add(subItem.MeterId.ToString());
-
- watermeterInfo.Add(subItem);
- }
-
- await _redisDataCacheService.BatchInsertDataAsync(
- redisCacheMeterInfoHashKey,
- redisCacheMeterInfoSetIndexKey,
- redisDeviceInfoHashCacheKey,
- redisCacheMeterInfoZSetScoresIndexKey, watermeterInfo);
+ keyValuePairs[subItem.FocusAddress] = new List() { subItem };
+ }
+ else
+ {
+ keyValuePairs[subItem.FocusAddress].Add(subItem);
}
}
+
+
+ await _redisDataCacheService.BatchInsertDataAsync(
+ redisCacheDeviceGroupSetIndexKey,
+ redisCacheDeviceInfoHashKey,
+ keyValuePairs);
+
//初始化设备组负载控制
if (deviceIds == null || deviceIds.Count <= 0)
{
@@ -1241,7 +1211,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 时间格式的任务批次名称
///
private async Task> WatermeterCreatePublishTaskAction(int timeDensity
- , WatermeterInfo watermeter, int groupIndex, DateTime timestamps)
+ , DeviceInfo watermeter, int groupIndex, DateTime timestamps)
{
var currentTime = DateTime.Now;
@@ -1270,7 +1240,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据表型号获取协议插件
- var protocolPlugin = await _protocolService.GetProtocolServiceAsync(watermeter.Code);
+ var protocolPlugin = await _protocolService.GetProtocolServiceAsync(watermeter.BrandType);
if (protocolPlugin == null)
{
//_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
@@ -1351,7 +1321,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 集中器所在分组
/// 采集频率对应的时间戳
///
- public virtual async Task> ConcentratorScheduledAutomaticGetTerminalVersion(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
+ public virtual async Task> ConcentratorScheduledAutomaticGetTerminalVersion(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}";
@@ -1392,14 +1362,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading
});
var meterReadingRecords = CreateAmmeterPacketInfo(
- ammeterInfo: ammeterInfo,
- timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
- builderResponse: builderResponse,
- itemCode: itemCode,
- subItemCode: null,
- pendingCopyReadTime: currentTime,
- creationTime: currentTime,
- packetType: TelemetryPacketTypeEnum.TerminalVersion);
+ ammeterInfo: ammeterInfo,
+ timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
+ builderResponse: builderResponse,
+ itemCode: itemCode,
+ subItemCode: null,
+ pendingCopyReadTime: currentTime,
+ creationTime: currentTime,
+ packetType: TelemetryPacketTypeEnum.TerminalVersion);
+
taskList.Add(meterReadingRecords);
if (taskList == null || taskList.Count <= 0)
@@ -1425,7 +1396,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 集中器所在分组
/// 采集频率对应的时间戳
///
- public virtual async Task> ConcentratorScheduledAutomaticGetTelematicsModule(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
+ public virtual async Task> ConcentratorScheduledAutomaticGetTelematicsModule(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}";
@@ -1459,14 +1430,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading
});
var meterReadingRecords = CreateAmmeterPacketInfo(
- ammeterInfo: ammeterInfo,
- timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
- builderResponse: builderResponse,
- itemCode: itemCode,
- subItemCode: null,
- pendingCopyReadTime: currentTime,
- creationTime: currentTime,
- packetType: TelemetryPacketTypeEnum.TelematicsModule);
+ ammeterInfo: ammeterInfo,
+ timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
+ builderResponse: builderResponse,
+ itemCode: itemCode,
+ subItemCode: null,
+ pendingCopyReadTime: currentTime,
+ creationTime: currentTime,
+ packetType: TelemetryPacketTypeEnum.TelematicsModule);
+
taskList.Add(meterReadingRecords);
if (taskList == null || taskList.Count <= 0)
@@ -1517,32 +1489,34 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var timer = Stopwatch.StartNew();
//获取对应频率中的所有电表信息
- var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}";
- var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, meterType, timeDensity)}";
- var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, meterType, timeDensity)}";
+ //设备hash缓存key
+ string redisCacheDeviceInfoHashKey = $"{string.Format(RedisConst.CacheDeviceInfoHashKey, SystemType, ServerTagName)}";
- List meterInfos = new List();
- decimal? cursor = null;
- string member = null;
+ //设备分组集合key
+ string redisCacheDeviceGroupSetIndexKey = $"{string.Format(RedisConst.CacheDeviceGroupSetIndexKey, SystemType, ServerTagName)}";
- while (true)
- {
- var page = await _redisDataCacheService.GetAllPagedData(
- redisCacheMeterInfoHashKeyTemp,
- redisCacheMeterInfoZSetScoresIndexKeyTemp,
- pageSize: 1000,
- lastScore: cursor,
- lastMember: member);
+ //List meterInfos = new List();
+ //decimal? cursor = null;
+ //string member = null;
- meterInfos.AddRange(page.Items);
- if (!page.HasNext)
- {
- break;
- }
+ //while (true)
+ //{
+ // var page = await _redisDataCacheService.GetAllPagedData2(
+ // redisCacheDeviceGroupSetIndexKey,
+ // redisCacheDeviceInfoHashKey,
+ // pageSize: 1000,
+ // lastScore: cursor,
+ // lastMember: member);
- cursor = page.NextScore;
- member = page.NextMember;
- }
+ // meterInfos.AddRange(page.Items);
+ // if (!page.HasNext)
+ // {
+ // break;
+ // }
+
+ // cursor = page.NextScore;
+ // member = page.NextMember;
+ //}
//var page = await _redisDataCacheService.GetAllPagedData(
// redisCacheMeterInfoHashKeyTemp,
@@ -1552,16 +1526,32 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// lastMember: member);
//meterInfos.AddRange(page.Items);
- if (meterInfos == null || meterInfos.Count <= 0)
- {
- timer.Stop();
- _logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
- return;
- }
+ //if (meterInfos == null || meterInfos.Count <= 0)
+ //{
+ // timer.Stop();
+ // _logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
+ // return;
+ //}
+
+ Dictionary> keyValuePairs = FreeRedisProvider.Instance.HGetAll>(redisCacheDeviceInfoHashKey);
+
timer.Stop();
_logger.LogError($"{nameof(CreateMeterPublishTask)} 构建采集待下发任务,缓存获取信息共花费{timer.ElapsedMilliseconds}毫秒");
+
+ List meterInfos = new List();
+ foreach (var item in keyValuePairs)
+ {
+ foreach (var subItem in item.Value)
+ {
+ if (subItem.MeterType == meterType && subItem.TimeDensity == timeDensity)
+ {
+ meterInfos.Add(subItem);
+ }
+ }
+ }
+
timer.Restart();
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
@@ -1652,7 +1642,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 数据创建时间戳
/// 数据包类型
///
- protected MeterReadingTelemetryPacketInfo CreateAmmeterPacketInfo(AmmeterInfo ammeterInfo, long timestamps, ProtocolBuildResponse builderResponse, string itemCode, string subItemCode, DateTime pendingCopyReadTime, DateTime creationTime, TelemetryPacketTypeEnum packetType)
+ protected MeterReadingTelemetryPacketInfo CreateAmmeterPacketInfo(DeviceInfo ammeterInfo, long timestamps, ProtocolBuildResponse builderResponse, string itemCode, string subItemCode, DateTime pendingCopyReadTime, DateTime creationTime, TelemetryPacketTypeEnum packetType)
{
try
{
@@ -1667,7 +1657,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
PendingCopyReadTime = pendingCopyReadTime,
CreationTime = creationTime,
- MeterAddress = ammeterInfo.AmmerterAddress,
+ MeterAddress = ammeterInfo.MeterAddress,
PacketType = (int)packetType,
AFN = builderResponse.AFn,
Fn = builderResponse.Fn,
diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
index d652722..57eb3ec 100644
--- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
+++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
@@ -12,6 +12,7 @@ using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.Ammeters;
+using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Internal;
@@ -39,7 +40,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
string systemType = string.Empty;
string serverTagName = string.Empty;
private readonly ILogger _logger;
- private readonly IIoTDbProvider _dbProvider;
private readonly IProtocolService _protocolService;
public EnergySystemScheduledMeterReadingService(
@@ -62,7 +62,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
serverTagName = applicationOptions.Value.ServerTagName;
systemType = applicationOptions.Value.SystemType;
- _dbProvider = dbProvider;
_logger = logger;
_protocolService = protocolService;
}
@@ -97,59 +96,93 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
//[HttpGet]
//[Route($"ammeter/list")]
- public override async Task> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890")
+ public override async Task> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890")
{
- List ammeterInfos = new List();
- ammeterInfos.Add(new AmmeterInfo()
+ //List ammeterInfos = new List();
+ //ammeterInfos.Add(new DeviceInfo()
+ //{
+ // Baudrate = 2400,
+ // FocusAddress = "402440506",
+ // Name = "张家祠工务(三相电表)",
+ // FocusId = 95780,
+ // DatabaseBusiID = 1,
+ // MeteringCode = 1,
+ // MeterAddress = "402410040506",
+ // MeterId = 127035,
+ // TypeName = 3,
+ // DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
+ // TimeDensity = 15,
+ // BrandType = "DDS1980",
+ //});
+
+ //ammeterInfos.Add(new DeviceInfo()
+ //{
+ // Baudrate = 2400,
+ // FocusAddress = "542400504",
+ // Name = "五号配(长芦二所四排)(单相电表)",
+ // FocusId = 69280,
+ // DatabaseBusiID = 1,
+ // MeteringCode = 2,
+ // MeterAddress = "542410000504",
+ // MeterId = 95594,
+ // TypeName = 1,
+ // DataTypes = "581,589,592,597,601",
+ // TimeDensity = 15,
+ // BrandType = "DDS1980",
+ //});
+
+ //return ammeterInfos;
+
+ try
{
- Baudrate = 2400,
- FocusAddress = "402440506",
- Name = "张家祠工务(三相电表)",
- FocusId = 95780,
- DatabaseBusiID = 1,
- MeteringCode = 1,
- AmmerterAddress = "402410040506",
- MeterId = 127035,
- TypeName = 3,
- DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
- TimeDensity = 15,
- BrandType = "DDS1980",
- });
+ string sql = $@"SELECT
+ C.ID as MeterId,
+ C.Name,
+ C.FocusID as FocusId,
+ C.SingleRate,
+ C.MeteringCode,
+ C.Code AS BrandType,
+ C.Baudrate,
+ C.Password,
+ C.MeteringPort,
+ C.[Address] AS MeterAddress,
+ C.TypeName,
+ C.Protocol,
+ C.TripState,
+ C.[State],
+ B.[Address],
+ B.AreaCode,
+ B.AutomaticReport,
+ D.DataTypes,
+ B.TimeDensity,
+ A.GatherCode,
+ C.Special,
+ C.[ProjectID],
+ B.AbnormalState,
+ B.LastTime,
+ 1 as MeterType,
+ CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,
+ (select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
+ FROM TB_GatherInfo(NOLOCK) AS A
+ INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
+ INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
+ INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0
+ WHERE 1=1 and C.Special = 0 ";
+ //TODO 记得移除特殊表过滤
- ammeterInfos.Add(new AmmeterInfo()
- {
- Baudrate = 2400,
- FocusAddress = "542400504",
- Name = "五号配(长芦二所四排)(单相电表)",
- FocusId = 69280,
- DatabaseBusiID = 1,
- MeteringCode = 2,
- AmmerterAddress = "542410000504",
- MeterId = 95594,
- TypeName = 1,
- DataTypes = "581,589,592,597,601",
- TimeDensity = 15,
- BrandType = "DDS1980",
- });
-
- return ammeterInfos;
-
- string sql = $@"SELECT C.ID as MeterId,C.Name,C.FocusID as FocusId,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
- FROM TB_GatherInfo(NOLOCK) AS A
- INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
- INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
- INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0
- WHERE 1=1 and C.Special = 0 ";
- //TODO 记得移除特殊表过滤
-
- if (!string.IsNullOrWhiteSpace(gatherCode))
- {
- sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
+ if (!string.IsNullOrWhiteSpace(gatherCode))
+ {
+ sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
+ }
+ return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
+ .Ado
+ .QueryAsync(sql);
+ }
+ catch (Exception ex)
+ {
+ throw ex;
}
- return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
- .Ado
- .QueryAsync(sql);
}
@@ -160,7 +193,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
public override async Task> GetAmmeterAutoValveControlSetting(string currentTime)
{
- string sql = $@"SELECT A.*,B.AmmeterID as MeterId,E.GatherCode,D.AreaCode,D.[Address],CONCAT(D.AreaCode, D.[Address]) AS FocusAddress,D.ID as FocusId
+ try
+ {
+ string sql = $@"SELECT A.*,B.AmmeterID as MeterId,E.GatherCode,D.AreaCode,D.[Address],CONCAT(D.AreaCode, D.[Address]) AS FocusAddress,D.ID as FocusId
FROM TB_AutoTripTask(nolock) AS A
INNER JOIN TB_AutoTripAmmeter(nolock) AS B ON A.ID=B.TripTaskID
INNER JOIN TB_AmmeterInfo(nolock) AS C ON B.AmmeterID=C.ID
@@ -168,14 +203,20 @@ namespace JiShe.CollectBus.ScheduledMeterReading
INNER JOIN TB_GatherInfo(NOLOCK) AS E ON E.ID=D.GatherInfoID
WHERE A.IsForbid=0 and A.State<>-1 and E.GatherCode LIKE '%V4%' ";
- if (!string.IsNullOrWhiteSpace(currentTime))
- {
- sql = $@"{sql} AND A.TripTime = '{currentTime}'";
- }
+ if (!string.IsNullOrWhiteSpace(currentTime))
+ {
+ sql = $@"{sql} AND A.TripTime = '{currentTime}'";
+ }
- return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
- .Ado
- .QueryAsync(sql);
+ return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
+ .Ado
+ .QueryAsync(sql);
+ }
+ catch (Exception)
+ {
+
+ throw;
+ }
}
@@ -200,7 +241,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
//批量获取对应的缓存电表信息
- var ammeterInfos = new List();
+ var ammeterInfos = new List();
List taskList = new List();
foreach (var settingInfo in settingInfos)
@@ -270,7 +311,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
ItemCode = itemCode,
SubProtocolRequest = new SubProtocolBuildRequest()
{
- MeterAddress = ammeterInfo.AmmerterAddress,
+ MeterAddress = ammeterInfo.MeterAddress,
Password = ammeterInfo.Password,
ItemCode = subItemCode,
}
@@ -311,9 +352,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
//[HttpGet]
//[Route($"ammeter/list")]
- public override async Task> GetWatermeterInfoList(string gatherCode = "V4-Gather-8890")
+ public override async Task> GetWatermeterInfoList(string gatherCode = "V4-Gather-8890")
{
- string sql = $@"SELECT
+ try
+ {
+ string sql = $@"SELECT
A.ID as MeterId,
A.Name,
A.FocusID as FocusId,
@@ -340,6 +383,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
A.[ProjectID],
B.AbnormalState,
B.LastTime,
+ 2 as MeterType,
CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,
(select top 1 DatabaseBusiID from TB_Project where ID = b.ProjectID) AS DatabaseBusiID
FROM [dbo].[TB_WatermeterInfo](NOLOCK) AS A
@@ -347,13 +391,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading
INNER JOIN [dbo].[TB_GatherInfo](NOLOCK) AS C ON B.GatherInfoID=C.ID
WHERE A.State>=0 AND A.State<100 ";
- if (!string.IsNullOrWhiteSpace(gatherCode))
- {
- sql = $@"{sql} AND C.GatherCode= '{gatherCode}'";
+ if (!string.IsNullOrWhiteSpace(gatherCode))
+ {
+ sql = $@"{sql} AND C.GatherCode= '{gatherCode}'";
+ }
+ return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
+ .Ado
+ .QueryAsync(sql);
+ }
+ catch (Exception)
+ {
+
+ throw;
}
- return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
- .Ado
- .QueryAsync(sql);
}
}
}
\ No newline at end of file
diff --git a/services/JiShe.CollectBus.Domain/IotSystems/Devices/DeviceCacheInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/Devices/DeviceInfo.cs
similarity index 80%
rename from services/JiShe.CollectBus.Domain/IotSystems/Devices/DeviceCacheInfo.cs
rename to services/JiShe.CollectBus.Domain/IotSystems/Devices/DeviceInfo.cs
index 26bc6aa..61fe6f5 100644
--- a/services/JiShe.CollectBus.Domain/IotSystems/Devices/DeviceCacheInfo.cs
+++ b/services/JiShe.CollectBus.Domain/IotSystems/Devices/DeviceInfo.cs
@@ -13,9 +13,9 @@ using System.Threading.Tasks;
namespace JiShe.CollectBus.IotSystems.Devices
{
///
- /// 设备缓存信息
+ /// 设备信息
///
- public class DeviceCacheInfo : DeviceCacheBasicModel
+ public class DeviceInfo : DeviceCacheBasicModel
{
///
/// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义
@@ -28,12 +28,36 @@ namespace JiShe.CollectBus.IotSystems.Devices
///
[Column(IsIgnore = true)]
public override long ScoreValue => Common.Helpers.CommonHelper.GetFocusScores(FocusAddress, MeteringCode);
+
+ ///
+ /// 通讯方案:
+ /// NB-IOT常德水表、NB-IOT泽联电表、GPRS华立水表、
+ /// RS-485、无线、载波
+ ///
+ public string LinkType { get; set; }
///
- /// 标记信息设备类型
+ /// HaveValve: 是否带阀 (0 不带阀, 1 带阀)
+ /// 注意:NULL表示未设置
///
- [Column(IsIgnore = true)]
- public MeterTypeEnum MeterType { get; set; }
+ public int? HaveValve { get; set; }
+
+ ///
+ /// 设备类型: 水表\气表、流量计
+ ///
+ public string MeterTypeName { get; set; }
+
+ ///
+ /// 设备品牌;
+ /// (当 MeterType = 水表, 如 威铭、捷先 等)
+ /// (当 MeterType = 流量计, 如 西恩超声波流量计、西恩电磁流量计、涡街流量计 等)
+ ///
+ public string MeterBrand { get; set; }
+
+ ///
+ /// 倍率
+ ///
+ public decimal TimesRate { get; set; }
///
/// 电表名称
@@ -74,9 +98,9 @@ namespace JiShe.CollectBus.IotSystems.Devices
public int MeteringCode { get; set; }
///
- /// 电表通信地址
+ /// 表通信地址
///
- public string AmmerterAddress { get; set; }
+ public string MeterAddress { get; set; }
///
/// 波特率 default(2400)
diff --git a/shared/JiShe.CollectBus.Common/Consts/RedisConst.cs b/shared/JiShe.CollectBus.Common/Consts/RedisConst.cs
index be4ae4b..bb143ef 100644
--- a/shared/JiShe.CollectBus.Common/Consts/RedisConst.cs
+++ b/shared/JiShe.CollectBus.Common/Consts/RedisConst.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Reflection;
using System.Text;
using System.Threading.Tasks;
@@ -31,26 +32,12 @@ namespace JiShe.CollectBus.Common.Consts
///
/// 设备信息缓存数据,{0}=>系统类型,{1}=>应用服务部署标记
///
- public const string DeviceInfoHashCacheKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:DeviceInfo";
-
-
- public const string MeterInfo = "MeterInfo";
+ public const string CacheDeviceInfoHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:DeviceInfo";
///
- /// 缓存表计信息,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
+ /// 设备信息缓存Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记
///
- public const string CacheMeterInfoHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:{"{3}"}";
-
- ///
- /// 缓存表计信息索引Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
- ///
- public const string CacheMeterInfoSetIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:SetIndex:{"{3}"}";
-
- ///
- /// 缓存表计信息排序索引ZSET缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
- ///
- public const string CacheMeterInfoZSetScoresIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:ZSetScoresIndex:{"{3}"}";
-
+ public const string CacheDeviceGroupSetIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:DeviceGroupIndex";
public const string TaskInfo = "TaskInfo";
///
diff --git a/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs b/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs
index e3f79ce..178b625 100644
--- a/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs
+++ b/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs
@@ -1,4 +1,5 @@
-using System;
+using JiShe.CollectBus.Common.Enums;
+using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@@ -45,5 +46,11 @@ namespace JiShe.CollectBus.Common.Models
/// 采集时间间隔(分钟,如15)
///
public int TimeDensity { get; set; }
+
+ ///
+ /// 表计类型
+ /// 电表= 1,水表= 2,燃气表= 3,热能表= 4,水表流量计=5,燃气表流量计=6,特殊电表=7
+ ///
+ public MeterTypeEnum MeterType { get; set; }
}
}
diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml
index a58ded5..8674d67 100644
--- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml
+++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml
@@ -6,7 +6,7 @@
@{
Layout = null;
}
-
+
diff --git a/web/JiShe.CollectBus.Host/appsettings.json b/web/JiShe.CollectBus.Host/appsettings.json
index bc90b01..e150cbe 100644
--- a/web/JiShe.CollectBus.Host/appsettings.json
+++ b/web/JiShe.CollectBus.Host/appsettings.json
@@ -38,7 +38,7 @@
"Default": "mongodb://mongo_PmEeF3:lixiao1980@192.168.1.9:27017/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
"Kafka": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092",
"PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
- "EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
+ "EnergyDB": "server=rm-wz9hw529i3j1e3b5fbo.sqlserver.rds.aliyuncs.com,3433;database=db_energy;uid=yjdb;pwd=Kdjdhf+9*7ad222LL;Encrypt=False;Trust Server Certificate=False"
},
"Redis": {
"Configuration": "192.168.1.9:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
@@ -88,7 +88,7 @@
"ClusterList": [ "192.168.1.9:6667" ],
"PoolSize": 32,
"DataBaseName": "energy",
- "OpenDebugMode": true,
+ "OpenDebugMode": false,
"UseTableSessionPoolByDefault": false
},
"Cassandra": {
@@ -141,7 +141,7 @@
}
},
"ServerApplicationOptions": {
- "ServerTagName": "JiSheCollectBus99",
+ "ServerTagName": "JiSheCollectBus6",
"SystemType": "Energy",
"FirstCollectionTime": "2025-04-28 15:07:00",
"AutomaticVerificationTime": "16:07:00",