This commit is contained in:
cli 2025-05-13 17:02:09 +08:00
commit 2e58aecc2a
17 changed files with 665 additions and 1202 deletions

View File

@ -38,465 +38,5 @@ namespace JiShe.CollectBus.FreeRedis
Instance.Notice += (s, e) => Trace.WriteLine(e.Log); Instance.Notice += (s, e) => Trace.WriteLine(e.Log);
return Instance; return Instance;
} }
///// <summary>
///// 单个添加数据
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisCacheKey">主数据存储Hash缓存Key</param>
///// <param name="redisCacheFocusIndexKey">集中器索引Set缓存Key</param>
///// <param name="redisCacheScoresIndexKey">集中器排序索引ZSET缓存Key</param>
///// <param name="redisCacheGlobalIndexKey">集中器采集频率分组全局索引ZSet缓存Key</param>
///// <param name="data">表计信息</param>
///// <param name="timestamp">可选时间戳</param>
///// <returns></returns>
//public async Task AddMeterCacheData<T>(
//string redisCacheKey,
//string redisCacheFocusIndexKey,
//string redisCacheScoresIndexKey,
//string redisCacheGlobalIndexKey,
//T data,
//DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
//{
// // 参数校验增强
// if (data == null || string.IsNullOrWhiteSpace(redisCacheKey)
// || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
// {
// throw new ArgumentException($"{nameof(AddMeterCacheData)} 参数异常,-101");
// }
// // 计算组合score分类ID + 时间戳)
// var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
// long scoreValue = ((long)data.FocusId << 32) | (uint)actualTimestamp.Ticks;
// //全局索引写入
// long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
// // 使用事务保证原子性
// using (var trans = Instance.Multi())
// {
// // 主数据存储Hash
// trans.HSet(redisCacheKey, data.MemberID, data.Serialize());
// // 分类索引
// trans.SAdd(redisCacheFocusIndexKey, data.MemberID);
// // 排序索引使用ZSET
// trans.ZAdd(redisCacheScoresIndexKey, scoreValue, data.MemberID);
// //全局索引
// trans.ZAdd(redisCacheGlobalIndexKey, globalScore, data.MemberID);
// var results = trans.Exec();
// if (results == null || results.Length <= 0)
// throw new Exception($"{nameof(AddMeterCacheData)} 事务提交失败,-102");
// }
// await Task.CompletedTask;
//}
///// <summary>
///// 批量添加数据
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisCacheKey">主数据存储Hash缓存Key</param>
///// <param name="redisCacheFocusIndexKey">集中器索引Set缓存Key</param>
///// <param name="redisCacheScoresIndexKey">集中器排序索引ZSET缓存Key</param>
///// <param name="redisCacheGlobalIndexKey">集中器采集频率分组全局索引ZSet缓存Key</param>
///// <param name="items">数据集合</param>
///// <param name="timestamp">可选时间戳</param>
///// <returns></returns>
//public async Task BatchAddMeterData<T>(
//string redisCacheKey,
//string redisCacheFocusIndexKey,
//string redisCacheScoresIndexKey,
//string redisCacheGlobalIndexKey,
//IEnumerable<T> items,
//DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
//{
// if (items == null
// || items.Count() <=0
// || string.IsNullOrWhiteSpace(redisCacheKey)
// || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
// {
// throw new ArgumentException($"{nameof(BatchAddMeterData)} 参数异常,-101");
// }
// const int BATCH_SIZE = 1000; // 每批1000条
// var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2);
// foreach (var batch in items.Batch(BATCH_SIZE))
// {
// await semaphore.WaitAsync();
// _ = Task.Run(() =>
// {
// using (var pipe = Instance.StartPipe())
// {
// foreach (var item in batch)
// {
// // 计算组合score分类ID + 时间戳)
// var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
// long scoreValue = ((long)item.FocusId << 32) | (uint)actualTimestamp.Ticks;
// //全局索引写入
// long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
// // 主数据存储Hash
// pipe.HSet(redisCacheKey, item.MemberID, item.Serialize());
// // 分类索引Set
// pipe.SAdd(redisCacheFocusIndexKey, item.MemberID);
// // 排序索引使用ZSET
// pipe.ZAdd(redisCacheScoresIndexKey, scoreValue, item.MemberID);
// //全局索引
// pipe.ZAdd(redisCacheGlobalIndexKey, globalScore, item.MemberID);
// }
// pipe.EndPipe();
// }
// semaphore.Release();
// });
// }
// await Task.CompletedTask;
//}
///// <summary>
///// 删除指定redis缓存key的缓存数据
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisCacheKey">主数据存储Hash缓存Key</param>
///// <param name="redisCacheFocusIndexKey">集中器索引Set缓存Key</param>
///// <param name="redisCacheScoresIndexKey">集中器排序索引ZSET缓存Key</param>
///// <param name="redisCacheGlobalIndexKey">集中器采集频率分组全局索引ZSet缓存Key</param>
///// <param name="data">表计信息</param>
///// <returns></returns>
//public async Task RemoveMeterData<T>(
//string redisCacheKey,
//string redisCacheFocusIndexKey,
//string redisCacheScoresIndexKey,
//string redisCacheGlobalIndexKey,
//T data) where T : DeviceCacheBasicModel
//{
// if (data == null
// || string.IsNullOrWhiteSpace(redisCacheKey)
// || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
// {
// throw new ArgumentException($"{nameof(RemoveMeterData)} 参数异常,-101");
// }
// const string luaScript = @"
// local mainKey = KEYS[1]
// local focusIndexKey = KEYS[2]
// local scoresIndexKey = KEYS[3]
// local globalIndexKey = KEYS[4]
// local member = ARGV[1]
// local deleted = 0
// if redis.call('HDEL', mainKey, member) > 0 then
// deleted = 1
// end
// redis.call('SREM', focusIndexKey, member)
// redis.call('ZREM', scoresIndexKey, member)
// redis.call('ZREM', globalIndexKey, member)
// return deleted
// ";
// var keys = new[]
// {
// redisCacheKey,
// redisCacheFocusIndexKey,
// redisCacheScoresIndexKey,
// redisCacheGlobalIndexKey
// };
// var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberID });
// if ((int)result == 0)
// throw new KeyNotFoundException("指定数据不存在");
//}
///// <summary>
///// 修改表计缓存信息
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisCacheKey">主数据存储Hash缓存Key</param>
///// <param name="oldRedisCacheFocusIndexKey">旧集中器索引Set缓存Key</param>
///// <param name="newRedisCacheFocusIndexKey">新集中器索引Set缓存Key</param>
///// <param name="redisCacheScoresIndexKey">集中器排序索引ZSET缓存Key</param>
///// <param name="redisCacheGlobalIndexKey">集中器采集频率分组全局索引ZSet缓存Key</param>
///// <param name="newData">表计信息</param>
///// <param name="newTimestamp">可选时间戳</param>
///// <returns></returns>
//public async Task UpdateMeterData<T>(
//string redisCacheKey,
//string oldRedisCacheFocusIndexKey,
//string newRedisCacheFocusIndexKey,
//string redisCacheScoresIndexKey,
//string redisCacheGlobalIndexKey,
//T newData,
//DateTimeOffset? newTimestamp = null) where T : DeviceCacheBasicModel
//{
// if (newData == null
// || string.IsNullOrWhiteSpace(redisCacheKey)
// || string.IsNullOrWhiteSpace(oldRedisCacheFocusIndexKey)
// || string.IsNullOrWhiteSpace(newRedisCacheFocusIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
// {
// throw new ArgumentException($"{nameof(UpdateMeterData)} 参数异常,-101");
// }
// var luaScript = @"
// local mainKey = KEYS[1]
// local oldFocusIndexKey = KEYS[2]
// local newFocusIndexKey = KEYS[3]
// local scoresIndexKey = KEYS[4]
// local globalIndexKey = KEYS[5]
// local member = ARGV[1]
// local newData = ARGV[2]
// local newScore = ARGV[3]
// local newGlobalScore = ARGV[4]
// -- 校验存在性
// if redis.call('HEXISTS', mainKey, member) == 0 then
// return 0
// end
// -- 更新主数据
// redis.call('HSET', mainKey, member, newData)
// -- 处理变更
// if newScore ~= '' then
// -- 删除旧索引
// redis.call('SREM', oldFocusIndexKey, member)
// redis.call('ZREM', scoresIndexKey, member)
// -- 添加新索引
// redis.call('SADD', newFocusIndexKey, member)
// redis.call('ZADD', scoresIndexKey, newScore, member)
// end
// -- 更新全局索引
// if newGlobalScore ~= '' then
// -- 删除旧索引
// redis.call('ZREM', globalIndexKey, member)
// -- 添加新索引
// redis.call('ZADD', globalIndexKey, newGlobalScore, member)
// end
// return 1
// ";
// var actualTimestamp = newTimestamp ?? DateTimeOffset.UtcNow;
// var newGlobalScore = actualTimestamp.ToUnixTimeMilliseconds();
// var newScoreValue = ((long)newData.FocusId << 32) | (uint)actualTimestamp.Ticks;
// var result = await Instance.EvalAsync(luaScript,
// new[]
// {
// redisCacheKey,
// oldRedisCacheFocusIndexKey,
// newRedisCacheFocusIndexKey,
// redisCacheScoresIndexKey,
// redisCacheGlobalIndexKey
// },
// new object[]
// {
// newData.MemberID,
// newData.Serialize(),
// newScoreValue.ToString() ?? "",
// newGlobalScore.ToString() ?? ""
// });
// if ((int)result == 0)
// {
// throw new KeyNotFoundException($"{nameof(UpdateMeterData)}指定Key{redisCacheKey}的数据不存在");
// }
//}
//public async Task<BusPagedResult<T>> SingleGetMeterPagedData<T>(
//string redisCacheKey,
//string redisCacheScoresIndexKey,
//int focusId,
//int pageSize = 10,
//int pageIndex = 1,
//bool descending = true)
//{
// // 计算score范围
// long minScore = (long)focusId << 32;
// long maxScore = ((long)focusId + 1) << 32;
// // 分页参数计算
// int start = (pageIndex - 1) * pageSize;
// // 获取排序后的member列表
// var members = descending
// ? await Instance.ZRevRangeByScoreAsync(
// redisCacheScoresIndexKey,
// maxScore,
// minScore,
// start,
// pageSize)
// : await Instance.ZRangeByScoreAsync(
// redisCacheScoresIndexKey,
// minScore,
// maxScore,
// start,
// pageSize);
// // 批量获取实际数据
// var dataTasks = members.Select(m =>
// Instance.HGetAsync<T>(redisCacheKey, m)).ToArray();
// await Task.WhenAll(dataTasks);
// // 总数统计优化
// var total = await Instance.ZCountAsync(
// redisCacheScoresIndexKey,
// minScore,
// maxScore);
// return new BusPagedResult<T>
// {
// Items = dataTasks.Select(t => t.Result).ToList(),
// TotalCount = total,
// PageIndex = pageIndex,
// PageSize = pageSize
// };
//}
//public async Task<BusPagedResult<T>> GetFocusPagedData<T>(
//string redisCacheKey,
//string redisCacheScoresIndexKey,
//int focusId,
//int pageSize = 10,
//long? lastScore = null,
//string lastMember = null,
//bool descending = true) where T : DeviceCacheBasicModel
//{
// // 计算分数范围
// long minScore = (long)focusId << 32;
// long maxScore = ((long)focusId + 1) << 32;
// // 获取成员列表
// var members = await GetSortedMembers(
// redisCacheScoresIndexKey,
// minScore,
// maxScore,
// pageSize,
// lastScore,
// lastMember,
// descending);
// // 批量获取数据
// var dataDict = await Instance.HMGetAsync<T>(redisCacheKey, members.CurrentItems);
// return new BusPagedResult<T>
// {
// Items = dataDict,
// TotalCount = await GetTotalCount(redisCacheScoresIndexKey, minScore, maxScore),
// HasNext = members.HasNext,
// NextScore = members.NextScore,
// NextMember = members.NextMember
// };
//}
//private async Task<(string[] CurrentItems, bool HasNext, decimal? NextScore, string NextMember)>
// GetSortedMembers(
// string zsetKey,
// long minScore,
// long maxScore,
// int pageSize,
// long? lastScore,
// string lastMember,
// bool descending)
//{
// var querySize = pageSize + 1;
// var (startScore, exclude) = descending
// ? (lastScore ?? maxScore, lastMember)
// : (lastScore ?? minScore, lastMember);
// var members = descending
// ? await Instance.ZRevRangeByScoreAsync(
// zsetKey,
// max: startScore,
// min: minScore,
// offset: 0,
// count: querySize)
// : await Instance.ZRangeByScoreAsync(
// zsetKey,
// min: startScore,
// max: maxScore,
// offset: 0,
// count: querySize);
// var hasNext = members.Length > pageSize;
// var currentItems = members.Take(pageSize).ToArray();
// var nextCursor = currentItems.Any()
// ? await GetNextCursor(zsetKey, currentItems.Last(), descending)
// : (null, null);
// return (currentItems, hasNext, nextCursor.score, nextCursor.member);
//}
//private async Task<long> GetTotalCount(string zsetKey, long min, long max)
//{
// // 缓存计数优化
// var cacheKey = $"{zsetKey}_count_{min}_{max}";
// var cached = await Instance.GetAsync<long?>(cacheKey);
// if (cached.HasValue)
// return cached.Value;
// var count = await Instance.ZCountAsync(zsetKey, min, max);
// await Instance.SetExAsync(cacheKey, 60, count); // 缓存60秒
// return count;
//}
//public async Task<Dictionary<int, BusPagedResult<T>>> BatchGetMeterPagedData<T>(
//string redisCacheKey,
//string redisCacheScoresIndexKey,
//IEnumerable<int> focusIds,
//int pageSizePerFocus = 10) where T : DeviceCacheBasicModel
//{
// var results = new ConcurrentDictionary<int, BusPagedResult<T>>();
// var parallelOptions = new ParallelOptions
// {
// MaxDegreeOfParallelism = Environment.ProcessorCount * 2
// };
// await Parallel.ForEachAsync(focusIds, parallelOptions, async (focusId, _) =>
// {
// var data = await SingleGetMeterPagedData<T>(
// redisCacheKey,
// redisCacheScoresIndexKey,
// focusId,
// pageSizePerFocus);
// results.TryAdd(focusId, data);
// });
// return new Dictionary<int, BusPagedResult<T>>(results);
//}
} }
} }

View File

@ -133,8 +133,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
{ {
try try
{ {
var batchSize = 1000;
var batchSize = 2000;
var batches = entities.Chunk(batchSize); var batches = entities.Chunk(batchSize);
foreach (var batch in batches) foreach (var batch in batches)

View File

@ -72,7 +72,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
/// <returns></returns> /// <returns></returns>
public async Task<AmmeterInfo> GetMeterInfoAsync(string meterType,string timeDensity="15") public async Task<AmmeterInfo> GetMeterInfoAsync(string meterType,string timeDensity="15")
{ {
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, _applicationOptions.SystemType, _applicationOptions.ServerTagName, meterType, timeDensity)}"; var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheDeviceInfoHashKey, _applicationOptions.SystemType, _applicationOptions.ServerTagName, meterType, timeDensity)}";
// TODO临时写死等确认后如何取再调整 // TODO临时写死等确认后如何取再调整
return await Task.FromResult(new AmmeterInfo() { return await Task.FromResult(new AmmeterInfo() {
ProjectID = 10000, ProjectID = 10000,

View File

@ -28,33 +28,16 @@ namespace JiShe.CollectBus.Application.Contracts
string redisZSetScoresIndexCacheKey, string redisZSetScoresIndexCacheKey,
T data) where T : DeviceCacheBasicModel; T data) where T : DeviceCacheBasicModel;
/// <summary> /// <summary>
/// 批量添加数据 /// 批量添加数据
/// </summary> /// </summary>
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param> /// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param> /// <param name="redisDeviceInfoHashCacheKey">hash缓存Key</param>
/// <param name="redisDeviceInfoHashCacheKey">设备缓存信息</param>
/// <param name="items">待缓存数据集合</param> /// <param name="items">待缓存数据集合</param>
/// <returns></returns> /// <returns></returns>
Task BatchInsertDataAsync<T>( Task BatchInsertDataAsync<T>(
string redisHashCacheKey,
string redisSetIndexCacheKey,
string redisZSetScoresIndexCacheKey,
string redisDeviceInfoHashCacheKey,
IEnumerable<T> items) where T : DeviceCacheBasicModel;
/// <summary>
/// 批量添加数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisDeviceInfoHashCacheKey">设备缓存信息</param>
/// <param name="items">待缓存数据集合</param>
/// <returns></returns>
Task BatchInsertDataAsync2<T>(
string redisSetIndexCacheKey, string redisSetIndexCacheKey,
string redisDeviceInfoHashCacheKey, string redisDeviceInfoHashCacheKey,
Dictionary<string, List<T>> items) where T : DeviceCacheBasicModel; Dictionary<string, List<T>> items) where T : DeviceCacheBasicModel;
@ -172,45 +155,24 @@ namespace JiShe.CollectBus.Application.Contracts
bool descending = true) bool descending = true)
where T : DeviceCacheBasicModel; where T : DeviceCacheBasicModel;
/// <summary>
///// <summary> /// 通过ZSET索引获取数据支持10万级别数据处理控制在13秒以内。
///// 游标分页查询 /// </summary>
///// </summary> /// <typeparam name="T"></typeparam>
///// <param name="redisZSetScoresIndexCacheKey">排序索引ZSET缓存Key</param> /// <param name="redisCacheDeviceGroupSetIndexKey">ZSET索引缓存Key</param>
///// <param name="pageSize">分页数量</param> /// <param name="redisCacheDeviceInfoHashKey">主数据存储Hash缓存Key</param>
///// <param name="startScore">开始索引</param> /// <param name="pageSize">分页尺寸</param>
///// <param name="excludeMember">开始唯一标识</param> /// <param name="lastScore">最后一个索引</param>
///// <param name="descending">排序方式</param> /// <param name="lastMember">最后一个唯一标识</param>
///// <returns></returns> /// <param name="descending">排序方式</param>
//Task<(List<string> Members, bool HasNext)> GetPagedMembers( /// <returns></returns>
// string redisZSetScoresIndexCacheKey, Task<BusCacheGlobalPagedResult<T>> GetAllPagedData2<T>(
// int pageSize, string redisCacheDeviceGroupSetIndexKey,
// decimal? startScore, string redisCacheDeviceInfoHashKey,
// string excludeMember, int pageSize = 1000,
// bool descending); decimal? lastScore = null,
string lastMember = null,
///// <summary> bool descending = true)
///// 批量获取指定分页的数据 where T : DeviceCacheBasicModel;
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisHashCacheKey">Hash表缓存key</param>
///// <param name="members">Hash表字段集合</param>
///// <returns></returns>
//Task<Dictionary<string, T>> BatchGetData<T>(
// string redisHashCacheKey,
// IEnumerable<string> members)
// where T : DeviceCacheBasicModel;
///// <summary>
///// 获取下一页游标
///// </summary>
///// <param name="redisZSetScoresIndexCacheKey">排序索引ZSET缓存Key</param>
///// <param name="lastMember">最后一个唯一标识</param>
///// <param name="descending">排序方式</param>
///// <returns></returns>
//Task<decimal?> GetNextScore(
// string redisZSetScoresIndexCacheKey,
// string lastMember,
// bool descending);
} }
} }

View File

@ -4,6 +4,7 @@ using System.Threading.Tasks;
using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IotSystems.Ammeters; using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.IotSystems.Watermeter;
using Volo.Abp.Application.Services; using Volo.Abp.Application.Services;
@ -34,7 +35,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// </summary> /// </summary>
/// <param name="gatherCode">采集端Code</param> /// <param name="gatherCode">采集端Code</param>
/// <returns></returns> /// <returns></returns>
Task<List<AmmeterInfo>> GetAmmeterInfoList(string gatherCode = ""); Task<List<DeviceInfo>> GetAmmeterInfoList(string gatherCode = "");
/// <summary> /// <summary>
/// 初始化电表缓存数据 /// 初始化电表缓存数据
@ -82,7 +83,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledAutomaticVerificationTime(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps); Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledAutomaticVerificationTime(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps);
/// <summary> /// <summary>
/// 日冻结抄读 /// 日冻结抄读
@ -92,7 +93,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledGetAutomaticDayFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps); Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledGetAutomaticDayFreezeData(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps);
/// <summary> /// <summary>
/// 月冻结数据抄读 /// 月冻结数据抄读
@ -102,7 +103,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledGetAutomaticMonthFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps); Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledGetAutomaticMonthFreezeData(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps);
#endregion #endregion
@ -112,7 +113,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// </summary> /// </summary>
/// <param name="gatherCode">采集端Code</param> /// <param name="gatherCode">采集端Code</param>
/// <returns></returns> /// <returns></returns>
Task<List<WatermeterInfo>> GetWatermeterInfoList(string gatherCode = ""); Task<List<DeviceInfo>> GetWatermeterInfoList(string gatherCode = "");
/// <summary> /// <summary>
/// 初始化水表缓存数据,只获取任务数据下发,不构建任务 /// 初始化水表缓存数据,只获取任务数据下发,不构建任务
@ -139,7 +140,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
Task<List<MeterReadingTelemetryPacketInfo>> ConcentratorScheduledAutomaticGetTerminalVersion(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps); Task<List<MeterReadingTelemetryPacketInfo>> ConcentratorScheduledAutomaticGetTerminalVersion(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps);
/// <summary> /// <summary>
/// 自动获取远程通信模块(SIM)版本信息 /// 自动获取远程通信模块(SIM)版本信息
@ -149,7 +150,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
Task<List<MeterReadingTelemetryPacketInfo>> ConcentratorScheduledAutomaticGetTelematicsModule(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps); Task<List<MeterReadingTelemetryPacketInfo>> ConcentratorScheduledAutomaticGetTelematicsModule(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps);
#endregion #endregion

View File

@ -26,199 +26,4 @@ public abstract class CollectBusAppService : ApplicationService
LocalizationResource = typeof(CollectBusResource); LocalizationResource = typeof(CollectBusResource);
ObjectMapperContext = typeof(CollectBusApplicationModule); ObjectMapperContext = typeof(CollectBusApplicationModule);
} }
/// <summary>
/// Lua脚本批量获取缓存的表计信息
/// </summary>
/// <typeparam name="T">表信息数据对象</typeparam>
/// <param name="redisKeys">采集频率对应的缓存Key集合</param>
/// <param name="systemType"><see cref="SystemTypeConst"/> 系统类型</param>
/// <param name="serverTagName">服务器标识</param>
/// <param name="timeDensity">采集频率1分钟、5分钟、15分钟</param>
/// <param name="meterType"><see cref="MeterTypeEnum"/> 表计类型</param>
/// <returns></returns>
protected async Task<Dictionary<string, Dictionary<string, T>>> GetMeterRedisCacheDictionaryData<T>(string[] redisKeys, string systemType, string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class
{
if (redisKeys == null || redisKeys.Length <= 0 || string.IsNullOrWhiteSpace(systemType) || string.IsNullOrWhiteSpace(serverTagName) || string.IsNullOrWhiteSpace(timeDensity))
{
throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,参数异常,-101");
}
var meterInfos = new Dictionary<string, Dictionary<string, T>>();
var luaScript = @"
local results = {}
for i, key in ipairs(KEYS) do
local data = redis.call('HGETALL', key)
results[i] = {key, data}
end
return results";
// 分页参数每页处理10000个键
int pageSize = 10000;
int totalPages = (int)Math.Ceiling(redisKeys.Length / (double)pageSize);
for (int page = 0; page < totalPages; page++)
{
// 分页获取当前批次的键
var batchKeys = redisKeys
.Skip(page * pageSize)
.Take(pageSize)
.ToArray();
// 执行Lua脚本获取当前批次数据
var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, batchKeys);
if (merterResult == null)
{
throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,第 {page + 1} 页数据未返回,-102");
}
// 解析当前批次的结果
if (merterResult is object[] arr)
{
foreach (object[] item in arr)
{
string key = (string)item[0];
object[] fieldsAndValues = (object[])item[1];
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, systemType, serverTagName, meterType, timeDensity)}";
string focusAddress = key.Replace(redisCacheKey, "");
var meterHashs = new Dictionary<string, T>();
for (int i = 0; i < fieldsAndValues.Length; i += 2)
{
string meterId = (string)fieldsAndValues[i];
string meterStr = (string)fieldsAndValues[i + 1];
T meterInfo = default!;
if (!string.IsNullOrWhiteSpace(meterStr))
{
meterInfo = meterStr.Deserialize<T>()!;
}
if (meterInfo != null)
{
meterHashs[meterId] = meterInfo;
}
else
{
throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 缓存表计数据异常,集中器 {key} 的表计 {meterId} 解析失败,-103");
}
}
// 合并到总结果若存在重复key则覆盖
if (meterInfos.ContainsKey(focusAddress))
{
foreach (var kvp in meterHashs)
{
meterInfos[focusAddress][kvp.Key] = kvp.Value;
}
}
else
{
meterInfos[focusAddress] = meterHashs;
}
}
}
else
{
throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 第 {page + 1} 页数据解析失败,返回类型不符,-104");
}
}
return meterInfos;
}
/// <summary>
/// Lua脚本批量获取缓存的表计信息
/// </summary>
/// <typeparam name="T">表信息数据对象</typeparam>
/// <param name="redisKeys">采集频率对应的缓存Key集合</param>
/// <param name="systemType"><see cref="SystemTypeConst"/> 系统类型</param>
/// <param name="serverTagName">服务器标识</param>
/// <param name="timeDensity">采集频率1分钟、5分钟、15分钟</param>
/// <param name="meterType"><see cref="MeterTypeEnum"/> 表计类型</param>
/// <returns></returns>
protected async Task<List<T>> GetMeterRedisCacheListData<T>(string[] redisKeys, string systemType, string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class
{
if (redisKeys == null || redisKeys.Length <= 0 ||
string.IsNullOrWhiteSpace(systemType) ||
string.IsNullOrWhiteSpace(serverTagName) ||
string.IsNullOrWhiteSpace(timeDensity))
{
throw new Exception($"{nameof(GetMeterRedisCacheListData)} 参数异常,-101");
}
var meterInfos = new List<T>();
var luaScript = @"
local results = {}
for i, key in ipairs(KEYS) do
local data = redis.call('HGETALL', key)
results[i] = {key, data}
end
return results";
// 分页参数每页10000个键
int pageSize = 10000;
int totalPages = (int)Math.Ceiling(redisKeys.Length / (double)pageSize);
for (int page = 0; page < totalPages; page++)
{
// 分页获取当前批次键
var batchKeys = redisKeys
.Skip(page * pageSize)
.Take(pageSize)
.ToArray();
// 执行Lua脚本获取当前页数据
var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, batchKeys);
if (merterResult == null)
{
throw new Exception($"{nameof(GetMeterRedisCacheListData)} 第 {page + 1} 页数据未返回,-102");
}
// 解析当前页结果
if (merterResult is object[] arr)
{
foreach (object[] item in arr)
{
string key = (string)item[0];
object[] fieldsAndValues = (object[])item[1];
var redisCacheKey = string.Format(
RedisConst.CacheMeterInfoHashKey,
systemType,
serverTagName,
meterType,
timeDensity
);
string focusAddress = key.Replace(redisCacheKey, "");
for (int i = 0; i < fieldsAndValues.Length; i += 2)
{
string meterId = (string)fieldsAndValues[i];
string meterStr = (string)fieldsAndValues[i + 1];
T meterInfo = default!;
if (!string.IsNullOrWhiteSpace(meterStr))
{
meterInfo = meterStr.Deserialize<T>()!;
}
if (meterInfo != null)
{
meterInfos.Add(meterInfo);
}
else
{
throw new Exception(
$"{nameof(GetMeterRedisCacheListData)} 表计 {meterId} 解析失败(页 {page + 1}-103"
);
}
}
}
}
else
{
throw new Exception($"{nameof(GetMeterRedisCacheListData)} 第 {page + 1} 页数据格式错误,-104");
}
}
return meterInfos;
}
} }

View File

@ -94,6 +94,6 @@ public class CollectBusApplicationModule : AbpModule
//默认初始化表计信息 //默认初始化表计信息
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>(); var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
_= dbContext.InitAmmeterCacheData(); await dbContext.InitAmmeterCacheData();
} }
} }

View File

@ -72,7 +72,7 @@ namespace JiShe.CollectBus.DataChannels
public async Task ScheduledMeterTaskReadingAsync( public async Task ScheduledMeterTaskReadingAsync(
ChannelReader<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>> telemetryPacketInfoReader) ChannelReader<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>> telemetryPacketInfoReader)
{ {
const int BatchSize = 100000; const int BatchSize = 50000;
const int EmptyWaitMilliseconds = 50; const int EmptyWaitMilliseconds = 50;
var timeout = TimeSpan.FromSeconds(5); var timeout = TimeSpan.FromSeconds(5);
var timer = Stopwatch.StartNew(); var timer = Stopwatch.StartNew();
@ -89,7 +89,7 @@ namespace JiShe.CollectBus.DataChannels
{ {
if (timeoutMilliseconds > 0) if (timeoutMilliseconds > 0)
{ {
_logger.LogError($"{nameof(ScheduledMeterTaskReadingAsync)} 通道处理数据耗时{timeoutMilliseconds}毫秒"); _logger.LogError($"{nameof(ScheduledMeterTaskReadingAsync)} 任务数据通道处理数据耗时{timeoutMilliseconds}毫秒");
} }
timeoutMilliseconds = 0; timeoutMilliseconds = 0;
//无消息时短等待50毫秒 //无消息时短等待50毫秒

View File

@ -94,73 +94,11 @@ namespace JiShe.CollectBus.RedisDataCache
/// 批量添加数据 /// 批量添加数据
/// </summary> /// </summary>
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param> /// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param> /// <param name="redisDeviceInfoHashCacheKey">hash缓存Key</param>
/// <param name="items">待缓存数据集合</param> /// <param name="items">待缓存数据集合</param>
/// <returns></returns> /// <returns></returns>
public async Task BatchInsertDataAsync<T>( public async Task BatchInsertDataAsync<T>(
string redisHashCacheKey,
string redisSetIndexCacheKey,
string redisZSetScoresIndexCacheKey,
string redisDeviceInfoHashCacheKey,
IEnumerable<T> items) where T : DeviceCacheBasicModel
{
if (items == null
|| items.Count() <= 0
|| string.IsNullOrWhiteSpace(redisHashCacheKey)
|| string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
|| string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)
|| string.IsNullOrWhiteSpace(redisDeviceInfoHashCacheKey)
)
{
_logger.LogError($"{nameof(BatchInsertDataAsync)} 参数异常,-101");
return;
}
const int BATCH_SIZE = 1000; // 每批1000条
var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2);
foreach (var batch in items.Batch(BATCH_SIZE))
{
await semaphore.WaitAsync();
_ = Task.Run(() =>
{
using (var pipe = Instance.StartPipe())
{
foreach (var item in batch)
{
// 主数据存储Hash
pipe.HSet(redisHashCacheKey, item.MemberId, item.Serialize());
// Set索引缓存
pipe.SAdd(redisSetIndexCacheKey, $"{item.TimeDensity.ToString().PadLeft(2, '0')}:{item.FocusAddress}");
// ZSET索引缓存Key
pipe.ZAdd(redisZSetScoresIndexCacheKey, item.ScoreValue, item.MemberId);
//设备信息缓存
pipe.HSet(redisDeviceInfoHashCacheKey, item.FocusAddress, item.Serialize());
}
pipe.EndPipe();
}
semaphore.Release();
});
}
await Task.CompletedTask;
}
/// <summary>
/// 批量添加数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="items">待缓存数据集合</param>
/// <returns></returns>
public async Task BatchInsertDataAsync2<T>(
string redisSetIndexCacheKey, string redisSetIndexCacheKey,
string redisDeviceInfoHashCacheKey, string redisDeviceInfoHashCacheKey,
Dictionary<string, List<T>> items) where T : DeviceCacheBasicModel Dictionary<string, List<T>> items) where T : DeviceCacheBasicModel
@ -699,6 +637,166 @@ namespace JiShe.CollectBus.RedisDataCache
} }
/// <summary>
/// 通过ZSET索引获取数据支持10万级别数据处理控制在13秒以内。
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisCacheDeviceGroupSetIndexKey">ZSET索引缓存Key</param>
/// <param name="redisCacheDeviceInfoHashKey">主数据存储Hash缓存Key</param>
/// <param name="pageSize">分页尺寸</param>
/// <param name="lastScore">最后一个索引</param>
/// <param name="lastMember">最后一个唯一标识</param>
/// <param name="descending">排序方式</param>
/// <returns></returns>
public async Task<BusCacheGlobalPagedResult<T>> GetAllPagedData2<T>(
string redisCacheDeviceGroupSetIndexKey,
string redisCacheDeviceInfoHashKey,
int pageSize = 1000,
decimal? lastScore = null,
string lastMember = null,
bool descending = true)
where T : DeviceCacheBasicModel
{
// 参数校验增强
if (string.IsNullOrWhiteSpace(redisCacheDeviceInfoHashKey) ||
string.IsNullOrWhiteSpace(redisCacheDeviceGroupSetIndexKey))
{
_logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101");
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
}
pageSize = Math.Clamp(pageSize, 1, 10000);
var luaScript = @"
local command = ARGV[1]
local range_start = ARGV[2]
local range_end = ARGV[3]
local limit = tonumber(ARGV[4])
local last_score = ARGV[5]
local last_member = ARGV[6]
--
local members
if command == 'ZRANGEBYSCORE' then
members = redis.call(command, KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2)
else
members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2)
end
--
local filtered_members = {}
local count = 0
for i = 1, #members, 2 do
local member = members[i]
local score = members[i+1]
local include = true
if last_score ~= '' and last_member ~= '' then
if command == 'ZRANGEBYSCORE' then
-- score > last_score (score == last_score member > last_member)
if score == last_score then
include = member > last_member
else
include = tonumber(score) > tonumber(last_score)
end
else
-- score < last_score (score == last_score member < last_member)
if score == last_score then
include = member < last_member
else
include = tonumber(score) < tonumber(last_score)
end
end
end
if include then
table.insert(filtered_members, member)
table.insert(filtered_members, score)
count = count + 1
if count >= limit then
break
end
end
end
--
local result_members, result_scores = {}, {}
for i=1,#filtered_members,2 do
table.insert(result_members, filtered_members[i])
table.insert(result_scores, filtered_members[i+1])
end
if #result_members == 0 then return {0,{},{},{}} end
-- Hash数据
local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
return {#result_members, result_members, result_scores, hash_data}";
// 调整范围构造逻辑(移除排他符号)
string rangeStart, rangeEnd;
if (descending)
{
rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "+inf";
rangeEnd = "-inf";
}
else
{
rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "-inf";
rangeEnd = "+inf";
}
var scriptResult = (object[])await Instance.EvalAsync(luaScript,
new[] { redisCacheDeviceGroupSetIndexKey, redisCacheDeviceInfoHashKey },
new object[]
{
descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE",
rangeStart,
rangeEnd,
(pageSize + 1).ToString(), // 获取pageSize+1条以判断是否有下一页
lastScore?.ToString() ?? "",
lastMember ?? ""
});
if ((long)scriptResult[0] == 0)
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
// 处理结果集
var members = ((object[])scriptResult[1]).Cast<string>().ToList();
var scores = ((object[])scriptResult[2]).Cast<string>().Select(decimal.Parse).ToList();
var hashData = ((object[])scriptResult[3]).Cast<string>().ToList();
var validItems = members.AsParallel()
.Select((m, i) =>
{
try { return BusJsonSerializer.Deserialize<T>(hashData[i]); }
catch { return null; }
})
.Where(x => x != null)
.ToList();
var hasNext = validItems.Count > pageSize;
var actualItems = hasNext ? validItems.Take(pageSize) : validItems;
//分页锚点索引
decimal? nextScore = null;
string nextMember = null;
if (hasNext && actualItems.Any())
{
var lastIndex = actualItems.Count() - 1; // 使用actualItems的最后一个索引
nextScore = scores[lastIndex];
nextMember = members[lastIndex];
}
return new BusCacheGlobalPagedResult<T>
{
Items = actualItems.ToList(),
HasNext = hasNext,
NextScore = nextScore,
NextMember = nextMember,
TotalCount = await GetTotalCount(redisCacheDeviceGroupSetIndexKey),
PageSize = pageSize,
};
}
/// <summary> /// <summary>
/// 游标分页查询 /// 游标分页查询
/// </summary> /// </summary>

View File

@ -350,22 +350,22 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
//// 初始化缓存 //// 初始化缓存
//DeviceGroupBalanceControl.InitializeCache(deviceList); //DeviceGroupBalanceControl.InitializeCache(deviceList);
var timeDensity = "15"; //var timeDensity = "15";
//获取缓存中的电表信息 ////获取缓存中的电表信息
var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoHashKey, "Energy", "JiSheCollectBus", MeterTypeEnum.Ammeter.ToString(), timeDensity)}*"; //var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoHashKey, "Energy", "JiSheCollectBus", MeterTypeEnum.Ammeter.ToString(), timeDensity)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); //var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, "Energy", "JiSheCollectBus", timeDensity, MeterTypeEnum.Ammeter); //var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, "Energy", "JiSheCollectBus", timeDensity, MeterTypeEnum.Ammeter);
List<string> focusAddressDataLista = new List<string>(); //List<string> focusAddressDataLista = new List<string>();
foreach (var item in meterInfos) //foreach (var item in meterInfos)
{ //{
focusAddressDataLista.Add(item.FocusAddress); // focusAddressDataLista.Add(item.FocusAddress);
} //}
DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista); //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
// 打印分布统计 //// 打印分布统计
DeviceGroupBalanceControl.PrintDistributionStats(); //DeviceGroupBalanceControl.PrintDistributionStats();
await Task.CompletedTask; await Task.CompletedTask;
} }
@ -392,46 +392,46 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
[HttpGet] [HttpGet]
public async Task TestRedisCacheGetAllPagedData() public async Task TestRedisCacheGetAllPagedData()
{ {
var timeDensity = "15"; //var timeDensity = "15";
string SystemType = "Energy"; //string SystemType = "Energy";
string ServerTagName = "JiSheCollectBus2"; //string ServerTagName = "JiSheCollectBus2";
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; //var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; //var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; //var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var timer1 = Stopwatch.StartNew(); //var timer1 = Stopwatch.StartNew();
decimal? cursor = null; //decimal? cursor = null;
string member = null; //string member = null;
bool hasNext; //bool hasNext;
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>(); //List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
do //do
{ //{
var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>( // var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
redisCacheMeterInfoHashKeyTemp, // redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp, // redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000, // pageSize: 1000,
lastScore: cursor, // lastScore: cursor,
lastMember: member); // lastMember: member);
meterInfos.AddRange(page.Items); // meterInfos.AddRange(page.Items);
cursor = page.HasNext ? page.NextScore : null; // cursor = page.HasNext ? page.NextScore : null;
member = page.HasNext ? page.NextMember : null; // member = page.HasNext ? page.NextMember : null;
hasNext = page.HasNext; // hasNext = page.HasNext;
} while (hasNext); //} while (hasNext);
timer1.Stop(); //timer1.Stop();
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); //_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
List<string> focusAddressDataLista = new List<string>(); //List<string> focusAddressDataLista = new List<string>();
foreach (var item in meterInfos) //foreach (var item in meterInfos)
{ //{
focusAddressDataLista.Add(item.FocusAddress); // focusAddressDataLista.Add(item.FocusAddress);
} //}
DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista); //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
// 打印分布统计 //// 打印分布统计
DeviceGroupBalanceControl.PrintDistributionStats(); //DeviceGroupBalanceControl.PrintDistributionStats();
await Task.CompletedTask; await Task.CompletedTask;
} }
@ -541,22 +541,22 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
[HttpGet] [HttpGet]
public async Task TestRedisCacheGetData(string scores) public async Task TestRedisCacheGetData(string scores)
{ {
var timeDensity = "15"; //var timeDensity = "15";
string SystemType = "Energy"; //string SystemType = "Energy";
string ServerTagName = "JiSheCollectBus5"; //string ServerTagName = "JiSheCollectBus5";
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; //var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; //var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; //var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var page = await _redisDataCacheService.GetSingleData<AmmeterInfo>( //var page = await _redisDataCacheService.GetSingleData<AmmeterInfo>(
redisCacheMeterInfoHashKeyTemp, // redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp, // redisCacheMeterInfoZSetScoresIndexKeyTemp,
"973219481:17", // "973219481:17",
pageSize: 1000, // pageSize: 1000,
lastScore: 100, // lastScore: 100,
lastMember: "memberId", // lastMember: "memberId",
descending: true // descending: true
); // );
await Task.CompletedTask; await Task.CompletedTask;
} }

View File

@ -15,11 +15,12 @@ using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options; using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider; using JiShe.CollectBus.IoTDB.Provider;
using JiShe.CollectBus.IotSystems.Ammeters; using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Protocol.Interfaces; using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol.Models; using JiShe.CollectBus.Protocol.Models;
using Mapster;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using System; using System;
@ -140,7 +141,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间"); //_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
//return; //return;
_ = CreateMeterPublishTask<AmmeterInfo>( _ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity, timeDensity: timeDensity,
nextTaskTime: currentTime, nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter, meterType: MeterTypeEnum.Ammeter,
@ -158,7 +159,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取 else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取
{ {
_ = CreateMeterPublishTask<AmmeterInfo>( _ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity, timeDensity: timeDensity,
nextTaskTime: currentTime, nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter, meterType: MeterTypeEnum.Ammeter,
@ -175,7 +176,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取 else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取
{ {
_ = CreateMeterPublishTask<AmmeterInfo>( _ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity, timeDensity: timeDensity,
nextTaskTime: currentTime, nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter, meterType: MeterTypeEnum.Ammeter,
@ -192,7 +193,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//月冻结 else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//月冻结
{ {
_ = CreateMeterPublishTask<AmmeterInfo>( _ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity, timeDensity: timeDensity,
nextTaskTime: currentTime, nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter, meterType: MeterTypeEnum.Ammeter,
@ -209,7 +210,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结 else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结
{ {
_ = CreateMeterPublishTask<AmmeterInfo>( _ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity, timeDensity: timeDensity,
nextTaskTime: currentTime, nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter, meterType: MeterTypeEnum.Ammeter,
@ -243,7 +244,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (meteryType == MeterTypeEnum.Ammeter.ToString()) if (meteryType == MeterTypeEnum.Ammeter.ToString())
{ {
_ = CreateMeterPublishTask<AmmeterInfo>( _ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity, timeDensity: timeDensity,
nextTaskTime: currentTaskTime, nextTaskTime: currentTaskTime,
meterType: MeterTypeEnum.Ammeter, meterType: MeterTypeEnum.Ammeter,
@ -261,7 +262,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
{ {
_ = CreateMeterPublishTask<WatermeterInfo>( _ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity, timeDensity: timeDensity,
nextTaskTime: currentTaskTime, nextTaskTime: currentTaskTime,
meterType: MeterTypeEnum.WaterMeter, meterType: MeterTypeEnum.WaterMeter,
@ -310,7 +311,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// </summary> /// </summary>
/// <param name="gatherCode">采集端Code</param> /// <param name="gatherCode">采集端Code</param>
/// <returns></returns> /// <returns></returns>
public virtual Task<List<AmmeterInfo>> GetAmmeterInfoList(string gatherCode = "") public virtual Task<List<DeviceInfo>> GetAmmeterInfoList(string gatherCode = "")
{ {
throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现"); throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现");
} }
@ -322,201 +323,175 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns> /// <returns></returns>
public virtual async Task InitAmmeterCacheData(string gatherCode = "") public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{ {
//return; //return;
// 创建取消令牌源 try
//var cts = new CancellationTokenSource();
_ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader );
//此处代码不要删除
#if DEBUG
var timeDensity = "15";
var serverTagName = "JiSheCollectBus2";
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}";
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
List<string> focusAddressDataLista = new List<string>();
var timer1 = Stopwatch.StartNew();
var allIds = new HashSet<string>();
decimal? score = null;
string member = null;
while (true)
{ {
var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>( // 创建取消令牌源
redisCacheMeterInfoHashKeyTemp, //var cts = new CancellationTokenSource();
redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000,
lastScore: score,
lastMember: member);
meterInfos.AddRange(page.Items); _ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader);
focusAddressDataLista.AddRange(page.Items.Select(d => $"{d.MeterId}"));
foreach (var item in page.Items) //此处代码不要删除
#if DEBUG
var redisCacheDeviceInfoHashKeyTemp = $"CollectBus:Energy:JiSheCollectBus2:DeviceInfo";
var timer1 = Stopwatch.StartNew();
Dictionary<string, List<DeviceInfo>> keyValuePairsTemps = FreeRedisProvider.Instance.HGetAll<List<DeviceInfo>>(redisCacheDeviceInfoHashKeyTemp);
List<DeviceInfo> meterInfos = new List<DeviceInfo>();
List<string> focusAddressDataLista = new List<string>();
foreach (var item in keyValuePairsTemps)
{ {
if (!allIds.Add(item.MemberId)) foreach (var subItem in item.Value)
{ {
_logger.LogError($"{item.MemberId}Duplicate data found!"); if (subItem.MeterType == MeterTypeEnum.Ammeter && subItem.TimeDensity == 15)
{
meterInfos.Add(subItem);
focusAddressDataLista.Add(subItem.MeterId.ToString());
}
} }
} }
if (!page.HasNext) break;
score = page.NextScore;
member = page.NextMember;
}
timer1.Stop(); timer1.Stop();
_logger.LogError($"电表初始化读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒"); _logger.LogError($"电表初始化读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒");
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
//return; return;
#else #else
var meterInfos = await GetAmmeterInfoList(gatherCode); var meterInfos = await GetAmmeterInfoList(gatherCode);
#endif #endif
if (meterInfos == null || meterInfos.Count <= 0) if (meterInfos == null || meterInfos.Count <= 0)
{
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空");
}
//获取采集项类型数据
var gatherItemInfos = await GetGatherItemByDataTypes();
if (gatherItemInfos == null || gatherItemInfos.Count <= 0)
{
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空");
}
var timer = Stopwatch.StartNew();
List<string> deviceIds = new List<string>();//用于处理Kafka主题分区数据的分发和处理。
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
var currentTaskTime = DateTime.Now;
if (_applicationOptions.FirstCollectionTime.HasValue == false)
{
_applicationOptions.FirstCollectionTime = currentTaskTime;
}
//先处理采集频率任务缓存
foreach (var item in meterInfoGroupByTimeDensity)
{
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{ {
LastTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key), _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空");
TimeDensity = item.Key, return;
}; }
nextTask.NextTaskTime = nextTask.LastTaskTime.CalculateNextCollectionTime(item.Key);
//todo 首次采集时间节点到目前运行时间中漏采的时间点可以考虑使用IoTDB的存储利用时间序列处理。
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key); _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,读取数据成功");
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
}
string redisDeviceInfoHashCacheKey = $"{string.Format(RedisConst.DeviceInfoHashCacheKey, SystemType, ServerTagName)}";
Dictionary<string, List<AmmeterInfo>> keyValuePairs = new Dictionary<string, List<AmmeterInfo>>();
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
{
var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
List<AmmeterInfo> ammeterInfos = new List<AmmeterInfo>(); //获取采集项类型数据
//将表计信息根据集中器分组,获得集中器号 var gatherItemInfos = await GetGatherItemByDataTypes();
var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList(); if (gatherItemInfos == null || gatherItemInfos.Count <= 0)
foreach (var item in meterInfoGroup)
{ {
if (string.IsNullOrWhiteSpace(item.Key))//集中器号为空,跳过 _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空");
{ return;
continue; }
} var timer = Stopwatch.StartNew();
foreach (var ammeter in item) List<string> deviceIds = new List<string>();//用于处理Kafka主题分区数据的分发和处理。
{
deviceIds.Add(ammeter.MeterId.ToString());
//处理ItemCode //根据采集频率分组,获得采集频率分组
if (string.IsNullOrWhiteSpace(ammeter.ItemCodes) && !string.IsNullOrWhiteSpace(ammeter.DataTypes)) var meterInfoGroupByTimeDensity = meterInfos.Select(d => d.TimeDensity).GroupBy(d => d);
var currentTaskTime = DateTime.Now;
if (_applicationOptions.FirstCollectionTime.HasValue == false)
{
_applicationOptions.FirstCollectionTime = currentTaskTime;
}
//先处理采集频率任务缓存
foreach (var item in meterInfoGroupByTimeDensity)
{
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{
LastTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),
TimeDensity = item.Key,
};
nextTask.NextTaskTime = nextTask.LastTaskTime.CalculateNextCollectionTime(item.Key);
//todo 首次采集时间节点到目前运行时间中漏采的时间点可以考虑使用IoTDB的存储利用时间序列处理。
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key);
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
}
//设备hash缓存key
string redisCacheDeviceInfoHashKey = $"{string.Format(RedisConst.CacheDeviceInfoHashKey, SystemType, ServerTagName)}";
//设备分组集合key
string redisCacheDeviceGroupSetIndexKey = $"{string.Format(RedisConst.CacheDeviceGroupSetIndexKey, SystemType, ServerTagName)}";
Dictionary<string, List<DeviceInfo>> keyValuePairs = new Dictionary<string, List<DeviceInfo>>();
//处理设备缓存信息
foreach (var ammeter in meterInfos)
{
deviceIds.Add(ammeter.MeterId.ToString());
//处理ItemCode
if (string.IsNullOrWhiteSpace(ammeter.ItemCodes) && !string.IsNullOrWhiteSpace(ammeter.DataTypes))
{
var itemArr = ammeter.DataTypes.Split(',').ToList();
#region
List<string> itemCodeList = new List<string>();
foreach (var dataType in itemArr)
{ {
var itemArr = ammeter.DataTypes.Split(',').ToList(); var excludeItemCode = "10_98,10_94";//TODO 排除透明转发:尖峰平谷时段、跳合闸,特殊电表
var gatherItem = gatherItemInfos.FirstOrDefault(f => f.DataType.Equals(dataType));
#region if (gatherItem != null)
List<string> itemCodeList = new List<string>();
foreach (var dataType in itemArr)
{ {
var excludeItemCode = "10_98,10_94";//TODO 排除透明转发:尖峰平谷时段、跳合闸,特殊电表 if (!excludeItemCode.Contains(gatherItem.ItemCode))
var gatherItem = gatherItemInfos.FirstOrDefault(f => f.DataType.Equals(dataType));
if (gatherItem != null)
{ {
if (!excludeItemCode.Contains(gatherItem.ItemCode)) itemCodeList.Add(gatherItem.ItemCode);
{
itemCodeList.Add(gatherItem.ItemCode);
}
} }
}
#region #region
if (itemArr.Exists(e => e.Equals("95"))) //德力西DTS if (itemArr.Exists(e => e.Equals("95"))) //德力西DTS
{ {
itemCodeList.Add("10_95"); itemCodeList.Add("10_95");
} }
if (itemArr.Exists(e => e.Equals("109")))//WAVE_109 if (itemArr.Exists(e => e.Equals("109")))//WAVE_109
{ {
itemCodeList.Add("10_109"); itemCodeList.Add("10_109");
}
#endregion
} }
#endregion #endregion
ammeter.ItemCodes = itemCodeList.Serialize();//转换成JSON字符串
if (!string.IsNullOrWhiteSpace(ammeter.ItemCodes))
{
ammeter.ItemCodes = ammeter.ItemCodes.Replace("WAVE_109", "10_109");
}
} }
#endregion
ammeterInfos.Add(ammeter);
if (!keyValuePairs.ContainsKey(ammeter.FocusAddress))
ammeter.ItemCodes = itemCodeList.Serialize();//转换成JSON字符串
if (!string.IsNullOrWhiteSpace(ammeter.ItemCodes))
{ {
keyValuePairs[ammeter.FocusAddress] = new List<AmmeterInfo>() {ammeter }; ammeter.ItemCodes = ammeter.ItemCodes.Replace("WAVE_109", "10_109");
}
else
{
keyValuePairs[ammeter.FocusAddress].Add(ammeter);
} }
} }
if (!keyValuePairs.ContainsKey(ammeter.FocusAddress))
{
keyValuePairs[ammeter.FocusAddress] = new List<DeviceInfo>() { ammeter.Adapt<DeviceInfo>() };
}
else
{
keyValuePairs[ammeter.FocusAddress].Add(ammeter.Adapt<DeviceInfo>());
}
} }
//await _redisDataCacheService.BatchInsertDataAsync2<AmmeterInfo>( await _redisDataCacheService.BatchInsertDataAsync<DeviceInfo>(
// redisCacheMeterInfoSetIndexKey, redisCacheDeviceGroupSetIndexKey,
// redisDeviceInfoHashCacheKey, redisCacheDeviceInfoHashKey,
// keyValuePairs); keyValuePairs);
await _redisDataCacheService.BatchInsertDataAsync<AmmeterInfo>( //初始化设备组负载控制
redisCacheMeterInfoHashKey, if (deviceIds == null || deviceIds.Count <= 0)
redisCacheMeterInfoSetIndexKey, {
redisCacheMeterInfoZSetScoresIndexKey, _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
redisDeviceInfoHashCacheKey,
ammeterInfos); }
else
{
DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions);
}
timer.Stop();
_logger.LogWarning($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成,耗时{timer.ElapsedMilliseconds}毫秒");
} }
catch (Exception ex)
//初始化设备组负载控制
if (deviceIds == null || deviceIds.Count <= 0)
{ {
_logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息"); _logger.LogError(ex, $"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据异常");
throw ex;
} }
else
{
DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions);
}
timer.Stop();
_logger.LogWarning($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成,耗时{timer.ElapsedMilliseconds}毫秒");
} }
/// <summary> /// <summary>
@ -636,7 +611,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
private async Task<List<MeterReadingTelemetryPacketInfo>> AmmerterCreatePublishTaskAction(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) private async Task<List<MeterReadingTelemetryPacketInfo>> AmmerterCreatePublishTaskAction(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps)
{ {
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
@ -808,7 +783,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledAutomaticVerificationTime(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) public virtual async Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledAutomaticVerificationTime(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps)
{ {
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}"; string currentTimeStr = $"{currentTime:HH:mm:00}";
@ -842,7 +817,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
ItemCode = itemCode, ItemCode = itemCode,
SubProtocolRequest = new SubProtocolBuildRequest() SubProtocolRequest = new SubProtocolBuildRequest()
{ {
MeterAddress = ammeterInfo.AmmerterAddress, MeterAddress = ammeterInfo.MeterAddress,
Password = ammeterInfo.Password, Password = ammeterInfo.Password,
ItemCode = subItemCode, ItemCode = subItemCode,
} }
@ -884,7 +859,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledGetAutomaticDayFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) public virtual async Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledGetAutomaticDayFreezeData(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps)
{ {
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}"; string currentTimeStr = $"{currentTime:HH:mm:00}";
@ -954,7 +929,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledGetAutomaticMonthFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) public virtual async Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledGetAutomaticMonthFreezeData(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps)
{ {
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}"; string currentTimeStr = $"{currentTime:HH:mm:00}";
@ -1093,7 +1068,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// </summary> /// </summary>
/// <param name="gatherCode">采集端Code</param> /// <param name="gatherCode">采集端Code</param>
/// <returns></returns> /// <returns></returns>
public virtual Task<List<WatermeterInfo>> GetWatermeterInfoList(string gatherCode = "") public virtual Task<List<DeviceInfo>> GetWatermeterInfoList(string gatherCode = "")
{ {
throw new NotImplementedException($"{nameof(GetWatermeterInfoList)}请根据不同系统类型进行实现"); throw new NotImplementedException($"{nameof(GetWatermeterInfoList)}请根据不同系统类型进行实现");
} }
@ -1121,7 +1096,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
List<string> deviceIds = new List<string>();//用于处理Kafka主题分区数据的分发和处理。 List<string> deviceIds = new List<string>();//用于处理Kafka主题分区数据的分发和处理。
//根据采集频率分组,获得采集频率分组 //根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); var meterInfoGroupByTimeDensity = meterInfos.Select(d=>d.TimeDensity).GroupBy(d => d);
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
if (_applicationOptions.FirstCollectionTime.HasValue == false) if (_applicationOptions.FirstCollectionTime.HasValue == false)
{ {
@ -1144,40 +1119,35 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, item.Key); var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, item.Key);
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask); await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
} }
string redisDeviceInfoHashCacheKey = $"{string.Format(RedisConst.DeviceInfoHashCacheKey, SystemType, ServerTagName)}";
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity) //设备hash缓存key
string redisCacheDeviceInfoHashKey = $"{string.Format(RedisConst.CacheDeviceInfoHashKey, SystemType, ServerTagName)}";
//设备分组集合key
string redisCacheDeviceGroupSetIndexKey = $"{string.Format(RedisConst.CacheDeviceGroupSetIndexKey, SystemType, ServerTagName)}";
Dictionary<string, List<DeviceInfo>> keyValuePairs = new Dictionary<string, List<DeviceInfo>>();
foreach (var subItem in meterInfos)
{ {
List<WatermeterInfo> watermeterInfo = new List<WatermeterInfo>(); deviceIds.Add(subItem.MeterId.ToString());
//将表计信息根据集中器分组,获得集中器号 if (!keyValuePairs.ContainsKey(subItem.FocusAddress))
var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
foreach (var item in meterInfoGroup)
{ {
if (string.IsNullOrWhiteSpace(item.Key)) keyValuePairs[subItem.FocusAddress] = new List<DeviceInfo>() { subItem };
{ }
continue; else
} {
keyValuePairs[subItem.FocusAddress].Add(subItem);
var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
foreach (var subItem in item)
{
deviceIds.Add(subItem.MeterId.ToString());
watermeterInfo.Add(subItem);
}
await _redisDataCacheService.BatchInsertDataAsync<WatermeterInfo>(
redisCacheMeterInfoHashKey,
redisCacheMeterInfoSetIndexKey,
redisDeviceInfoHashCacheKey,
redisCacheMeterInfoZSetScoresIndexKey, watermeterInfo);
} }
} }
await _redisDataCacheService.BatchInsertDataAsync<DeviceInfo>(
redisCacheDeviceGroupSetIndexKey,
redisCacheDeviceInfoHashKey,
keyValuePairs);
//初始化设备组负载控制 //初始化设备组负载控制
if (deviceIds == null || deviceIds.Count <= 0) if (deviceIds == null || deviceIds.Count <= 0)
{ {
@ -1241,7 +1211,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="timestamps">时间格式的任务批次名称</param> /// <param name="timestamps">时间格式的任务批次名称</param>
/// <returns></returns> /// <returns></returns>
private async Task<List<MeterReadingTelemetryPacketInfo>> WatermeterCreatePublishTaskAction(int timeDensity private async Task<List<MeterReadingTelemetryPacketInfo>> WatermeterCreatePublishTaskAction(int timeDensity
, WatermeterInfo watermeter, int groupIndex, DateTime timestamps) , DeviceInfo watermeter, int groupIndex, DateTime timestamps)
{ {
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
@ -1270,7 +1240,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据表型号获取协议插件 //根据表型号获取协议插件
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(watermeter.Code); var protocolPlugin = await _protocolService.GetProtocolServiceAsync(watermeter.BrandType);
if (protocolPlugin == null) if (protocolPlugin == null)
{ {
//_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105"); //_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
@ -1351,7 +1321,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> ConcentratorScheduledAutomaticGetTerminalVersion(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) public virtual async Task<List<MeterReadingTelemetryPacketInfo>> ConcentratorScheduledAutomaticGetTerminalVersion(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps)
{ {
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}"; string currentTimeStr = $"{currentTime:HH:mm:00}";
@ -1392,14 +1362,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}); });
var meterReadingRecords = CreateAmmeterPacketInfo( var meterReadingRecords = CreateAmmeterPacketInfo(
ammeterInfo: ammeterInfo, ammeterInfo: ammeterInfo,
timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(), timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
builderResponse: builderResponse, builderResponse: builderResponse,
itemCode: itemCode, itemCode: itemCode,
subItemCode: null, subItemCode: null,
pendingCopyReadTime: currentTime, pendingCopyReadTime: currentTime,
creationTime: currentTime, creationTime: currentTime,
packetType: TelemetryPacketTypeEnum.TerminalVersion); packetType: TelemetryPacketTypeEnum.TerminalVersion);
taskList.Add(meterReadingRecords); taskList.Add(meterReadingRecords);
if (taskList == null || taskList.Count <= 0) if (taskList == null || taskList.Count <= 0)
@ -1425,7 +1396,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> ConcentratorScheduledAutomaticGetTelematicsModule(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) public virtual async Task<List<MeterReadingTelemetryPacketInfo>> ConcentratorScheduledAutomaticGetTelematicsModule(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps)
{ {
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}"; string currentTimeStr = $"{currentTime:HH:mm:00}";
@ -1459,14 +1430,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}); });
var meterReadingRecords = CreateAmmeterPacketInfo( var meterReadingRecords = CreateAmmeterPacketInfo(
ammeterInfo: ammeterInfo, ammeterInfo: ammeterInfo,
timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(), timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
builderResponse: builderResponse, builderResponse: builderResponse,
itemCode: itemCode, itemCode: itemCode,
subItemCode: null, subItemCode: null,
pendingCopyReadTime: currentTime, pendingCopyReadTime: currentTime,
creationTime: currentTime, creationTime: currentTime,
packetType: TelemetryPacketTypeEnum.TelematicsModule); packetType: TelemetryPacketTypeEnum.TelematicsModule);
taskList.Add(meterReadingRecords); taskList.Add(meterReadingRecords);
if (taskList == null || taskList.Count <= 0) if (taskList == null || taskList.Count <= 0)
@ -1517,32 +1489,34 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var timer = Stopwatch.StartNew(); var timer = Stopwatch.StartNew();
//获取对应频率中的所有电表信息 //获取对应频率中的所有电表信息
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}"; //设备hash缓存key
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, meterType, timeDensity)}"; string redisCacheDeviceInfoHashKey = $"{string.Format(RedisConst.CacheDeviceInfoHashKey, SystemType, ServerTagName)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, meterType, timeDensity)}";
List<T> meterInfos = new List<T>(); //设备分组集合key
decimal? cursor = null; string redisCacheDeviceGroupSetIndexKey = $"{string.Format(RedisConst.CacheDeviceGroupSetIndexKey, SystemType, ServerTagName)}";
string member = null;
while (true) //List<T> meterInfos = new List<T>();
{ //decimal? cursor = null;
var page = await _redisDataCacheService.GetAllPagedData<T>( //string member = null;
redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000,
lastScore: cursor,
lastMember: member);
meterInfos.AddRange(page.Items); //while (true)
if (!page.HasNext) //{
{ // var page = await _redisDataCacheService.GetAllPagedData2<T>(
break; // redisCacheDeviceGroupSetIndexKey,
} // redisCacheDeviceInfoHashKey,
// pageSize: 1000,
// lastScore: cursor,
// lastMember: member);
cursor = page.NextScore; // meterInfos.AddRange(page.Items);
member = page.NextMember; // if (!page.HasNext)
} // {
// break;
// }
// cursor = page.NextScore;
// member = page.NextMember;
//}
//var page = await _redisDataCacheService.GetAllPagedData<T>( //var page = await _redisDataCacheService.GetAllPagedData<T>(
// redisCacheMeterInfoHashKeyTemp, // redisCacheMeterInfoHashKeyTemp,
@ -1552,16 +1526,32 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// lastMember: member); // lastMember: member);
//meterInfos.AddRange(page.Items); //meterInfos.AddRange(page.Items);
if (meterInfos == null || meterInfos.Count <= 0) //if (meterInfos == null || meterInfos.Count <= 0)
{ //{
timer.Stop(); // timer.Stop();
_logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); // _logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
return; // return;
} //}
Dictionary<string, List<T>> keyValuePairs = FreeRedisProvider.Instance.HGetAll<List<T>>(redisCacheDeviceInfoHashKey);
timer.Stop(); timer.Stop();
_logger.LogError($"{nameof(CreateMeterPublishTask)} 构建采集待下发任务,缓存获取信息共花费{timer.ElapsedMilliseconds}毫秒"); _logger.LogError($"{nameof(CreateMeterPublishTask)} 构建采集待下发任务,缓存获取信息共花费{timer.ElapsedMilliseconds}毫秒");
List<T> meterInfos = new List<T>();
foreach (var item in keyValuePairs)
{
foreach (var subItem in item.Value)
{
if (subItem.MeterType == meterType && subItem.TimeDensity == timeDensity)
{
meterInfos.Add(subItem);
}
}
}
timer.Restart(); timer.Restart();
await DeviceGroupBalanceControl.ProcessWithThrottleAsync( await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
@ -1652,7 +1642,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="creationTime">数据创建时间戳</param> /// <param name="creationTime">数据创建时间戳</param>
/// <param name="packetType">数据包类型</param> /// <param name="packetType">数据包类型</param>
/// <returns></returns> /// <returns></returns>
protected MeterReadingTelemetryPacketInfo CreateAmmeterPacketInfo(AmmeterInfo ammeterInfo, long timestamps, ProtocolBuildResponse builderResponse, string itemCode, string subItemCode, DateTime pendingCopyReadTime, DateTime creationTime, TelemetryPacketTypeEnum packetType) protected MeterReadingTelemetryPacketInfo CreateAmmeterPacketInfo(DeviceInfo ammeterInfo, long timestamps, ProtocolBuildResponse builderResponse, string itemCode, string subItemCode, DateTime pendingCopyReadTime, DateTime creationTime, TelemetryPacketTypeEnum packetType)
{ {
try try
{ {
@ -1667,7 +1657,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
DatabaseBusiID = ammeterInfo.DatabaseBusiID, DatabaseBusiID = ammeterInfo.DatabaseBusiID,
PendingCopyReadTime = pendingCopyReadTime, PendingCopyReadTime = pendingCopyReadTime,
CreationTime = creationTime, CreationTime = creationTime,
MeterAddress = ammeterInfo.AmmerterAddress, MeterAddress = ammeterInfo.MeterAddress,
PacketType = (int)packetType, PacketType = (int)packetType,
AFN = builderResponse.AFn, AFN = builderResponse.AFn,
Fn = builderResponse.Fn, Fn = builderResponse.Fn,

View File

@ -12,6 +12,7 @@ using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.Ammeters; using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Internal;
@ -39,7 +40,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
string systemType = string.Empty; string systemType = string.Empty;
string serverTagName = string.Empty; string serverTagName = string.Empty;
private readonly ILogger<BasicScheduledMeterReadingService> _logger; private readonly ILogger<BasicScheduledMeterReadingService> _logger;
private readonly IIoTDbProvider _dbProvider;
private readonly IProtocolService _protocolService; private readonly IProtocolService _protocolService;
public EnergySystemScheduledMeterReadingService( public EnergySystemScheduledMeterReadingService(
@ -62,7 +62,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{ {
serverTagName = applicationOptions.Value.ServerTagName; serverTagName = applicationOptions.Value.ServerTagName;
systemType = applicationOptions.Value.SystemType; systemType = applicationOptions.Value.SystemType;
_dbProvider = dbProvider;
_logger = logger; _logger = logger;
_protocolService = protocolService; _protocolService = protocolService;
} }
@ -97,59 +96,93 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns> /// <returns></returns>
//[HttpGet] //[HttpGet]
//[Route($"ammeter/list")] //[Route($"ammeter/list")]
public override async Task<List<AmmeterInfo>> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890") public override async Task<List<DeviceInfo>> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890")
{ {
List<AmmeterInfo> ammeterInfos = new List<AmmeterInfo>(); //List<DeviceInfo> ammeterInfos = new List<DeviceInfo>();
ammeterInfos.Add(new AmmeterInfo() //ammeterInfos.Add(new DeviceInfo()
//{
// Baudrate = 2400,
// FocusAddress = "402440506",
// Name = "张家祠工务(三相电表)",
// FocusId = 95780,
// DatabaseBusiID = 1,
// MeteringCode = 1,
// MeterAddress = "402410040506",
// MeterId = 127035,
// TypeName = 3,
// DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
// TimeDensity = 15,
// BrandType = "DDS1980",
//});
//ammeterInfos.Add(new DeviceInfo()
//{
// Baudrate = 2400,
// FocusAddress = "542400504",
// Name = "五号配(长芦二所四排)(单相电表)",
// FocusId = 69280,
// DatabaseBusiID = 1,
// MeteringCode = 2,
// MeterAddress = "542410000504",
// MeterId = 95594,
// TypeName = 1,
// DataTypes = "581,589,592,597,601",
// TimeDensity = 15,
// BrandType = "DDS1980",
//});
//return ammeterInfos;
try
{ {
Baudrate = 2400, string sql = $@"SELECT
FocusAddress = "402440506", C.ID as MeterId,
Name = "张家祠工务(三相电表)", C.Name,
FocusId = 95780, C.FocusID as FocusId,
DatabaseBusiID = 1, C.SingleRate,
MeteringCode = 1, C.MeteringCode,
AmmerterAddress = "402410040506", C.Code AS BrandType,
MeterId = 127035, C.Baudrate,
TypeName = 3, C.Password,
DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679", C.MeteringPort,
TimeDensity = 15, C.[Address] AS MeterAddress,
BrandType = "DDS1980", C.TypeName,
}); C.Protocol,
C.TripState,
C.[State],
B.[Address],
B.AreaCode,
B.AutomaticReport,
D.DataTypes,
B.TimeDensity,
A.GatherCode,
C.Special,
C.[ProjectID],
B.AbnormalState,
B.LastTime,
1 as MeterType,
CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,
(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
FROM TB_GatherInfo(NOLOCK) AS A
INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0
WHERE 1=1 and C.Special = 0 ";
//TODO 记得移除特殊表过滤
ammeterInfos.Add(new AmmeterInfo() if (!string.IsNullOrWhiteSpace(gatherCode))
{ {
Baudrate = 2400, sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
FocusAddress = "542400504", }
Name = "五号配(长芦二所四排)(单相电表)", return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
FocusId = 69280, .Ado
DatabaseBusiID = 1, .QueryAsync<DeviceInfo>(sql);
MeteringCode = 2, }
AmmerterAddress = "542410000504", catch (Exception ex)
MeterId = 95594, {
TypeName = 1, throw ex;
DataTypes = "581,589,592,597,601",
TimeDensity = 15,
BrandType = "DDS1980",
});
return ammeterInfos;
string sql = $@"SELECT C.ID as MeterId,C.Name,C.FocusID as FocusId,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
FROM TB_GatherInfo(NOLOCK) AS A
INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0
WHERE 1=1 and C.Special = 0 ";
//TODO 记得移除特殊表过滤
if (!string.IsNullOrWhiteSpace(gatherCode))
{
sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
} }
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
.Ado
.QueryAsync<AmmeterInfo>(sql);
} }
@ -160,7 +193,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns> /// <returns></returns>
public override async Task<List<AmmeterAutoValveControlSetting>> GetAmmeterAutoValveControlSetting(string currentTime) public override async Task<List<AmmeterAutoValveControlSetting>> GetAmmeterAutoValveControlSetting(string currentTime)
{ {
string sql = $@"SELECT A.*,B.AmmeterID as MeterId,E.GatherCode,D.AreaCode,D.[Address],CONCAT(D.AreaCode, D.[Address]) AS FocusAddress,D.ID as FocusId try
{
string sql = $@"SELECT A.*,B.AmmeterID as MeterId,E.GatherCode,D.AreaCode,D.[Address],CONCAT(D.AreaCode, D.[Address]) AS FocusAddress,D.ID as FocusId
FROM TB_AutoTripTask(nolock) AS A FROM TB_AutoTripTask(nolock) AS A
INNER JOIN TB_AutoTripAmmeter(nolock) AS B ON A.ID=B.TripTaskID INNER JOIN TB_AutoTripAmmeter(nolock) AS B ON A.ID=B.TripTaskID
INNER JOIN TB_AmmeterInfo(nolock) AS C ON B.AmmeterID=C.ID INNER JOIN TB_AmmeterInfo(nolock) AS C ON B.AmmeterID=C.ID
@ -168,14 +203,20 @@ namespace JiShe.CollectBus.ScheduledMeterReading
INNER JOIN TB_GatherInfo(NOLOCK) AS E ON E.ID=D.GatherInfoID INNER JOIN TB_GatherInfo(NOLOCK) AS E ON E.ID=D.GatherInfoID
WHERE A.IsForbid=0 and A.State<>-1 and E.GatherCode LIKE '%V4%' "; WHERE A.IsForbid=0 and A.State<>-1 and E.GatherCode LIKE '%V4%' ";
if (!string.IsNullOrWhiteSpace(currentTime)) if (!string.IsNullOrWhiteSpace(currentTime))
{ {
sql = $@"{sql} AND A.TripTime = '{currentTime}'"; sql = $@"{sql} AND A.TripTime = '{currentTime}'";
} }
return await SqlProvider.Instance.Change(DbEnum.EnergyDB) return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
.Ado .Ado
.QueryAsync<AmmeterAutoValveControlSetting>(sql); .QueryAsync<AmmeterAutoValveControlSetting>(sql);
}
catch (Exception)
{
throw;
}
} }
@ -200,7 +241,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
//批量获取对应的缓存电表信息 //批量获取对应的缓存电表信息
var ammeterInfos = new List<AmmeterInfo>(); var ammeterInfos = new List<DeviceInfo>();
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>(); List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
foreach (var settingInfo in settingInfos) foreach (var settingInfo in settingInfos)
@ -270,7 +311,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
ItemCode = itemCode, ItemCode = itemCode,
SubProtocolRequest = new SubProtocolBuildRequest() SubProtocolRequest = new SubProtocolBuildRequest()
{ {
MeterAddress = ammeterInfo.AmmerterAddress, MeterAddress = ammeterInfo.MeterAddress,
Password = ammeterInfo.Password, Password = ammeterInfo.Password,
ItemCode = subItemCode, ItemCode = subItemCode,
} }
@ -311,9 +352,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns> /// <returns></returns>
//[HttpGet] //[HttpGet]
//[Route($"ammeter/list")] //[Route($"ammeter/list")]
public override async Task<List<WatermeterInfo>> GetWatermeterInfoList(string gatherCode = "V4-Gather-8890") public override async Task<List<DeviceInfo>> GetWatermeterInfoList(string gatherCode = "V4-Gather-8890")
{ {
string sql = $@"SELECT try
{
string sql = $@"SELECT
A.ID as MeterId, A.ID as MeterId,
A.Name, A.Name,
A.FocusID as FocusId, A.FocusID as FocusId,
@ -340,6 +383,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
A.[ProjectID], A.[ProjectID],
B.AbnormalState, B.AbnormalState,
B.LastTime, B.LastTime,
2 as MeterType,
CONCAT(B.AreaCode, B.[Address]) AS FocusAddress, CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,
(select top 1 DatabaseBusiID from TB_Project where ID = b.ProjectID) AS DatabaseBusiID (select top 1 DatabaseBusiID from TB_Project where ID = b.ProjectID) AS DatabaseBusiID
FROM [dbo].[TB_WatermeterInfo](NOLOCK) AS A FROM [dbo].[TB_WatermeterInfo](NOLOCK) AS A
@ -347,13 +391,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading
INNER JOIN [dbo].[TB_GatherInfo](NOLOCK) AS C ON B.GatherInfoID=C.ID INNER JOIN [dbo].[TB_GatherInfo](NOLOCK) AS C ON B.GatherInfoID=C.ID
WHERE A.State>=0 AND A.State<100 "; WHERE A.State>=0 AND A.State<100 ";
if (!string.IsNullOrWhiteSpace(gatherCode)) if (!string.IsNullOrWhiteSpace(gatherCode))
{ {
sql = $@"{sql} AND C.GatherCode= '{gatherCode}'"; sql = $@"{sql} AND C.GatherCode= '{gatherCode}'";
}
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
.Ado
.QueryAsync<DeviceInfo>(sql);
}
catch (Exception)
{
throw;
} }
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
.Ado
.QueryAsync<WatermeterInfo>(sql);
} }
} }
} }

View File

@ -13,9 +13,9 @@ using System.Threading.Tasks;
namespace JiShe.CollectBus.IotSystems.Devices namespace JiShe.CollectBus.IotSystems.Devices
{ {
/// <summary> /// <summary>
/// 设备缓存信息 /// 设备信息
/// </summary> /// </summary>
public class DeviceCacheInfo : DeviceCacheBasicModel public class DeviceInfo : DeviceCacheBasicModel
{ {
/// <summary> /// <summary>
/// 关系映射标识用于ZSet的Member字段和Set的Value字段具体值可以根据不同业务场景进行定义 /// 关系映射标识用于ZSet的Member字段和Set的Value字段具体值可以根据不同业务场景进行定义
@ -30,10 +30,34 @@ namespace JiShe.CollectBus.IotSystems.Devices
public override long ScoreValue => Common.Helpers.CommonHelper.GetFocusScores(FocusAddress, MeteringCode); public override long ScoreValue => Common.Helpers.CommonHelper.GetFocusScores(FocusAddress, MeteringCode);
/// <summary> /// <summary>
/// 标记信息设备类型 /// 通讯方案:
/// NB-IOT常德水表、NB-IOT泽联电表、GPRS华立水表、
/// RS-485、无线、载波
/// </summary> /// </summary>
[Column(IsIgnore = true)] public string LinkType { get; set; }
public MeterTypeEnum MeterType { get; set; }
/// <summary>
/// HaveValve: 是否带阀 0 不带阀, 1 带阀)
/// 注意NULL表示未设置
/// </summary>
public int? HaveValve { get; set; }
/// <summary>
/// 设备类型: 水表\气表、流量计
/// </summary>
public string MeterTypeName { get; set; }
/// <summary>
/// 设备品牌;
/// (当 MeterType = 水表, 如 威铭、捷先 等)
/// (当 MeterType = 流量计, 如 西恩超声波流量计、西恩电磁流量计、涡街流量计 等)
/// </summary>
public string MeterBrand { get; set; }
/// <summary>
/// 倍率
/// </summary>
public decimal TimesRate { get; set; }
/// <summary> /// <summary>
/// 电表名称 /// 电表名称
@ -74,9 +98,9 @@ namespace JiShe.CollectBus.IotSystems.Devices
public int MeteringCode { get; set; } public int MeteringCode { get; set; }
/// <summary> /// <summary>
/// 表通信地址 /// 表通信地址
/// </summary> /// </summary>
public string AmmerterAddress { get; set; } public string MeterAddress { get; set; }
/// <summary> /// <summary>
/// 波特率 default(2400) /// 波特率 default(2400)

View File

@ -1,6 +1,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Reflection;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -31,26 +32,12 @@ namespace JiShe.CollectBus.Common.Consts
/// <summary> /// <summary>
/// 设备信息缓存数据,{0}=>系统类型,{1}=>应用服务部署标记 /// 设备信息缓存数据,{0}=>系统类型,{1}=>应用服务部署标记
/// </summary> /// </summary>
public const string DeviceInfoHashCacheKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:DeviceInfo"; public const string CacheDeviceInfoHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:DeviceInfo";
public const string MeterInfo = "MeterInfo";
/// <summary> /// <summary>
/// 缓存表计信息,{0}=>系统类型,{1}=>应用服务部署标记{2}=>表计类别,{3}=>采集频率 /// 设备信息缓存Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记
/// </summary> /// </summary>
public const string CacheMeterInfoHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:{"{3}"}"; public const string CacheDeviceGroupSetIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:DeviceGroupIndex";
/// <summary>
/// 缓存表计信息索引Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
/// </summary>
public const string CacheMeterInfoSetIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:SetIndex:{"{3}"}";
/// <summary>
/// 缓存表计信息排序索引ZSET缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
/// </summary>
public const string CacheMeterInfoZSetScoresIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:ZSetScoresIndex:{"{3}"}";
public const string TaskInfo = "TaskInfo"; public const string TaskInfo = "TaskInfo";
/// <summary> /// <summary>

View File

@ -1,4 +1,5 @@
using System; using JiShe.CollectBus.Common.Enums;
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
@ -45,5 +46,11 @@ namespace JiShe.CollectBus.Common.Models
/// 采集时间间隔(分钟如15) /// 采集时间间隔(分钟如15)
/// </summary> /// </summary>
public int TimeDensity { get; set; } public int TimeDensity { get; set; }
/// <summary>
/// 表计类型
/// 电表= 1,水表= 2,燃气表= 3,热能表= 4,水表流量计=5燃气表流量计=6,特殊电表=7
/// </summary>
public MeterTypeEnum MeterType { get; set; }
} }
} }

View File

@ -38,7 +38,7 @@
"Default": "mongodb://mongo_PmEeF3:lixiao1980@192.168.1.9:27017/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000", "Default": "mongodb://mongo_PmEeF3:lixiao1980@192.168.1.9:27017/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
"Kafka": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092", "Kafka": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092",
"PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False", "PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False" "EnergyDB": "server=rm-wz9hw529i3j1e3b5fbo.sqlserver.rds.aliyuncs.com,3433;database=db_energy;uid=yjdb;pwd=Kdjdhf+9*7ad222LL;Encrypt=False;Trust Server Certificate=False"
}, },
"Redis": { "Redis": {
"Configuration": "192.168.1.9:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", "Configuration": "192.168.1.9:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",