合并代码,解决冲突

This commit is contained in:
ChenYi 2025-04-17 11:41:46 +08:00
commit c97634e474
24 changed files with 2317 additions and 544 deletions

View File

@ -0,0 +1,187 @@
using JiShe.CollectBus.Common.Models;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Application.Contracts
{
/// <summary>
/// 数据缓存服务接口
/// </summary>
public interface IRedisDataCacheService
{
/// <summary>
/// 单个添加数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="data">待缓存数据</param>
/// <returns></returns>
Task InsertDataAsync<T>(
string redisHashCacheKey,
string redisSetIndexCacheKey,
string redisZSetScoresIndexCacheKey,
T data) where T : DeviceCacheBasicModel;
/// <summary>
/// 批量添加数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="items">待缓存数据集合</param>
/// <returns></returns>
Task BatchInsertDataAsync<T>(
string redisHashCacheKey,
string redisSetIndexCacheKey,
string redisZSetScoresIndexCacheKey,
IEnumerable<T> items) where T : DeviceCacheBasicModel;
/// <summary>
/// 删除缓存信息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="data">已缓存数据</param>
/// <returns></returns>
Task RemoveCacheDataAsync<T>(
string redisHashCacheKey,
string redisSetIndexCacheKey,
string redisZSetScoresIndexCacheKey,
T data) where T : DeviceCacheBasicModel;
/// <summary>
/// 修改缓存信息,映射关系未发生改变
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="newData">待修改缓存数据</param>
/// <returns></returns>
Task ModifyDataAsync<T>(
string redisHashCacheKey,
string redisSetIndexCacheKey,
string redisZSetScoresIndexCacheKey,
T newData) where T : DeviceCacheBasicModel;
/// <summary>
/// 修改缓存信息,映射关系已改变
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="oldMemberId">旧的映射关系</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="newData">待修改缓存数据</param>
/// <returns></returns>
Task ModifyDataAsync<T>(
string redisHashCacheKey,
string redisSetIndexCacheKey,
string oldMemberId,
string redisZSetScoresIndexCacheKey,
T newData) where T : DeviceCacheBasicModel;
///// <summary>
///// 通过集中器与表计信息排序索引获取数据
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
///// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
///// <param name="pageSize">分页尺寸</param>
///// <param name="lastScore">最后一个索引</param>
///// <param name="lastMember">最后一个唯一标识</param>
///// <param name="descending">排序方式</param>
///// <returns></returns>
//Task<BusCacheGlobalPagedResult<T>> GetPagedData<T>(
//string redisHashCacheKey,
//string redisZSetScoresIndexCacheKey,
//IEnumerable<int> focusIds,
//int pageSize = 10,
//decimal? lastScore = null,
//string lastMember = null,
//bool descending = true)
//where T : DeviceCacheBasicModel;
/// <summary>
/// 通过ZSET索引获取数据支持10万级别数据处理控制在13秒以内。
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="pageSize">分页尺寸</param>
/// <param name="lastScore">最后一个索引</param>
/// <param name="lastMember">最后一个唯一标识</param>
/// <param name="descending">排序方式</param>
/// <returns></returns>
Task<BusCacheGlobalPagedResult<T>> GetAllPagedData<T>(
string redisHashCacheKey,
string redisZSetScoresIndexCacheKey,
int pageSize = 1000,
decimal? lastScore = null,
string lastMember = null,
bool descending = true)
where T : DeviceCacheBasicModel;
/// <summary>
/// 优化后的分页获取方法(支持百万级数据)
/// </summary>
Task<BusCacheGlobalPagedResult<T>> GetAllPagedDataOptimized<T>(
string redisHashCacheKey,
string redisZSetScoresIndexCacheKey,
int pageSize = 1000,
decimal? lastScore = null,
string lastMember = null,
bool descending = true) where T : DeviceCacheBasicModel;
///// <summary>
///// 游标分页查询
///// </summary>
///// <param name="redisZSetScoresIndexCacheKey">排序索引ZSET缓存Key</param>
///// <param name="pageSize">分页数量</param>
///// <param name="startScore">开始索引</param>
///// <param name="excludeMember">开始唯一标识</param>
///// <param name="descending">排序方式</param>
///// <returns></returns>
//Task<(List<string> Members, bool HasNext)> GetPagedMembers(
// string redisZSetScoresIndexCacheKey,
// int pageSize,
// decimal? startScore,
// string excludeMember,
// bool descending);
///// <summary>
///// 批量获取指定分页的数据
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisHashCacheKey">Hash表缓存key</param>
///// <param name="members">Hash表字段集合</param>
///// <returns></returns>
//Task<Dictionary<string, T>> BatchGetData<T>(
// string redisHashCacheKey,
// IEnumerable<string> members)
// where T : DeviceCacheBasicModel;
///// <summary>
///// 获取下一页游标
///// </summary>
///// <param name="redisZSetScoresIndexCacheKey">排序索引ZSET缓存Key</param>
///// <param name="lastMember">最后一个唯一标识</param>
///// <param name="descending">排序方式</param>
///// <returns></returns>
//Task<decimal?> GetNextScore(
// string redisZSetScoresIndexCacheKey,
// string lastMember,
// bool descending);
}
}

View File

@ -79,7 +79,7 @@ public abstract class CollectBusAppService : ApplicationService
{ {
string key = (string)item[0]; string key = (string)item[0];
object[] fieldsAndValues = (object[])item[1]; object[] fieldsAndValues = (object[])item[1];
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, systemType, serverTagName, meterType, timeDensity)}"; var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, systemType, serverTagName, meterType, timeDensity)}";
string focusAddress = key.Replace(redisCacheKey, ""); string focusAddress = key.Replace(redisCacheKey, "");
var meterHashs = new Dictionary<string, T>(); var meterHashs = new Dictionary<string, T>();
@ -182,7 +182,7 @@ public abstract class CollectBusAppService : ApplicationService
string key = (string)item[0]; string key = (string)item[0];
object[] fieldsAndValues = (object[])item[1]; object[] fieldsAndValues = (object[])item[1];
var redisCacheKey = string.Format( var redisCacheKey = string.Format(
RedisConst.CacheMeterInfoKey, RedisConst.CacheMeterInfoHashKey,
systemType, systemType,
serverTagName, serverTagName,
meterType, meterType,

File diff suppressed because it is too large Load Diff

View File

@ -23,6 +23,9 @@ using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Attributes;
using System.Text.Json; using System.Text.Json;
using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common.Models;
using System.Diagnostics;
namespace JiShe.CollectBus.Samples; namespace JiShe.CollectBus.Samples;
@ -32,17 +35,23 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
private readonly IIoTDBProvider _iotDBProvider; private readonly IIoTDBProvider _iotDBProvider;
private readonly IoTDBRuntimeContext _dbContext; private readonly IoTDBRuntimeContext _dbContext;
private readonly IoTDBOptions _options; private readonly IoTDBOptions _options;
private readonly IRedisDataCacheService _redisDataCacheService;
public SampleAppService(IIoTDBProvider iotDBProvider, IOptions<IoTDBOptions> options, public SampleAppService(IIoTDBProvider iotDBProvider, IOptions<IoTDBOptions> options,
IoTDBRuntimeContext dbContext, ILogger<SampleAppService> logger) IoTDBRuntimeContext dbContext, ILogger<SampleAppService> logger, IRedisDataCacheService redisDataCacheService)
{ {
_iotDBProvider = iotDBProvider; _iotDBProvider = iotDBProvider;
_options = options.Value; _options = options.Value;
_dbContext = dbContext; _dbContext = dbContext;
_logger = logger; _logger = logger;
_redisDataCacheService = redisDataCacheService;
} }
/// <summary>
/// 测试 UseSessionPool
/// </summary>
/// <param name="timestamps"></param>
/// <returns></returns>
[HttpGet] [HttpGet]
public async Task UseSessionPool(long timestamps) public async Task UseSessionPool(long timestamps)
{ {
@ -72,7 +81,10 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
await _iotDBProvider.InsertAsync(meter); await _iotDBProvider.InsertAsync(meter);
} }
/// <summary>
/// 测试Session切换
/// </summary>
/// <returns></returns>
[HttpGet] [HttpGet]
public async Task UseTableSessionPool() public async Task UseTableSessionPool()
{ {
@ -125,7 +137,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
var timeDensity = "15"; var timeDensity = "15";
//获取缓存中的电表信息 //获取缓存中的电表信息
var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, "Energy", "JiSheCollectBus", MeterTypeEnum.Ammeter.ToString(), timeDensity)}*"; var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoHashKey, "Energy", "JiSheCollectBus", MeterTypeEnum.Ammeter.ToString(), timeDensity)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, "Energy", "JiSheCollectBus", timeDensity, MeterTypeEnum.Ammeter); var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, "Energy", "JiSheCollectBus", timeDensity, MeterTypeEnum.Ammeter);
@ -178,6 +190,44 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
await _iotDBProvider.InsertAsync(meter); await _iotDBProvider.InsertAsync(meter);
} }
/// <summary>
/// 测试Redis批量读取10万条数据性能
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task TestRedisCacheGetAllPagedData()
{
var timeDensity = "15";
string SystemType = "Energy";
string ServerTagName = "JiSheCollectBus2";
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var timer1 = Stopwatch.StartNew();
decimal? cursor = null;
string member = null;
bool hasNext;
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
do
{
var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000,
lastScore: cursor,
lastMember: member);
meterInfos.AddRange(page.Items);
cursor = page.HasNext ? page.NextScore : null;
member = page.HasNext ? page.NextMember : null;
hasNext = page.HasNext;
} while (hasNext);
timer1.Stop();
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
}
public Task<SampleDto> GetAsync() public Task<SampleDto> GetAsync()
{ {

View File

@ -1,11 +1,13 @@
using DotNetCore.CAP; using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDBProvider; using JiShe.CollectBus.IoTDBProvider;
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;
@ -13,13 +15,16 @@ using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.RedisDataCache;
using JiShe.CollectBus.Repository.MeterReadingRecord; using JiShe.CollectBus.Repository.MeterReadingRecord;
using Mapster;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using static FreeSql.Internal.GlobalFilter;
namespace JiShe.CollectBus.ScheduledMeterReading namespace JiShe.CollectBus.ScheduledMeterReading
{ {
@ -32,6 +37,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
private readonly IIoTDBProvider _dbProvider; private readonly IIoTDBProvider _dbProvider;
private readonly IMeterReadingRecordRepository _meterReadingRecordRepository; private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
private readonly IProducerService _producerService; private readonly IProducerService _producerService;
private readonly IRedisDataCacheService _redisDataCacheService;
private readonly ICapPublisher _producerBus; private readonly ICapPublisher _producerBus;
public BasicScheduledMeterReadingService( public BasicScheduledMeterReadingService(
@ -39,6 +45,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
ICapPublisher producerBus, ICapPublisher producerBus,
IMeterReadingRecordRepository meterReadingRecordRepository, IMeterReadingRecordRepository meterReadingRecordRepository,
IProducerService producerService, IProducerService producerService,
IRedisDataCacheService redisDataCacheService,
IIoTDBProvider dbProvider) IIoTDBProvider dbProvider)
{ {
_producerBus = producerBus; _producerBus = producerBus;
@ -46,6 +53,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_dbProvider = dbProvider; _dbProvider = dbProvider;
_meterReadingRecordRepository = meterReadingRecordRepository; _meterReadingRecordRepository = meterReadingRecordRepository;
_producerService = producerService; _producerService = producerService;
_redisDataCacheService = redisDataCacheService;
} }
/// <summary> /// <summary>
@ -115,32 +123,45 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103"); _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103");
continue; continue;
} }
//获取缓存中的表信息
var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, meteryType, timeDensity)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
{
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-104");
return;
}
var meterTypes = EnumExtensions.ToEnumDictionary<MeterTypeEnum>(); var meterTypes = EnumExtensions.ToEnumDictionary<MeterTypeEnum>();
if (meteryType == MeterTypeEnum.Ammeter.ToString()) if (meteryType == MeterTypeEnum.Ammeter.ToString())
{ {
// 解析结果(结果为嵌套数组) var timer = Stopwatch.StartNew();
var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, SystemType, ServerTagName, $"{timeDensity}", meterTypes[meteryType]);
//获取对应频率中的所有电表信息
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
decimal? cursor = null;
string member = null;
bool hasNext;
do
{
var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000,
lastScore: cursor,
lastMember: member);
meterInfos.AddRange(page.Items);
cursor = page.HasNext ? page.NextScore : null;
member = page.HasNext ? page.NextMember : null;
hasNext = page.HasNext;
} while (hasNext);
if (meterInfos == null || meterInfos.Count <= 0) if (meterInfos == null || meterInfos.Count <= 0)
{ {
timer.Stop();
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); _logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
return; return;
} }
//await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos); //await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos);
var timer = Stopwatch.StartNew();
//处理数据 //处理数据
//await DeviceGroupBalanceControl.ProcessGenericListAsync( //await DeviceGroupBalanceControl.ProcessGenericListAsync(
@ -157,14 +178,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
await DeviceGroupBalanceControl.ProcessWithThrottleAsync( await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos, items: meterInfos,
deviceIdSelector: data => data.FocusAddress, deviceIdSelector: data => data.FocusAddress,
processor: data => processor: (data,groupIndex) =>
{ {
_ = AmmerterCreatePublishTask(timeDensity, data); _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex,tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
} }
); );
timer.Stop(); timer.Stop();
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},{oneMinutekeyList.Length}"); _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息");
} }
else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
@ -207,25 +228,86 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public virtual async Task InitAmmeterCacheData(string gatherCode = "") public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{ {
#if DEBUG #if DEBUG
//var timeDensity = "15";
//string tempCacheMeterInfoKey = $"CollectBus:{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}";
////获取缓存中的电表信息
//var redisKeyList = $"{string.Format(tempCacheMeterInfoKey, SystemType, "JiSheCollectBus", MeterTypeEnum.Ammeter, timeDensity)}*";
//var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
//var tempMeterInfos = await GetMeterRedisCacheListData<AmmeterInfoTemp>(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter);
////List<string> focusAddressDataLista = new List<string>();
//List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
//foreach (var item in tempMeterInfos)
//{
// var tempData = item.Adapt<AmmeterInfo>();
// tempData.FocusId = item.FocusID;
// tempData.MeterId = item.Id;
// meterInfos.Add(tempData);
// //focusAddressDataLista.Add(item.FocusAddress);
//}
var timeDensity = "15"; var timeDensity = "15";
//获取缓存中的电表信息 var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}*"; var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter);
List<string> focusAddressDataLista = new List<string>(); List<string> focusAddressDataLista = new List<string>();
foreach (var item in meterInfos) var timer1 = Stopwatch.StartNew();
{ //decimal? cursor = null;
focusAddressDataLista.Add(item.FocusAddress); //string member = null;
} //bool hasNext;
//do
//{
// var page = await _redisDataCacheService.GetAllPagedDataOptimized<AmmeterInfo>(
// redisCacheMeterInfoHashKeyTemp,
// redisCacheMeterInfoZSetScoresIndexKeyTemp,
// pageSize: 1000,
// lastScore: cursor,
// lastMember: member);
DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista); // meterInfos.AddRange(page.Items);
return; // cursor = page.HasNext ? page.NextScore : null;
// member = page.HasNext ? page.NextMember : null;
// hasNext = page.HasNext;
//} while (hasNext);
var allIds = new HashSet<string>();
decimal? score = null;
string member = null;
while (true)
{
var page = await _redisDataCacheService.GetAllPagedDataOptimized<AmmeterInfo>(
redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000,
lastScore: score,
lastMember: member);
meterInfos.AddRange(page.Items);
focusAddressDataLista.AddRange(page.Items.Select(d=>d.FocusAddress));
foreach (var item in page.Items)
{
if (!allIds.Add(item.MemberId))
throw new Exception("Duplicate data found!");
}
if (!page.HasNext) break;
score = page.NextScore;
member = page.NextMember;
}
timer1.Stop();
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
//return;
#else #else
var meterInfos = await GetAmmeterInfoList(gatherCode); var meterInfos = await GetAmmeterInfoList(gatherCode);
#endif #endif
if (meterInfos == null || meterInfos.Count <= 0) if (meterInfos == null || meterInfos.Count <= 0)
{ {
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空"); throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空");
@ -237,6 +319,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{ {
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空"); throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空");
} }
var timer = Stopwatch.StartNew();
List<string> focusAddressDataList = new List<string>();//用于处理Kafka主题分区数据的分发和处理。 List<string> focusAddressDataList = new List<string>();//用于处理Kafka主题分区数据的分发和处理。
@ -244,6 +327,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity) foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
{ {
var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
List<AmmeterInfo> ammeterInfos = new List<AmmeterInfo>();
//将表计信息根据集中器分组,获得集中器号 //将表计信息根据集中器分组,获得集中器号
var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList(); var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
foreach (var item in meterInfoGroup) foreach (var item in meterInfoGroup)
@ -255,17 +343,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
focusAddressDataList.Add(item.Key); focusAddressDataList.Add(item.Key);
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}"; // var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
#if DEBUG #if DEBUG
//每次缓存时,删除缓存,避免缓存数据有不准确的问题 //每次缓存时,删除缓存,避免缓存数据有不准确的问题
//await FreeRedisProvider.Instance.DelAsync(redisCacheKey); //await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
#else #else
//每次缓存时,删除缓存,避免缓存数据有不准确的问题 //每次缓存时,删除缓存,避免缓存数据有不准确的问题
await FreeRedisProvider.Instance.DelAsync(redisCacheKey); //await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
#endif #endif
Dictionary<string, AmmeterInfo> keyValuePairs = new Dictionary<string, AmmeterInfo>(); //Dictionary<string, AmmeterInfo> keyValuePairs = new Dictionary<string, AmmeterInfo>();
foreach (var ammeter in item) foreach (var ammeter in item)
{ {
//处理ItemCode //处理ItemCode
@ -310,11 +398,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
} }
keyValuePairs.TryAdd($"{ammeter.ID}", ammeter); ammeterInfos.Add(ammeter);
//keyValuePairs.TryAdd($"{ammeter.MeterId}", ammeter);
} }
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); //await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
} }
await _redisDataCacheService.BatchInsertDataAsync<AmmeterInfo>(
redisCacheMeterInfoHashKey,
redisCacheMeterInfoSetIndexKey,
redisCacheMeterInfoZSetScoresIndexKey,ammeterInfos);
//在缓存表信息数据的时候新增下一个时间的自动处理任务1分钟后执行所有的采集频率任务 //在缓存表信息数据的时候新增下一个时间的自动处理任务1分钟后执行所有的采集频率任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel() TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{ {
@ -337,7 +431,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
DeviceGroupBalanceControl.InitializeCache(focusAddressDataList); DeviceGroupBalanceControl.InitializeCache(focusAddressDataList);
} }
_logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成"); timer.Stop();
_logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成,耗时{timer.ElapsedMilliseconds}毫秒");
} }
/// <summary> /// <summary>
@ -608,9 +704,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// </summary> /// </summary>
/// <param name="timeDensity">采集频率</param> /// <param name="timeDensity">采集频率</param>
/// <param name="ammeterInfo">集中器号hash分组的集中器集合数据</param> /// <param name="ammeterInfo">集中器号hash分组的集中器集合数据</param>
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="taskBatch">时间格式的任务批次名称</param>
/// <returns></returns> /// <returns></returns>
private async Task AmmerterCreatePublishTask(int timeDensity private async Task AmmerterCreatePublishTask(int timeDensity
, AmmeterInfo ammeterInfo) , AmmeterInfo ammeterInfo,int groupIndex,string taskBatch)
{ {
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿? //todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
@ -618,7 +716,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
//构建缓存任务key依然 表计类型+采集频率+集中器地址存hash类型 //构建缓存任务key依然 表计类型+采集频率+集中器地址存hash类型
var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{ammeterInfo.FocusAddress}"; var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{ {
@ -699,7 +799,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
} }
Dictionary<string, MeterReadingRecords> keyValuePairs = new Dictionary<string, MeterReadingRecords>(); //Dictionary<string, MeterReadingRecords> keyValuePairs = new Dictionary<string, MeterReadingRecords>();
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
foreach (var tempItem in tempCodes) foreach (var tempItem in tempCodes)
{ {
@ -754,17 +855,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var meterReadingRecords = new MeterReadingRecords() var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
{ {
ProjectID = ammeterInfo.ProjectID, ProjectID = ammeterInfo.ProjectID,
DatabaseBusiID = ammeterInfo.DatabaseBusiID, DatabaseBusiID = ammeterInfo.DatabaseBusiID,
PendingCopyReadTime = pendingCopyReadTime, PendingCopyReadTime = pendingCopyReadTime,
CreationTime = currentTime, CreationTime = currentTime,
MeterAddress = ammeterInfo.AmmerterAddress, MeterAddress = ammeterInfo.AmmerterAddress,
MeterId = ammeterInfo.ID, MeterId = ammeterInfo.MeterId,
MeterType = MeterTypeEnum.Ammeter, MeterType = MeterTypeEnum.Ammeter,
FocusAddress = ammeterInfo.FocusAddress, FocusAddress = ammeterInfo.FocusAddress,
FocusID = ammeterInfo.FocusID, FocusId = ammeterInfo.FocusId,
AFN = aFN, AFN = aFN,
Fn = fn, Fn = fn,
ItemCode = tempItem, ItemCode = tempItem,
@ -774,9 +875,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
IssuedMessageId = GuidGenerator.Create().ToString(), IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(dataInfos), IssuedMessageHexString = Convert.ToHexString(dataInfos),
}; };
meterReadingRecords.CreateDataId(GuidGenerator.Create());
keyValuePairs.TryAdd($"{ammeterInfo.ID}_{tempItem}", meterReadingRecords); //meterReadingRecords.CreateDataId(GuidGenerator.Create());
taskList.Add(meterReadingRecords);
} }
//TimeSpan timeSpan = TimeSpan.FromMicroseconds(5); //TimeSpan timeSpan = TimeSpan.FromMicroseconds(5);
//await Task.Delay(timeSpan); //await Task.Delay(timeSpan);
@ -784,14 +886,25 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//return keyValuePairs; //return keyValuePairs;
// await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); // await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
using (var pipe = FreeRedisProvider.Instance.StartPipe()) //using (var pipe = FreeRedisProvider.Instance.StartPipe())
//{
// pipe.HSet(redisCacheKey, keyValuePairs);
// object[] ret = pipe.EndPipe();
//}
if (taskList == null
|| taskList.Count() <= 0
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey)
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey)
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey))
{ {
pipe.HSet(redisCacheKey, keyValuePairs); _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 写入参数异常,{redisCacheTelemetryPacketInfoHashKey}{redisCacheTelemetryPacketInfoSetIndexKey}{redisCacheTelemetryPacketInfoZSetScoresIndexKey}-101");
object[] ret = pipe.EndPipe(); return;
} }
await _redisDataCacheService.BatchInsertDataAsync(
redisCacheTelemetryPacketInfoHashKey,
await Task.CompletedTask; redisCacheTelemetryPacketInfoSetIndexKey,
redisCacheTelemetryPacketInfoZSetScoresIndexKey,
taskList);
} }
/// <summary> /// <summary>
@ -881,7 +994,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
foreach (var focusInfo in focusGroup) foreach (var focusInfo in focusGroup)
{ {
//构建缓存任务key依然 表计类型+采集频率+集中器地址存hash类型 //构建缓存任务key依然 表计类型+采集频率+集中器地址存hash类型
var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{focusInfo.Key}"; var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{focusInfo.Key}";
foreach (var ammeterInfo in focusInfo.Value) foreach (var ammeterInfo in focusInfo.Value)
{ {
@ -915,22 +1028,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (string.IsNullOrWhiteSpace(ammeter.AreaCode)) if (string.IsNullOrWhiteSpace(ammeter.AreaCode))
{ {
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.ID},集中器通信区号为空"); _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信区号为空");
continue; continue;
} }
if (string.IsNullOrWhiteSpace(ammeter.Address)) if (string.IsNullOrWhiteSpace(ammeter.Address))
{ {
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.ID},集中器通信地址为空"); _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信地址为空");
continue; continue;
} }
if (Convert.ToInt32(ammeter.Address) > 65535) if (Convert.ToInt32(ammeter.Address) > 65535)
{ {
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.ID},集中器通信地址无效,确保大于65535"); _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信地址无效,确保大于65535");
continue; continue;
} }
if (ammeter.MeteringCode <= 0 || ammeter.MeteringCode > 2033) if (ammeter.MeteringCode <= 0 || ammeter.MeteringCode > 2033)
{ {
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.ID},非有效测量点号({ammeter.MeteringCode})"); _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},非有效测量点号({ammeter.MeteringCode})");
continue; continue;
} }
@ -1028,10 +1141,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
PendingCopyReadTime = pendingCopyReadTime, PendingCopyReadTime = pendingCopyReadTime,
CreationTime = currentTime, CreationTime = currentTime,
MeterAddress = ammeter.AmmerterAddress, MeterAddress = ammeter.AmmerterAddress,
MeterId = ammeter.ID, MeterId = ammeter.MeterId,
MeterType = MeterTypeEnum.Ammeter, MeterType = MeterTypeEnum.Ammeter,
FocusAddress = ammeter.FocusAddress, FocusAddress = ammeter.FocusAddress,
FocusID = ammeter.FocusID, FocusID = ammeter.FocusId,
AFN = aFN, AFN = aFN,
Fn = fn, Fn = fn,
ItemCode = tempItem, ItemCode = tempItem,
@ -1041,9 +1154,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
IssuedMessageId = GuidGenerator.Create().ToString(), IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(dataInfos), IssuedMessageHexString = Convert.ToHexString(dataInfos),
}; };
meterReadingRecords.CreateDataId(GuidGenerator.Create()); //meterReadingRecords.CreateDataId(GuidGenerator.Create());
keyValuePairs.TryAdd($"{ammeter.ID}_{tempItem}", meterReadingRecords); keyValuePairs.TryAdd($"{ammeter.MeterId}_{tempItem}", meterReadingRecords);
} }
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
} }
@ -1097,12 +1210,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue; continue;
} }
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}{item.Key}"; var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}{item.Key}";
Dictionary<string, WatermeterInfo> keyValuePairs = new Dictionary<string, WatermeterInfo>(); Dictionary<string, WatermeterInfo> keyValuePairs = new Dictionary<string, WatermeterInfo>();
foreach (var subItem in item) foreach (var subItem in item)
{ {
keyValuePairs.TryAdd($"{subItem.ID}", subItem); keyValuePairs.TryAdd($"{subItem.MeterId}", subItem);
} }
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
} }
@ -1245,7 +1358,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns> /// <returns></returns>
private string GetTelemetryPacketCacheKeyPrefix(int timeDensity, MeterTypeEnum meterType) private string GetTelemetryPacketCacheKeyPrefix(int timeDensity, MeterTypeEnum meterType)
{ {
return $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, meterType, timeDensity)}*"; return $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}*";
} }
#endregion #endregion

View File

@ -4,6 +4,7 @@ using System.Threading.Tasks;
using Confluent.Kafka; using Confluent.Kafka;
using DotNetCore.CAP; using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Helpers;
@ -35,8 +36,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
{ {
string serverTagName = string.Empty; string serverTagName = string.Empty;
public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger, public EnergySystemScheduledMeterReadingService(
ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, producerBus, meterReadingRecordRepository, producerService,dbProvider) ILogger<EnergySystemScheduledMeterReadingService> logger,
ICapPublisher producerBus, IIoTDBProvider dbProvider,
IMeterReadingRecordRepository meterReadingRecordRepository,
IConfiguration configuration,
IProducerService producerService,
IRedisDataCacheService redisDataCacheService)
: base(logger,
producerBus,
meterReadingRecordRepository,
producerService,
redisDataCacheService,
dbProvider)
{ {
serverTagName = configuration.GetValue<string>(CommonConst.ServerTagName)!; serverTagName = configuration.GetValue<string>(CommonConst.ServerTagName)!;
} }
@ -80,11 +92,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// Baudrate = 2400, // Baudrate = 2400,
// FocusAddress = "402440506", // FocusAddress = "402440506",
// Name = "张家祠工务(三相电表)", // Name = "张家祠工务(三相电表)",
// FocusID = 95780, // FocusId = 95780,
// DatabaseBusiID = 1, // DatabaseBusiID = 1,
// MeteringCode = 1, // MeteringCode = 1,
// AmmerterAddress = "402410040506", // AmmerterAddress = "402410040506",
// ID = 127035, // MeterId = 127035,
// TypeName = 3, // TypeName = 3,
// DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679", // DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
// TimeDensity = 15, // TimeDensity = 15,
@ -94,11 +106,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// Baudrate = 2400, // Baudrate = 2400,
// FocusAddress = "542400504", // FocusAddress = "542400504",
// Name = "五号配(长芦二所四排)(单相电表)", // Name = "五号配(长芦二所四排)(单相电表)",
// FocusID = 69280, // FocusId = 69280,
// DatabaseBusiID = 1, // DatabaseBusiID = 1,
// MeteringCode = 2, // MeteringCode = 2,
// AmmerterAddress = "542410000504", // AmmerterAddress = "542410000504",
// ID = 95594, // MeterId = 95594,
// TypeName = 1, // TypeName = 1,
// DataTypes = "581,589,592,597,601", // DataTypes = "581,589,592,597,601",
// TimeDensity = 15, // TimeDensity = 15,
@ -106,7 +118,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//return ammeterInfos; //return ammeterInfos;
string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID string sql = $@"SELECT C.ID as MeterId,C.Name,C.FocusID as FocusId,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
FROM TB_GatherInfo(NOLOCK) AS A FROM TB_GatherInfo(NOLOCK) AS A
INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0 INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100 INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
@ -133,9 +145,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public override async Task<List<WatermeterInfo>> GetWatermeterInfoList(string gatherCode = "V4-Gather-8890") public override async Task<List<WatermeterInfo>> GetWatermeterInfoList(string gatherCode = "V4-Gather-8890")
{ {
string sql = $@"SELECT string sql = $@"SELECT
A.ID, A.ID as MeterId,
A.Name, A.Name,
A.FocusID, A.FocusID as FocusId,
A.MeteringCode, A.MeteringCode,
A.Baudrate, A.Baudrate,
A.MeteringPort, A.MeteringPort,

View File

@ -28,25 +28,49 @@ namespace JiShe.CollectBus.Common.Consts
/// </summary> /// </summary>
public const string FifteenMinuteAcquisitionTimeInterval = "Fifteen"; public const string FifteenMinuteAcquisitionTimeInterval = "Fifteen";
public const string MeterInfo = "MeterInfo";
/// <summary> /// <summary>
/// 缓存表计信息,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 /// 缓存表计信息,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
/// </summary> /// </summary>
public const string CacheMeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}:"; public const string CacheMeterInfoHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:{"{3}"}";
/// <summary>
/// 缓存表计信息索引Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
/// </summary>
public const string CacheMeterInfoSetIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:SetIndex:{"{3}"}";
/// <summary>
/// 缓存表计信息排序索引ZSET缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
/// </summary>
public const string CacheMeterInfoZSetScoresIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:ZSetScoresIndex:{"{3}"}";
public const string TaskInfo = "TaskInfo";
/// <summary> /// <summary>
/// 缓存待下发的指令生产任务数据,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 /// 缓存待下发的指令生产任务数据,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
/// </summary> /// </summary>
public const string CacheTasksToBeIssuedKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:TaskInfo:{"{2}"}:{"{3}"}"; public const string CacheTasksToBeIssuedKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TaskInfo}:{"{2}"}:{"{3}"}";
public const string TelemetryPacket = "TelemetryPacket";
/// <summary>
/// 缓存表计下发指令数据集,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率,{4}=>集中器所在分组,{5}=>时间格式的任务批次
/// </summary>
public const string CacheTelemetryPacketInfoHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:{"{3}"}:{"{4}"}:{"{5}"}";
/// <summary> /// <summary>
/// 缓存表计下发指令数据集,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 /// 缓存表计下发指令数据集索引Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率{4}=>集中器所在分组,{5}=>时间格式的任务批次
/// </summary> /// </summary>
public const string CacheTelemetryPacketInfoKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:TelemetryPacket:{"{2}"}:{"{3}"}:"; public const string CacheTelemetryPacketInfoSetIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:SetIndex:{"{3}"}:{"{4}"}:{"{5}"}";
/// <summary> /// <summary>
/// 缓存设备平衡关系映射结果,{0}=>系统类型,{1}=>应用服务部署标记 /// 缓存表计下发指令数据集排序索引ZSET缓存Key,{0}=>系统类型,{1}=>应用服务部署标记{2}=>表计类别,{3}=>采集频率,{4}=>集中器所在分组,{5}=>时间格式的任务批次
/// </summary> /// </summary>
public const string CacheDeviceBalanceRelationMapResultKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:RelationMap"; public const string CacheTelemetryPacketInfoZSetScoresIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:ZSetScoresIndex:{"{3}"}:{"{4}"}:{"{5}"}";
///// <summary>
///// 缓存设备平衡关系映射结果,{0}=>系统类型,{1}=>应用服务部署标记
///// </summary>
//public const string CacheDeviceBalanceRelationMapResultKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:RelationMap";
public const string CacheAmmeterFocusKey = "CacheAmmeterFocusKey"; public const string CacheAmmeterFocusKey = "CacheAmmeterFocusKey";
} }

View File

@ -161,7 +161,6 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
MaxDegreeOfParallelism = maxThreads.Value, MaxDegreeOfParallelism = maxThreads.Value,
}; };
TimeSpan timeSpan = TimeSpan.FromMicroseconds(5);
await Task.Run(() => await Task.Run(() =>
{ {
Parallel.For(0, cache.CachedGroups.Length, options, async groupId => Parallel.For(0, cache.CachedGroups.Length, options, async groupId =>
@ -169,8 +168,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
var queue = groupQueues[groupId]; var queue = groupQueues[groupId];
while (queue.TryDequeue(out T item)) while (queue.TryDequeue(out T item))
{ {
await Task.Delay(timeSpan); processor(item, groupId);
processor(item, Thread.CurrentThread.ManagedThreadId);
} }
}); });
}); });
@ -183,14 +181,14 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
/// <typeparam name="T">已经分组的设备信息</typeparam> /// <typeparam name="T">已经分组的设备信息</typeparam>
/// <param name="items">部分或者全部的已经分组的设备集合</param> /// <param name="items">部分或者全部的已经分组的设备集合</param>
/// <param name="deviceIdSelector">从泛型对象提取deviceId</param> /// <param name="deviceIdSelector">从泛型对象提取deviceId</param>
/// <param name="processor">处理委托(参数:当前对象,线程ID</param> /// <param name="processor">处理委托(参数:当前对象,分组ID</param>
/// <param name="maxConcurrency">可选最佳并发度</param> /// <param name="maxConcurrency">可选最佳并发度</param>
/// <returns></returns> /// <returns></returns>
/// <exception cref="InvalidOperationException"></exception> /// <exception cref="InvalidOperationException"></exception>
public static async Task ProcessWithThrottleAsync<T>( public static async Task ProcessWithThrottleAsync<T>(
List<T> items, List<T> items,
Func<T, string> deviceIdSelector, Func<T, string> deviceIdSelector,
Action<T> processor, Action<T,int> processor,
int? maxConcurrency = null) int? maxConcurrency = null)
{ {
var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化"); var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化");
@ -244,7 +242,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
/// <summary> /// <summary>
/// 分组异步处理(带节流) /// 分组异步处理(带节流)
/// </summary> /// </summary>
private static async Task ProcessItemAsync<T>(T item, Action<T> processor, int groupId) private static async Task ProcessItemAsync<T>(T item, Action<T,int> processor, int groupId)
{ {
// 使用内存缓存降低CPU负载 // 使用内存缓存降低CPU负载
await Task.Yield(); // 立即释放当前线程 await Task.Yield(); // 立即释放当前线程
@ -255,7 +253,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
{ {
ExecutionContext.Run(context!, state => ExecutionContext.Run(context!, state =>
{ {
processor(item); processor(item,groupId);
}, null); }, null);
}); });
} }

View File

@ -89,5 +89,26 @@ namespace JiShe.CollectBus.Common.Extensions
if (buffer.Count > 0) if (buffer.Count > 0)
yield return buffer; yield return buffer;
} }
//public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int batchSize)
//{
// if (batchSize <= 0)
// throw new ArgumentOutOfRangeException(nameof(batchSize));
// using var enumerator = source.GetEnumerator();
// while (enumerator.MoveNext())
// {
// yield return GetBatch(enumerator, batchSize);
// }
//}
//private static IEnumerable<T> GetBatch<T>(IEnumerator<T> enumerator, int batchSize)
//{
// do
// {
// yield return enumerator.Current;
// batchSize--;
// } while (batchSize > 0 && enumerator.MoveNext());
//}
} }
} }

View File

@ -17,6 +17,7 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="JetBrains.Annotations" Version="2024.2.0" /> <PackageReference Include="JetBrains.Annotations" Version="2024.2.0" />
<PackageReference Include="Mapster" Version="7.4.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="8.0.1" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.2" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.2" />

View File

@ -0,0 +1,56 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Models
{
/// <summary>
/// 缓存全局分页结果
/// </summary>
/// <typeparam name="T"></typeparam>
public class BusCacheGlobalPagedResult<T>
{
/// <summary>
/// 数据集合
/// </summary>
public List<T> Items { get; set; }
/// <summary>
/// 总条数
/// </summary>
public long TotalCount { get; set; }
/// <summary>
/// 每页条数
/// </summary>
public int PageSize { get; set; }
/// <summary>
/// 总页数
/// </summary>
public int PageCount
{
get
{
return (int)Math.Ceiling((double)TotalCount / PageSize);
}
}
/// <summary>
/// 是否有下一页
/// </summary>
public bool HasNext { get; set; }
/// <summary>
/// 下一页的分页索引
/// </summary>
public decimal? NextScore { get; set; }
/// <summary>
/// 下一页的分页索引
/// </summary>
public string NextMember { get; set; }
}
}

View File

@ -1,31 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Models
{
public class GlobalPagedResult<T>
{
/// <summary>
/// 数据集合
/// </summary>
public List<T> Items { get; set; }
/// <summary>
/// 是否有下一页
/// </summary>
public bool HasNext { get; set; }
/// <summary>
/// 下一页的分页索引
/// </summary>
public long? NextScore { get; set; }
/// <summary>
/// 下一页的分页索引
/// </summary>
public string NextMember { get; set; }
}
}

View File

@ -31,5 +31,20 @@ namespace JiShe.CollectBus.Common.Models
/// 数据集合 /// 数据集合
/// </summary> /// </summary>
public IEnumerable<T> Items { get; set; } public IEnumerable<T> Items { get; set; }
/// <summary>
/// 是否有下一页
/// </summary>
public bool HasNext { get; set; }
/// <summary>
/// 下一页的分页索引
/// </summary>
public decimal? NextScore { get; set; }
/// <summary>
/// 下一页的分页索引
/// </summary>
public string NextMember { get; set; }
} }
} }

View File

@ -9,7 +9,7 @@ namespace JiShe.CollectBus.Common.Models
/// <summary> /// <summary>
/// 设备缓存基础模型 /// 设备缓存基础模型
/// </summary> /// </summary>
public class DeviceCacheBasicModel public abstract class DeviceCacheBasicModel
{ {
/// <summary> /// <summary>
/// 集中器Id /// 集中器Id
@ -20,5 +20,15 @@ namespace JiShe.CollectBus.Common.Models
/// 表Id /// 表Id
/// </summary> /// </summary>
public int MeterId { get; set; } public int MeterId { get; set; }
/// <summary>
/// 关系映射标识用于ZSet的Member字段和Set的Value字段具体值可以根据不同业务场景进行定义
/// </summary>
public virtual string MemberId => $"{FocusId}:{MeterId}";
/// <summary>
/// ZSet排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳
/// </summary>
public virtual long ScoreValue=> ((long)FocusId << 32) | (uint)MeterId;
} }
} }

View File

@ -1,4 +1,6 @@
using System; using FreeSql.DataAnnotations;
using JiShe.CollectBus.Common.Models;
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
@ -6,23 +8,25 @@ using System.Threading.Tasks;
namespace JiShe.CollectBus.Ammeters namespace JiShe.CollectBus.Ammeters
{ {
public class AmmeterInfo public class AmmeterInfo: DeviceCacheBasicModel
{ {
/// <summary> /// <summary>
/// 电表ID /// 关系映射标识用于ZSet的Member字段和Set的Value字段具体值可以根据不同业务场景进行定义
/// </summary> /// </summary>
public int ID { get; set; } [Column(IsIgnore = true)]
public override string MemberId => $"{FocusId}:{MeterId}";
/// <summary>
/// ZSet排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳
/// </summary>
[Column(IsIgnore = true)]
public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks;
/// <summary> /// <summary>
/// 电表名称 /// 电表名称
/// </summary> /// </summary>
public string Name { get; set; } public string Name { get; set; }
/// <summary>
/// 集中器ID
/// </summary>
public int FocusID { get; set; }
/// <summary> /// <summary>
/// 集中器地址 /// 集中器地址
/// </summary> /// </summary>

View File

@ -0,0 +1,152 @@
using JiShe.CollectBus.Common.Models;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Ammeters
{
public class AmmeterInfoTemp
{
/// <summary>
/// 集中器Id
/// </summary>
public int FocusID { get; set; }
/// <summary>
/// 表Id
/// </summary>
public int Id { get; set; }
/// <summary>
/// 电表名称
/// </summary>
public string Name { get; set; }
/// <summary>
/// 集中器地址
/// </summary>
public string FocusAddress { get; set; }
/// <summary>
/// 集中器地址
/// </summary>
public string Address { get; set; }
/// <summary>
/// 集中器区域代码
/// </summary>
public string AreaCode { get; set; }
/// <summary>
/// 电表类别 1单相、2三相三线、3三相四线,
/// 07协议 开合闸指令(1A开闸断电,1C单相表合闸,1B多相表合闸) 645 2007 表
/// 97协议//true(合闸);false(跳闸) 545 1997 没有单相多相 之分 "true" ? "9966" : "3355"
/// </summary>
public int TypeName { get; set; }
/// <summary>
/// 跳合闸状态字段: 0 合闸1 跳闸
/// 电表TripState 0 合闸-通电, 1 断开、跳闸);
/// </summary>
public int TripState { get; set; }
/// <summary>
/// 规约 -电表default(30) 197协议3007协议
/// </summary>
public int? Protocol { get; set; }
/// <summary>
/// 一个集中器下的[MeteringCode]必须唯一。 PN
/// </summary>
public int MeteringCode { get; set; }
/// <summary>
/// 电表通信地址
/// </summary>
public string AmmerterAddress { get; set; }
/// <summary>
/// 波特率 default(2400)
/// </summary>
public int Baudrate { get; set; }
/// <summary>
/// MeteringPort 端口就几个可以枚举。
/// </summary>
public int MeteringPort { get; set; }
/// <summary>
/// 电表密码
/// </summary>
public string Password { get; set; }
/// <summary>
/// 采集时间间隔(分钟如15)
/// </summary>
public int TimeDensity { get; set; }
/// <summary>
/// 该电表方案下采集项JSON格式["0D_80","0D_80"]
/// </summary>
public string ItemCodes { get; set; }
/// <summary>
/// State表状态:
/// 0新装未下发1运行(档案下发成功时设置状态值1) 2暂停, 100销表销表后是否重新启用
/// 特定State -1 已删除
/// </summary>
public int State { get; set; }
/// <summary>
/// 是否自动采集(0:主动采集1自动采集)
/// </summary>
public int AutomaticReport { get; set; }
/// <summary>
/// 该电表方案下采集项编号
/// </summary>
public string DataTypes { get; set; }
/// <summary>
/// 品牌型号
/// </summary>
public string BrandType { get; set; }
/// <summary>
/// 采集器编号
/// </summary>
public string GatherCode { get; set; }
/// <summary>
/// 是否特殊表1是特殊电表
/// </summary>
public int Special { get; set; }
/// <summary>
/// 费率类型,单、多 (SingleRate :单费率单相表1多费率其他0 与TypeName字段无关)
/// SingleRate ? "单" : "复"
/// [SingleRate] --0 复费率 false 1 单费率 true 与PayPlanID保持一致
///对应 TB_PayPlan.Type: 1复费率2单费率
/// </summary>
public bool SingleRate { get; set; }
/// <summary>
/// 项目ID
/// </summary>
public int ProjectID { get; set; }
/// <summary>
/// 数据库业务ID
/// </summary>
public int DatabaseBusiID { get; set; }
/// <summary>
/// 是否异常集中器 0:正常1异常
/// </summary>
public int AbnormalState { get; set; }
public DateTime LastTime { get; set; }
}
}

View File

@ -0,0 +1,141 @@
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.Domain.Entities;
using Volo.Abp.Domain.Entities.Auditing;
namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
{
/// <summary>
/// 抄读任务Redis缓存数据记录
/// </summary>
public class MeterReadingTelemetryPacketInfo : DeviceCacheBasicModel
{
/// <summary>
/// 关系映射标识用于ZSet的Member字段和Set的Value字段具体值可以根据不同业务场景进行定义
/// </summary>
public override string MemberId => $"{FocusId}:{MeterId}:{ItemCode}";
/// <summary>
/// ZSet排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳
/// </summary>
public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks;
/// <summary>
/// 是否手动操作
/// </summary>
public bool ManualOrNot { get; set; }
/// <summary>
/// 任务数据唯一标记
/// </summary>
public string TaskMark { get; set; }
/// <summary>
/// 时间戳标记IoTDB时间列处理上报通过构建标记获取唯一标记匹配时间戳。
/// </summary>
public long Timestamps { get; set; }
/// <summary>
/// 是否超时
/// </summary>
public bool IsTimeout { get; set; } = false;
/// <summary>
/// 待抄读时间
/// </summary>
public DateTime PendingCopyReadTime { get; set; }
/// <summary>
/// 集中器地址
/// </summary>
public string FocusAddress { get; set; }
/// <summary>
/// 表地址
/// </summary>
public string MeterAddress { get; set; }
/// <summary>
/// 表类型
/// </summary>
public MeterTypeEnum MeterType { get; set; }
/// <summary>
/// 项目ID
/// </summary>
public int ProjectID { get; set; }
/// <summary>
/// 数据库业务ID
/// </summary>
public int DatabaseBusiID { get; set; }
/// <summary>
/// AFN功能码
/// </summary>
public AFN AFN { get; set; }
/// <summary>
/// 抄读功能码
/// </summary>
public int Fn { get; set; }
/// <summary>
/// 抄读计量点
/// </summary>
public int Pn { get; set; }
/// <summary>
/// 采集项编码
/// </summary>
public string ItemCode { get; set;}
/// <summary>
/// 创建时间
/// </summary>
public DateTime CreationTime { get; set; }
/// <summary>
/// 下发消息内容
/// </summary>
public string IssuedMessageHexString { get; set; }
/// <summary>
/// 下发消息Id
/// </summary>
public string IssuedMessageId { get; set; }
/// <summary>
/// 消息上报内容
/// </summary>
public string? ReceivedMessageHexString { get; set; }
/// <summary>
/// 消息上报时间
/// </summary>
public DateTime? ReceivedTime { get; set; }
/// <summary>
/// 上报消息Id
/// </summary>
public string ReceivedMessageId { get; set; }
/// <summary>
/// 上报报文解析备注,异常情况下才有
/// </summary>
public string ReceivedRemark { get; set; }
//public void CreateDataId(Guid Id)
//{
// this.Id = Id;
//}
}
}

View File

@ -1,4 +1,5 @@
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
@ -10,13 +11,8 @@ namespace JiShe.CollectBus.IotSystems.Watermeter
/// <summary> /// <summary>
/// 水表信息 /// 水表信息
/// </summary> /// </summary>
public class WatermeterInfo public class WatermeterInfo: DeviceCacheBasicModel
{ {
/// <summary>
/// 水表ID
/// </summary>
public int ID { get; set; }
/// <summary> /// <summary>
/// 水表名称 /// 水表名称
/// </summary> /// </summary>
@ -25,12 +21,7 @@ namespace JiShe.CollectBus.IotSystems.Watermeter
/// 表密码 /// 表密码
/// </summary> /// </summary>
public string Password { get; set; } public string Password { get; set; }
/// <summary>
/// 集中器ID
/// </summary>
public int FocusID { get; set; }
/// <summary> /// <summary>
/// 集中器地址 /// 集中器地址
/// </summary> /// </summary>

View File

@ -1,11 +1,14 @@
using FreeRedis; using FreeRedis;
using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.FreeRedisProvider.Options; using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.FreeRedisProvider.Options;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using System.Diagnostics; using System.Diagnostics;
using System.Text.Json; using System.Text.Json;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
using static System.Runtime.InteropServices.JavaScript.JSType;
using System.Collections.Concurrent;
namespace JiShe.CollectBus.FreeRedisProvider namespace JiShe.CollectBus.FreeRedisProvider
{ {
@ -33,450 +36,472 @@ namespace JiShe.CollectBus.FreeRedisProvider
public IRedisClient GetInstance() public IRedisClient GetInstance()
{ {
var connectionString = $"{_option.Configuration},defaultdatabase={_option.DefaultDB}"; var connectionString = $"{_option.Configuration},defaultdatabase={_option.DefaultDB},MaxPoolSize={_option.MaxPoolSize}";
Instance = new RedisClient(connectionString); Instance = new RedisClient(connectionString);
Instance.Serialize = obj => BusJsonSerializer.Serialize(obj); Instance.Serialize = obj => BusJsonSerializer.Serialize(obj);
Instance.Deserialize = (json, type) => BusJsonSerializer.Deserialize(json, type); Instance.Deserialize = (json, type) => BusJsonSerializer.Deserialize(json, type);
Instance.Notice += (s, e) => Trace.WriteLine(e.Log); Instance.Notice += (s, e) => Trace.WriteLine(e.Log);
return Instance; return Instance;
} }
///// <summary>
//public async Task AddMeterZSetCacheData<T>(string redisCacheKey, string redisCacheIndexKey, decimal score, T data) ///// 单个添加数据
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisCacheKey">主数据存储Hash缓存Key</param>
///// <param name="redisCacheFocusIndexKey">集中器索引Set缓存Key</param>
///// <param name="redisCacheScoresIndexKey">集中器排序索引ZSET缓存Key</param>
///// <param name="redisCacheGlobalIndexKey">集中器采集频率分组全局索引ZSet缓存Key</param>
///// <param name="data">表计信息</param>
///// <param name="timestamp">可选时间戳</param>
///// <returns></returns>
//public async Task AddMeterCacheData<T>(
//string redisCacheKey,
//string redisCacheFocusIndexKey,
//string redisCacheScoresIndexKey,
//string redisCacheGlobalIndexKey,
//T data,
//DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
//{ //{
// if (score < 0 || data == null || string.IsNullOrWhiteSpace(redisCacheKey) || string.IsNullOrWhiteSpace(redisCacheIndexKey)) // // 参数校验增强
// if (data == null || string.IsNullOrWhiteSpace(redisCacheKey)
// || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
// { // {
// throw new Exception($"{nameof(AddMeterZSetCacheData)} 参数异常,-101"); // throw new ArgumentException($"{nameof(AddMeterCacheData)} 参数异常,-101");
// } // }
// // 生成唯一member标识 // // 计算组合score分类ID + 时间戳)
// var member = data.Serialize(); // var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
// // 计算score范围 // long scoreValue = ((long)data.FocusId << 32) | (uint)actualTimestamp.Ticks;
// decimal dataScore = (long)score << 32;
// //// 事务操作 // //全局索引写入
// //using (var tran = FreeRedisProvider.Instance.Multi()) // long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
// //{
// // await tran.ZAddAsync(cacheKey, score,member);
// // await tran.SAddAsync($"cat_index:{categoryId}", member);
// // object[] ret = tran.Exec();
// //}
// using (var pipe = Instance.StartPipe()) // // 使用事务保证原子性
// using (var trans = Instance.Multi())
// { // {
// pipe.ZAdd(redisCacheKey, dataScore, member); // // 主数据存储Hash
// pipe.SAdd(redisCacheIndexKey, member); // trans.HSet(redisCacheKey, data.MemberID, data.Serialize());
// object[] ret = pipe.EndPipe();
// // 分类索引
// trans.SAdd(redisCacheFocusIndexKey, data.MemberID);
// // 排序索引使用ZSET
// trans.ZAdd(redisCacheScoresIndexKey, scoreValue, data.MemberID);
// //全局索引
// trans.ZAdd(redisCacheGlobalIndexKey, globalScore, data.MemberID);
// var results = trans.Exec();
// if (results == null || results.Length <= 0)
// throw new Exception($"{nameof(AddMeterCacheData)} 事务提交失败,-102");
// } // }
// await Task.CompletedTask; // await Task.CompletedTask;
//} //}
//public async Task<BusPagedResult<T>> GetMeterZSetPagedData<T>( ///// <summary>
///// 批量添加数据
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisCacheKey">主数据存储Hash缓存Key</param>
///// <param name="redisCacheFocusIndexKey">集中器索引Set缓存Key</param>
///// <param name="redisCacheScoresIndexKey">集中器排序索引ZSET缓存Key</param>
///// <param name="redisCacheGlobalIndexKey">集中器采集频率分组全局索引ZSet缓存Key</param>
///// <param name="items">数据集合</param>
///// <param name="timestamp">可选时间戳</param>
///// <returns></returns>
//public async Task BatchAddMeterData<T>(
//string redisCacheKey, //string redisCacheKey,
//string redisCacheIndexKey, //string redisCacheFocusIndexKey,
//decimal score, //string redisCacheScoresIndexKey,
//int pageSize = 10, //string redisCacheGlobalIndexKey,
//int pageIndex = 1) //IEnumerable<T> items,
//DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
//{ //{
// if (score < 0 || string.IsNullOrWhiteSpace(redisCacheKey) || string.IsNullOrWhiteSpace(redisCacheIndexKey)) // if (items == null
// || items.Count() <=0
// || string.IsNullOrWhiteSpace(redisCacheKey)
// || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
// { // {
// throw new Exception($"{nameof(GetMeterZSetPagedData)} 参数异常,-101"); // throw new ArgumentException($"{nameof(BatchAddMeterData)} 参数异常,-101");
// } // }
// // 计算score范围 // const int BATCH_SIZE = 1000; // 每批1000条
// decimal minScore = (long)score << 32; // var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2);
// decimal maxScore = ((long)score + 1) << 32;
// // 分页参数 // foreach (var batch in items.Batch(BATCH_SIZE))
// {
// await semaphore.WaitAsync();
// _ = Task.Run(() =>
// {
// using (var pipe = Instance.StartPipe())
// {
// foreach (var item in batch)
// {
// // 计算组合score分类ID + 时间戳)
// var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
// long scoreValue = ((long)item.FocusId << 32) | (uint)actualTimestamp.Ticks;
// //全局索引写入
// long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
// // 主数据存储Hash
// pipe.HSet(redisCacheKey, item.MemberID, item.Serialize());
// // 分类索引Set
// pipe.SAdd(redisCacheFocusIndexKey, item.MemberID);
// // 排序索引使用ZSET
// pipe.ZAdd(redisCacheScoresIndexKey, scoreValue, item.MemberID);
// //全局索引
// pipe.ZAdd(redisCacheGlobalIndexKey, globalScore, item.MemberID);
// }
// pipe.EndPipe();
// }
// semaphore.Release();
// });
// }
// await Task.CompletedTask;
//}
///// <summary>
///// 删除指定redis缓存key的缓存数据
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisCacheKey">主数据存储Hash缓存Key</param>
///// <param name="redisCacheFocusIndexKey">集中器索引Set缓存Key</param>
///// <param name="redisCacheScoresIndexKey">集中器排序索引ZSET缓存Key</param>
///// <param name="redisCacheGlobalIndexKey">集中器采集频率分组全局索引ZSet缓存Key</param>
///// <param name="data">表计信息</param>
///// <returns></returns>
//public async Task RemoveMeterData<T>(
//string redisCacheKey,
//string redisCacheFocusIndexKey,
//string redisCacheScoresIndexKey,
//string redisCacheGlobalIndexKey,
//T data) where T : DeviceCacheBasicModel
//{
// if (data == null
// || string.IsNullOrWhiteSpace(redisCacheKey)
// || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
// {
// throw new ArgumentException($"{nameof(RemoveMeterData)} 参数异常,-101");
// }
// const string luaScript = @"
// local mainKey = KEYS[1]
// local focusIndexKey = KEYS[2]
// local scoresIndexKey = KEYS[3]
// local globalIndexKey = KEYS[4]
// local member = ARGV[1]
// local deleted = 0
// if redis.call('HDEL', mainKey, member) > 0 then
// deleted = 1
// end
// redis.call('SREM', focusIndexKey, member)
// redis.call('ZREM', scoresIndexKey, member)
// redis.call('ZREM', globalIndexKey, member)
// return deleted
// ";
// var keys = new[]
// {
// redisCacheKey,
// redisCacheFocusIndexKey,
// redisCacheScoresIndexKey,
// redisCacheGlobalIndexKey
// };
// var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberID });
// if ((int)result == 0)
// throw new KeyNotFoundException("指定数据不存在");
//}
///// <summary>
///// 修改表计缓存信息
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisCacheKey">主数据存储Hash缓存Key</param>
///// <param name="oldRedisCacheFocusIndexKey">旧集中器索引Set缓存Key</param>
///// <param name="newRedisCacheFocusIndexKey">新集中器索引Set缓存Key</param>
///// <param name="redisCacheScoresIndexKey">集中器排序索引ZSET缓存Key</param>
///// <param name="redisCacheGlobalIndexKey">集中器采集频率分组全局索引ZSet缓存Key</param>
///// <param name="newData">表计信息</param>
///// <param name="newTimestamp">可选时间戳</param>
///// <returns></returns>
//public async Task UpdateMeterData<T>(
//string redisCacheKey,
//string oldRedisCacheFocusIndexKey,
//string newRedisCacheFocusIndexKey,
//string redisCacheScoresIndexKey,
//string redisCacheGlobalIndexKey,
//T newData,
//DateTimeOffset? newTimestamp = null) where T : DeviceCacheBasicModel
//{
// if (newData == null
// || string.IsNullOrWhiteSpace(redisCacheKey)
// || string.IsNullOrWhiteSpace(oldRedisCacheFocusIndexKey)
// || string.IsNullOrWhiteSpace(newRedisCacheFocusIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
// {
// throw new ArgumentException($"{nameof(UpdateMeterData)} 参数异常,-101");
// }
// var luaScript = @"
// local mainKey = KEYS[1]
// local oldFocusIndexKey = KEYS[2]
// local newFocusIndexKey = KEYS[3]
// local scoresIndexKey = KEYS[4]
// local globalIndexKey = KEYS[5]
// local member = ARGV[1]
// local newData = ARGV[2]
// local newScore = ARGV[3]
// local newGlobalScore = ARGV[4]
// -- 校验存在性
// if redis.call('HEXISTS', mainKey, member) == 0 then
// return 0
// end
// -- 更新主数据
// redis.call('HSET', mainKey, member, newData)
// -- 处理变更
// if newScore ~= '' then
// -- 删除旧索引
// redis.call('SREM', oldFocusIndexKey, member)
// redis.call('ZREM', scoresIndexKey, member)
// -- 添加新索引
// redis.call('SADD', newFocusIndexKey, member)
// redis.call('ZADD', scoresIndexKey, newScore, member)
// end
// -- 更新全局索引
// if newGlobalScore ~= '' then
// -- 删除旧索引
// redis.call('ZREM', globalIndexKey, member)
// -- 添加新索引
// redis.call('ZADD', globalIndexKey, newGlobalScore, member)
// end
// return 1
// ";
// var actualTimestamp = newTimestamp ?? DateTimeOffset.UtcNow;
// var newGlobalScore = actualTimestamp.ToUnixTimeMilliseconds();
// var newScoreValue = ((long)newData.FocusId << 32) | (uint)actualTimestamp.Ticks;
// var result = await Instance.EvalAsync(luaScript,
// new[]
// {
// redisCacheKey,
// oldRedisCacheFocusIndexKey,
// newRedisCacheFocusIndexKey,
// redisCacheScoresIndexKey,
// redisCacheGlobalIndexKey
// },
// new object[]
// {
// newData.MemberID,
// newData.Serialize(),
// newScoreValue.ToString() ?? "",
// newGlobalScore.ToString() ?? ""
// });
// if ((int)result == 0)
// {
// throw new KeyNotFoundException($"{nameof(UpdateMeterData)}指定Key{redisCacheKey}的数据不存在");
// }
//}
//public async Task<BusPagedResult<T>> SingleGetMeterPagedData<T>(
//string redisCacheKey,
//string redisCacheScoresIndexKey,
//int focusId,
//int pageSize = 10,
//int pageIndex = 1,
//bool descending = true)
//{
// // 计算score范围
// long minScore = (long)focusId << 32;
// long maxScore = ((long)focusId + 1) << 32;
// // 分页参数计算
// int start = (pageIndex - 1) * pageSize; // int start = (pageIndex - 1) * pageSize;
// // 查询主数据 // // 获取排序后的member列表
// var members = await Instance.ZRevRangeByScoreAsync( // var members = descending
// redisCacheKey, // ? await Instance.ZRevRangeByScoreAsync(
// maxScore, // redisCacheScoresIndexKey,
// maxScore,
// minScore,
// start,
// pageSize)
// : await Instance.ZRangeByScoreAsync(
// redisCacheScoresIndexKey,
// minScore,
// maxScore,
// start,
// pageSize);
// // 批量获取实际数据
// var dataTasks = members.Select(m =>
// Instance.HGetAsync<T>(redisCacheKey, m)).ToArray();
// await Task.WhenAll(dataTasks);
// // 总数统计优化
// var total = await Instance.ZCountAsync(
// redisCacheScoresIndexKey,
// minScore, // minScore,
// offset: start, // maxScore);
// count: pageSize
// );
// if (members == null)
// {
// throw new Exception($"{nameof(GetMeterZSetPagedData)} 获取缓存的信息失败,第 {pageIndex + 1} 页数据未返回,-102");
// }
// // 查询总数
// var total = await Instance.ZCountAsync(redisCacheKey, minScore, maxScore);
// return new BusPagedResult<T> // return new BusPagedResult<T>
// { // {
// Items = members.Select(m => // Items = dataTasks.Select(t => t.Result).ToList(),
// BusJsonSerializer.Deserialize<T>(m)!).ToList(),
// TotalCount = total, // TotalCount = total,
// PageIndex = pageIndex, // PageIndex = pageIndex,
// PageSize = pageSize // PageSize = pageSize
// }; // };
//} //}
///// <summary>
///// 删除数据示例 //public async Task<BusPagedResult<T>> GetFocusPagedData<T>(
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisCacheKey">分类</param>
///// <param name="redisCacheIndexKey"></param>
///// <param name="data"></param>
///// <returns></returns>
//public async Task RemoveMeterZSetData<T>(
//string redisCacheKey, //string redisCacheKey,
//string redisCacheIndexKey, //string redisCacheScoresIndexKey,
//T data) //int focusId,
//int pageSize = 10,
//long? lastScore = null,
//string lastMember = null,
//bool descending = true) where T : DeviceCacheBasicModel
//{ //{
// // 计算分数范围
// long minScore = (long)focusId << 32;
// long maxScore = ((long)focusId + 1) << 32;
// // 查询需要删除的member // // 获取成员列表
// var members = await Instance.SMembersAsync(redisCacheIndexKey); // var members = await GetSortedMembers(
// var target = members.FirstOrDefault(m => // redisCacheScoresIndexKey,
// BusJsonSerializer.Deserialize<T>(m) == data);//泛型此处该如何处理? // minScore,
// maxScore,
// pageSize,
// lastScore,
// lastMember,
// descending);
// if (target != null) // // 批量获取数据
// var dataDict = await Instance.HMGetAsync<T>(redisCacheKey, members.CurrentItems);
// return new BusPagedResult<T>
// { // {
// using (var trans = Instance.Multi()) // Items = dataDict,
// { // TotalCount = await GetTotalCount(redisCacheScoresIndexKey, minScore, maxScore),
// trans.ZRem(redisCacheKey, target); // HasNext = members.HasNext,
// trans.SRem(redisCacheIndexKey, target); // NextScore = members.NextScore,
// trans.Exec(); // NextMember = members.NextMember
// } // };
// } //}
// await Task.CompletedTask; //private async Task<(string[] CurrentItems, bool HasNext, decimal? NextScore, string NextMember)>
// GetSortedMembers(
// string zsetKey,
// long minScore,
// long maxScore,
// int pageSize,
// long? lastScore,
// string lastMember,
// bool descending)
//{
// var querySize = pageSize + 1;
// var (startScore, exclude) = descending
// ? (lastScore ?? maxScore, lastMember)
// : (lastScore ?? minScore, lastMember);
// var members = descending
// ? await Instance.ZRevRangeByScoreAsync(
// zsetKey,
// max: startScore,
// min: minScore,
// offset: 0,
// count: querySize)
// : await Instance.ZRangeByScoreAsync(
// zsetKey,
// min: startScore,
// max: maxScore,
// offset: 0,
// count: querySize);
// var hasNext = members.Length > pageSize;
// var currentItems = members.Take(pageSize).ToArray();
// var nextCursor = currentItems.Any()
// ? await GetNextCursor(zsetKey, currentItems.Last(), descending)
// : (null, null);
// return (currentItems, hasNext, nextCursor.score, nextCursor.member);
//}
//private async Task<long> GetTotalCount(string zsetKey, long min, long max)
//{
// // 缓存计数优化
// var cacheKey = $"{zsetKey}_count_{min}_{max}";
// var cached = await Instance.GetAsync<long?>(cacheKey);
// if (cached.HasValue)
// return cached.Value;
// var count = await Instance.ZCountAsync(zsetKey, min, max);
// await Instance.SetExAsync(cacheKey, 60, count); // 缓存60秒
// return count;
//} //}
public async Task AddMeterZSetCacheData<T>( //public async Task<Dictionary<int, BusPagedResult<T>>> BatchGetMeterPagedData<T>(
string redisCacheKey, //string redisCacheKey,
string redisCacheIndexKey, //string redisCacheScoresIndexKey,
int categoryId, // 新增分类ID参数 //IEnumerable<int> focusIds,
T data, //int pageSizePerFocus = 10) where T : DeviceCacheBasicModel
DateTimeOffset? timestamp = null) //{
{ // var results = new ConcurrentDictionary<int, BusPagedResult<T>>();
// 参数校验增强 // var parallelOptions = new ParallelOptions
if (data == null || string.IsNullOrWhiteSpace(redisCacheKey) // {
|| string.IsNullOrWhiteSpace(redisCacheIndexKey)) // MaxDegreeOfParallelism = Environment.ProcessorCount * 2
{ // };
throw new ArgumentException("Invalid parameters");
}
// 生成唯一member标识带数据指纹 // await Parallel.ForEachAsync(focusIds, parallelOptions, async (focusId, _) =>
var member = $"{categoryId}:{Guid.NewGuid()}"; // {
var serializedData = data.Serialize(); // var data = await SingleGetMeterPagedData<T>(
// redisCacheKey,
// redisCacheScoresIndexKey,
// focusId,
// pageSizePerFocus);
// 计算组合score分类ID + 时间戳) // results.TryAdd(focusId, data);
var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow; // });
long scoreValue = ((long)categoryId << 32) | (uint)actualTimestamp.Ticks; // return new Dictionary<int, BusPagedResult<T>>(results);
//}
//全局索引写入
long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
// 使用事务保证原子性
using (var trans = Instance.Multi())
{
// 主数据存储Hash
trans.HSet(redisCacheKey, member, serializedData);
// 排序索引使用ZSET
trans.ZAdd($"{redisCacheKey}_scores", scoreValue, member);
// 分类索引
trans.SAdd(redisCacheIndexKey, member);
//全局索引
trans.ZAdd("global_data_all", globalScore, member);
var results = trans.Exec();
if (results == null || results.Length <= 0)
throw new Exception("Transaction failed");
}
await Task.CompletedTask;
}
public async Task BatchAddMeterData<T>(
string redisCacheKey,
string indexKey,
IEnumerable<T> items) where T : DeviceCacheBasicModel
{
const int BATCH_SIZE = 1000; // 每批1000条
var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2);
//foreach (var batch in items.Batch(BATCH_SIZE))
//{
// await semaphore.WaitAsync();
// _ = Task.Run(async () =>
// {
// using (var pipe = FreeRedisProvider.Instance.StartPipe())
// {
// foreach (var item in batch)
// {
// var member = $"{item.CategoryId}:{Guid.NewGuid()}";
// long score = ((long)item.CategoryId << 32) | (uint)item.Timestamp.Ticks;
// // Hash主数据
// pipe.HSet(redisCacheKey, member, item.Data.Serialize());
// // 分类索引
// pipe.ZAdd($"{redisCacheKey}_scores", score, member);
// // 全局索引
// pipe.ZAdd("global_data_all", item.Timestamp.ToUnixTimeMilliseconds(), member);
// // 分类快速索引
// pipe.SAdd(indexKey, member);
// }
// pipe.EndPipe();
// }
// semaphore.Release();
// });
//}
await Task.CompletedTask;
}
public async Task UpdateMeterData<T>(
string redisCacheKey,
string oldCategoryIndexKey,
string newCategoryIndexKey,
string memberId, // 唯一标识(格式:"分类ID:GUID"
T newData,
int? newCategoryId = null,
DateTimeOffset? newTimestamp = null)
{
// 参数校验
if (string.IsNullOrWhiteSpace(memberId))
throw new ArgumentException("Invalid member ID");
var luaScript = @"
local mainKey = KEYS[1]
local scoreKey = KEYS[2]
local oldIndex = KEYS[3]
local newIndex = KEYS[4]
local member = ARGV[1]
local newData = ARGV[2]
local newScore = ARGV[3]
--
if redis.call('HEXISTS', mainKey, member) == 0 then
return 0
end
--
redis.call('HSET', mainKey, member, newData)
--
if newScore ~= '' then
--
redis.call('SREM', oldIndex, member)
--
redis.call('ZADD', scoreKey, newScore, member)
--
redis.call('SADD', newIndex, member)
end
return 1
";
// 计算新score当分类或时间变化时
long? newScoreValue = null;
if (newCategoryId.HasValue || newTimestamp.HasValue)
{
var parts = memberId.Split(':');
var oldCategoryId = int.Parse(parts[0]);
var actualCategoryId = newCategoryId ?? oldCategoryId;
var actualTimestamp = newTimestamp ?? DateTimeOffset.UtcNow;
newScoreValue = ((long)actualCategoryId << 32) | (uint)actualTimestamp.Ticks;
}
var result = await Instance.EvalAsync(luaScript,
new[]
{
redisCacheKey,
$"{redisCacheKey}_scores",
oldCategoryIndexKey,
newCategoryIndexKey
},
new[]
{
memberId,
newData.Serialize(),
newScoreValue?.ToString() ?? ""
});
// 如果时间戳变化则更新全局索引
if (newTimestamp.HasValue)
{
long newGlobalScore = newTimestamp.Value.ToUnixTimeMilliseconds();
await Instance.ZAddAsync("global_data_all", newGlobalScore, memberId);
}
if ((int)result == 0)
throw new KeyNotFoundException("指定数据不存在");
}
public async Task<BusPagedResult<T>> GetMeterZSetPagedData<T>(
string redisCacheKey,
string redisCacheIndexKey,
int categoryId,
int pageSize = 10,
int pageIndex = 1,
bool descending = true)
{
// 计算score范围
long minScore = (long)categoryId << 32;
long maxScore = ((long)categoryId + 1) << 32;
// 分页参数计算
int start = (pageIndex - 1) * pageSize;
// 获取排序后的member列表
var members = descending
? await Instance.ZRevRangeByScoreAsync(
$"{redisCacheKey}_scores",
maxScore,
minScore,
start,
pageSize)
: await Instance.ZRangeByScoreAsync(
$"{redisCacheKey}_scores",
minScore,
maxScore,
start,
pageSize);
// 批量获取实际数据
var dataTasks = members.Select(m =>
Instance.HGetAsync<T>(redisCacheKey, m)).ToArray();
await Task.WhenAll(dataTasks);
// 总数统计优化
var total = await Instance.ZCountAsync(
$"{redisCacheKey}_scores",
minScore,
maxScore);
return new BusPagedResult<T>
{
Items = dataTasks.Select(t => t.Result).ToList(),
TotalCount = total,
PageIndex = pageIndex,
PageSize = pageSize
};
}
public async Task RemoveMeterZSetData<T>(
string redisCacheKey,
string redisCacheIndexKey,
string uniqueId) // 改为基于唯一标识删除
{
// 原子操作
var luaScript = @"
local mainKey = KEYS[1]
local scoreKey = KEYS[2]
local indexKey = KEYS[3]
local member = ARGV[1]
redis.call('HDEL', mainKey, member)
redis.call('ZREM', scoreKey, member)
redis.call('SREM', indexKey, member)
return 1
";
var keys = new[]
{
redisCacheKey,
$"{redisCacheKey}_scores",
redisCacheIndexKey
};
var result = await Instance.EvalAsync(luaScript,
keys,
new[] { uniqueId });
if ((int)result != 1)
throw new Exception("删除操作失败");
}
public async Task<GlobalPagedResult<T>> GetGlobalPagedData<T>(
string redisCacheKey,
int pageSize = 10,
long? lastScore = null,
string lastMember = null,
bool descending = true)
{
const string zsetKey = "global_data_all";
// 分页参数处理
var (startScore, excludeMember) = descending
? (lastScore ?? long.MaxValue, lastMember)
: (lastScore ?? 0, lastMember);
// 获取成员列表
string[] members;
if (descending)
{
members = await Instance.ZRevRangeByScoreAsync(
zsetKey,
max: startScore,
min: 0,
offset: 0,
count: pageSize + 1);
}
else
{
members = await Instance.ZRangeByScoreAsync(
zsetKey,
min: startScore,
max: long.MaxValue,
offset: 0,
count: pageSize + 1);
}
// 处理分页结果
bool hasNext = members.Length > pageSize;
var actualMembers = members.Take(pageSize).ToArray();
// 批量获取数据(优化版本)
var dataTasks = actualMembers
.Select(m => Instance.HGetAsync<T>(redisCacheKey, m))
.ToArray();
await Task.WhenAll(dataTasks);
// 获取下一页游标
(long? nextScore, string nextMember) = actualMembers.Any()
? await GetNextCursor(zsetKey, actualMembers.Last(), descending)
: (null, null);
return new GlobalPagedResult<T>
{
Items = dataTasks.Select(t => t.Result).ToList(),
HasNext = hasNext,
NextScore = nextScore,
NextMember = nextMember
};
}
private async Task<(long? score, string member)> GetNextCursor(
string zsetKey,
string lastMember,
bool descending)
{
var score = await Instance.ZScoreAsync(zsetKey, lastMember);
return (score.HasValue ? (long)score.Value : null, lastMember);
}
} }
} }

View File

@ -1,4 +1,5 @@
using FreeRedis; using FreeRedis;
using JiShe.CollectBus.Common.Models;
namespace JiShe.CollectBus.FreeRedisProvider namespace JiShe.CollectBus.FreeRedisProvider
{ {
@ -8,7 +9,7 @@ namespace JiShe.CollectBus.FreeRedisProvider
/// 获取客户端 /// 获取客户端
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
RedisClient Instance { get; set; } RedisClient Instance { get; set; }
} }
} }

View File

@ -13,6 +13,11 @@ namespace JiShe.CollectBus.FreeRedisProvider.Options
/// </summary> /// </summary>
public string? Configuration { get; set; } public string? Configuration { get; set; }
/// <summary>
/// 最大连接数
/// </summary>
public string? MaxPoolSize { get; set; }
/// <summary> /// <summary>
/// 默认数据库 /// 默认数据库
/// </summary> /// </summary>

View File

@ -41,6 +41,7 @@
}, },
"Redis": { "Redis": {
"Configuration": "192.168.1.9:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", "Configuration": "192.168.1.9:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
"MaxPoolSize": "50",
"DefaultDB": "14", "DefaultDB": "14",
"HangfireDB": "15" "HangfireDB": "15"
}, },
@ -129,7 +130,7 @@
"OpenDebugMode": true, "OpenDebugMode": true,
"UseTableSessionPoolByDefault": false "UseTableSessionPoolByDefault": false
}, },
"ServerTagName": "JiSheCollectBus", "ServerTagName": "JiSheCollectBus3",
"KafkaReplicationFactor": 3, "KafkaReplicationFactor": 3,
"NumPartitions": 30, "NumPartitions": 30,
"Cassandra": { "Cassandra": {

View File

@ -11,8 +11,6 @@ using Microsoft.Extensions.DependencyInjection;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.Protocols; using JiShe.CollectBus.IotSystems.Protocols;
using MassTransit; using MassTransit;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Common.Helpers;
using DotNetCore.CAP; using DotNetCore.CAP;
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts namespace JiShe.CollectBus.Protocol.Contracts.Abstracts

View File

@ -7,7 +7,6 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.9.0" />
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" /> <PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
<PackageReference Include="MassTransit.Abstractions" Version="8.3.0" /> <PackageReference Include="MassTransit.Abstractions" Version="8.3.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="8.0.0" /> <PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="8.0.0" />