dev #2

Merged
admin merged 176 commits from dev into master 2025-04-18 01:31:49 +00:00
18 changed files with 808 additions and 122 deletions
Showing only changes of commit 033fde66b1 - Show all commits

View File

@ -1,15 +1,20 @@
using FreeRedis; using Confluent.Kafka;
using FreeRedis;
using FreeSql; using FreeSql;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.FreeRedisProvider; using JiShe.CollectBus.FreeRedisProvider;
using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Localization; using JiShe.CollectBus.Localization;
using JiShe.CollectBus.Serializer; using JiShe.CollectBus.Serializer;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp.Application.Services; using Volo.Abp.Application.Services;
@ -21,6 +26,7 @@ public abstract class CollectBusAppService : ApplicationService
public IFreeSqlProvider SqlProvider => LazyServiceProvider.LazyGetRequiredService<IFreeSqlProvider>(); public IFreeSqlProvider SqlProvider => LazyServiceProvider.LazyGetRequiredService<IFreeSqlProvider>();
protected IFreeRedisProvider FreeRedisProvider => LazyServiceProvider.LazyGetService<IFreeRedisProvider>()!; protected IFreeRedisProvider FreeRedisProvider => LazyServiceProvider.LazyGetService<IFreeRedisProvider>()!;
protected CollectBusAppService() protected CollectBusAppService()
{ {
LocalizationResource = typeof(CollectBusResource); LocalizationResource = typeof(CollectBusResource);

View File

@ -14,6 +14,7 @@ using JiShe.CollectBus.IoTDBProvider;
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Repository.MeterReadingRecord; using JiShe.CollectBus.Repository.MeterReadingRecord;
using JiShe.CollectBus.Serializer; using JiShe.CollectBus.Serializer;
@ -36,18 +37,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading
private readonly ICapPublisher _producerBus; private readonly ICapPublisher _producerBus;
private readonly IIoTDBProvider _dbProvider; private readonly IIoTDBProvider _dbProvider;
private readonly IMeterReadingRecordRepository _meterReadingRecordRepository; private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
private readonly IProducerService<Null, MeterReadingRecords> _producerService;
public BasicScheduledMeterReadingService( public BasicScheduledMeterReadingService(
ILogger<BasicScheduledMeterReadingService> logger, ILogger<BasicScheduledMeterReadingService> logger,
ICapPublisher producerBus, ICapPublisher producerBus,
IMeterReadingRecordRepository meterReadingRecordRepository, IMeterReadingRecordRepository meterReadingRecordRepository,
IProducerService<Null, MeterReadingRecords> producerService,
IIoTDBProvider dbProvider) IIoTDBProvider dbProvider)
{ {
_producerBus = producerBus; _producerBus = producerBus;
_logger = logger; _logger = logger;
_dbProvider = dbProvider; _dbProvider = dbProvider;
_meterReadingRecordRepository = meterReadingRecordRepository; _meterReadingRecordRepository = meterReadingRecordRepository;
_producerService = producerService;
} }
/// <summary> /// <summary>
@ -145,37 +149,26 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var timer = Stopwatch.StartNew(); var timer = Stopwatch.StartNew();
//处理数据 //处理数据
List<Tuple<string, Dictionary<string, MeterReadingRecords>>> tempDatas = new List<Tuple<string, Dictionary<string, MeterReadingRecords>>>(); //await DeviceGroupBalanceControl.ProcessGenericListAsync(
await DeviceGroupBalanceControl.ProcessGenericListAsync( // items: meterInfos,
// deviceIdSelector: data => data.FocusAddress,
// processor: (data, threadId) =>
// {
// _ = AmmerterCreatePublishTask(timeDensity, data);
// }
//);
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos, items: meterInfos,
deviceIdSelector: data => data.FocusAddress, deviceIdSelector: data => data.FocusAddress,
processor: (data, threadId) => processor: data =>
{ {
_ = AmmerterCreatePublishTask(timeDensity, data); _ = AmmerterCreatePublishTask(timeDensity, data);
//var keyValuePairs = AmmerterCreatePublishTask(timeDensity, data);
//if (keyValuePairs != null && keyValuePairs.Keys.Count() > 0)
//{
// //构建缓存任务key依然 表计类型+采集频率+集中器地址存hash类型
// var redisDataCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{keyValuePairs.First().Value.FocusAddress}";
// tempDatas.Add(Tuple.Create(redisDataCacheKey, keyValuePairs));
// //tempDatas.Add(keyValuePairs);
//}
} }
); );
//_logger.LogError("数据处理完成。");
//using (var pipe = FreeRedisProvider.Instance.StartPipe())
//{
// _logger.LogError("开始进入管道处理。");
// foreach (var dataItem in tempDatas)
// {
// pipe.HSet(dataItem.Item1, dataItem.Item2);
// }
// object[] ret = pipe.EndPipe();
//}
//_logger.LogError("管道处理完成。");
timer.Stop(); timer.Stop();
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},{oneMinutekeyList.Length}"); _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},{oneMinutekeyList.Length}");
@ -534,11 +527,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime); await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
} }
////删除任务数据
//await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList);
////缓存下一个时间的任务
//await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter);
stopwatch.Stop(); stopwatch.Stop();
@ -635,20 +623,20 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{ {
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
return; return;
} }
//载波的不处理 //载波的不处理
if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave) if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
{ {
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102"); //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102");
return; return;
} }
if (ammeterInfo.State.Equals(2)) if (ammeterInfo.State.Equals(2))
{ {
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理"); //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理");
return; return;
} }
@ -661,22 +649,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode)) if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode))
{ {
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空"); // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空");
return; return;
} }
if (string.IsNullOrWhiteSpace(ammeterInfo.Address)) if (string.IsNullOrWhiteSpace(ammeterInfo.Address))
{ {
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空"); //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空");
return; return;
} }
if (Convert.ToInt32(ammeterInfo.Address) > 65535) if (Convert.ToInt32(ammeterInfo.Address) > 65535)
{ {
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535"); //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535");
return; return;
} }
if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33) if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33)
{ {
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})"); //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})");
return; return;
} }
@ -703,7 +691,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (tempSubCodes == null || tempSubCodes.Count <= 0) if (tempSubCodes == null || tempSubCodes.Count <= 0)
{ {
_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空"); //_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空");
return; return;
} }
else else
@ -753,7 +741,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
else else
{ {
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。"); //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。");
continue; continue;
} }
} }
@ -761,7 +749,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (dataInfos == null || dataInfos.Length <= 0) if (dataInfos == null || dataInfos.Length <= 0)
{ {
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。"); //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。");
continue; continue;
} }
@ -802,6 +790,78 @@ namespace JiShe.CollectBus.ScheduledMeterReading
pipe.HSet(redisCacheKey, keyValuePairs); pipe.HSet(redisCacheKey, keyValuePairs);
object[] ret = pipe.EndPipe(); object[] ret = pipe.EndPipe();
} }
await Task.CompletedTask;
}
/// <summary>
/// Kafka 推送消息
/// </summary>
/// <param name="topicName">主题名称</param>
/// <param name="taskRecord">任务记录</param>
/// <returns></returns>
private async Task KafkaProducerIssuedMessage(string topicName,
MeterReadingRecords taskRecord)
{
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
{
throw new Exception($"{nameof(KafkaProducerIssuedMessage)} 推送消息失败,参数异常,-101");
}
int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress);
TopicPartition topicPartition = new TopicPartition(topicName, partition);
await _producerService.ProduceAsync(topicPartition, null, taskRecord);
}
private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType)
{
var currentDateTime = DateTime.Now;
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, meterType);
//FreeRedisProvider.Instance.key()
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
return;
}
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), meterType);
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
return;
}
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
{
foreach (var ammerterItem in focusItem.Value)
{
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
//_ = _producerBus.Publish(tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
}
} }
/// <summary> /// <summary>

View File

@ -1,6 +1,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using Confluent.Kafka;
using DotNetCore.CAP; using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
@ -13,6 +14,7 @@ using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Repository; using JiShe.CollectBus.Repository;
using JiShe.CollectBus.Repository.MeterReadingRecord; using JiShe.CollectBus.Repository.MeterReadingRecord;
using MassTransit; using MassTransit;
@ -34,7 +36,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{ {
string serverTagName = string.Empty; string serverTagName = string.Empty;
public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger, public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger,
ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration) : base(logger, producerBus, meterReadingRecordRepository, dbProvider) ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService<Null, MeterReadingRecords> producerService) : base(logger, producerBus, meterReadingRecordRepository, producerService,dbProvider)
{ {
serverTagName = configuration.GetValue<string>(CommonConst.ServerTagName)!; serverTagName = configuration.GetValue<string>(CommonConst.ServerTagName)!;
} }

View File

@ -40,8 +40,18 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
/// <summary> /// <summary>
/// 初始化或增量更新缓存 /// 初始化或增量更新缓存
/// </summary> /// </summary>
public static void InitializeCache(List<string> deviceList, int groupCount = 60) public static void InitializeCache(List<string> deviceList, int groupCount = 30)
{ {
if (deviceList == null || deviceList.Count <= 0)
{
throw new ArgumentException($"{nameof(InitializeCache)} 设备分组初始化失败,设备数据为空");
}
if (groupCount > 60 || groupCount <= 0)
{
groupCount = 60;
}
lock (_syncRoot) lock (_syncRoot)
{ {
// 首次初始化 // 首次初始化
@ -55,7 +65,9 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
else else
{ {
if (_currentCache.CachedGroups.Length != groupCount) if (_currentCache.CachedGroups.Length != groupCount)
throw new ArgumentException("Group count cannot change after initial initialization"); {
throw new ArgumentException($"{nameof(InitializeCache)} 设备分组初始化完成以后,分组数量不能更改");
}
var clonedCache = CloneExistingCache(); var clonedCache = CloneExistingCache();
UpdateCacheWithDevices(clonedCache, deviceList, groupCount); UpdateCacheWithDevices(clonedCache, deviceList, groupCount);
@ -165,49 +177,90 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
}); });
} }
/// <summary> /// <summary>
/// 并行处理所有分组设备(每个分组一个处理线程 /// 智能节流处理CPU友好型
/// </summary> /// </summary>
//public static void ProcessAllGroups<T>(Action<List<T>> processAction) where T : DeviceGroupBasicModel /// <typeparam name="T">已经分组的设备信息</typeparam>
//{ /// <param name="items">部分或者全部的已经分组的设备集合</param>
// var cache = _currentCache; /// <param name="deviceIdSelector">从泛型对象提取deviceId</param>
// if (cache == null) /// <param name="processor">处理委托参数当前对象线程ID</param>
// throw new InvalidOperationException("缓存未初始化"); /// <param name="maxConcurrency">可选最佳并发度</param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
public static async Task ProcessWithThrottleAsync<T>(
List<T> items,
Func<T, string> deviceIdSelector,
Action<T> processor,
int? maxConcurrency = null)
{
var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化");
// // 使用并行选项控制并发度 // 自动计算最佳并发度
// var options = new ParallelOptions int recommendedThreads = CalculateOptimalThreadCount();
// { if ((maxConcurrency.HasValue && maxConcurrency.Value > cache.CachedGroups.Length) || maxConcurrency.HasValue == false)
// MaxDegreeOfParallelism = cache.CachedGroups.Length // 严格匹配分组数量 {
// }; maxConcurrency = cache.CachedGroups.Length;
}
// Parallel.For(0, cache.CachedGroups.Length, options, groupId => int actualThreads = maxConcurrency ?? recommendedThreads;
// {
// // 获取当前分组的只读副本
// var groupDevices = GetGroupSnapshot(cache, groupId);
// processAction(groupDevices); // 创建节流器
using var throttler = new SemaphoreSlim(initialCount: actualThreads);
// //foreach (var deviceId in groupDevices) // 使用LongRunning避免线程池饥饿
// //{ var tasks = items.Select(async item =>
// // //执行处理操作 {
// // processAction(deviceId); await throttler.WaitAsync();
try
{
var deviceId = deviceIdSelector(item);
if (cache.BalancedMapping.TryGetValue(deviceId, out int groupId))
{
// 分组级处理(保持顺序性)
await ProcessItemAsync(item, processor, groupId);
}
}
finally
{
throttler.Release();
}
});
// // // 可添加取消检测 await Task.WhenAll(tasks);
// // // if (token.IsCancellationRequested) break; }
// //}
// }); /// <summary>
//} /// 自动计算最优线程数
/// </summary>
private static int CalculateOptimalThreadCount()
{
int coreCount = Environment.ProcessorCount;
return Math.Min(
coreCount * 2, // 超线程优化
_currentCache?.CachedGroups.Length ?? 60
);
}
/// <summary>
/// 分组异步处理(带节流)
/// </summary>
private static async Task ProcessItemAsync<T>(T item, Action<T> processor, int groupId)
{
// 使用内存缓存降低CPU负载
await Task.Yield(); // 立即释放当前线程
// 分组处理上下文
var context = ExecutionContext.Capture();
ThreadPool.QueueUserWorkItem(_ =>
{
ExecutionContext.Run(context!, state =>
{
processor(item);
}, null);
});
}
///// <summary>
///// 获取分组数据快照(线程安全)
///// </summary>
//public static IReadOnlyList<string> GetGroupSnapshot(CacheState cache, int groupId)
//{
// lock (cache.CachedGroups[groupId])
// {
// return cache.CachedGroups[groupId].ToList(); // 创建内存快照
// }
//}
/// <summary> /// <summary>
/// 通过 deviceId 获取所在的分组集合 /// 通过 deviceId 获取所在的分组集合
@ -321,6 +374,32 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
return (int)hash; return (int)hash;
} }
/// <summary>
/// CRC16算法实现
/// </summary>
/// <param name="bytes"></param>
/// <returns></returns>
public static ushort CRC16Hash(byte[] bytes)
{
ushort crc = 0xFFFF;
for (int i = 0; i < bytes.Length; i++)
{
crc ^= bytes[i];
for (int j = 0; j < 8; j++)
{
if ((crc & 0x0001) == 1)
{
crc = (ushort)((crc >> 1) ^ 0xA001);
}
else
{
crc >>= 1;
}
}
}
return crc;
}
/// <summary> /// <summary>
/// 打印分组统计数据 /// 打印分组统计数据
/// </summary> /// </summary>

View File

@ -1,19 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.DeviceBalanceControl
{
/// <summary>
/// 设备组基本模型
/// </summary>
public class DeviceGroupBasicModel
{
/// <summary>
/// 设备Id
/// </summary>
public string DeviceId { get; set; }
}
}

View File

@ -181,5 +181,24 @@ namespace JiShe.CollectBus.Common.Extensions
return $"{dateTime:yyyyMMddHH}"; return $"{dateTime:yyyyMMddHH}";
#endif #endif
} }
/// <summary>
/// 获取当前时间毫秒级时间戳
/// </summary>
/// <returns></returns>
public static long GetCurrentTimeMillis()
{
return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
}
/// <summary>
/// 将Unix时间戳转换为日期时间
/// </summary>
/// <param name="millis"></param>
/// <returns></returns>
public static DateTime FromUnixMillis(long millis)
{
return DateTimeOffset.FromUnixTimeMilliseconds(millis).DateTime;
}
} }
} }

View File

@ -64,5 +64,30 @@ namespace JiShe.CollectBus.Common.Extensions
? source.Where(predicate) ? source.Where(predicate)
: source; : source;
} }
/// <summary>
/// 分批
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="source"></param>
/// <param name="batchSize"></param>
/// <returns></returns>
public static IEnumerable<IEnumerable<T>> Batch<T>(
this IEnumerable<T> source,
int batchSize)
{
var buffer = new List<T>(batchSize);
foreach (var item in source)
{
buffer.Add(item);
if (buffer.Count == batchSize)
{
yield return buffer;
buffer = new List<T>(batchSize);
}
}
if (buffer.Count > 0)
yield return buffer;
}
} }
} }

View File

@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.PagedResult
{
public class GlobalPagedResult<T>
{
/// <summary>
/// 数据集合
/// </summary>
public List<T> Items { get; set; }
/// <summary>
/// 是否有下一页
/// </summary>
public bool HasNext { get; set; }
/// <summary>
/// 下一页的分页索引
/// </summary>
public long? NextScore { get; set; }
/// <summary>
/// 下一页的分页索引
/// </summary>
public string NextMember { get; set; }
}
}

View File

@ -4,18 +4,28 @@ using System.Linq;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider namespace JiShe.CollectBus.PagedResult
{ {
/// <summary> /// <summary>
/// 查询结果 /// 查询结果
/// </summary> /// </summary>
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
public class PagedResult<T> public class BusPagedResult<T>
{ {
/// <summary> /// <summary>
/// 总条数 /// 总条数
/// </summary> /// </summary>
public int TotalCount { get; set; } public long TotalCount { get; set; }
/// <summary>
/// 当前页码
/// </summary>
public int PageIndex { get; set; }
/// <summary>
/// 每页条数
/// </summary>
public int PageSize { get; set; }
/// <summary> /// <summary>
/// 数据集合 /// 数据集合

View File

@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.FreeRedisProvider
{
/// <summary>
/// 设备缓存基础模型
/// </summary>
public class DeviceCacheBasicModel
{
/// <summary>
/// 集中器Id
/// </summary>
public int FocusId { get; set; }
/// <summary>
/// 表Id
/// </summary>
public int MeterId { get; set; }
}
}

View File

@ -1,6 +1,7 @@
using FreeRedis; using FreeRedis;
using JetBrains.Annotations; using JetBrains.Annotations;
using JiShe.CollectBus.FreeRedisProvider.Options; using JiShe.CollectBus.FreeRedisProvider.Options;
using JiShe.CollectBus.PagedResult;
using JiShe.CollectBus.Serializer; using JiShe.CollectBus.Serializer;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using System; using System;
@ -47,5 +48,443 @@ namespace JiShe.CollectBus.FreeRedisProvider
Instance.Notice += (s, e) => Trace.WriteLine(e.Log); Instance.Notice += (s, e) => Trace.WriteLine(e.Log);
return Instance; return Instance;
} }
//public async Task AddMeterZSetCacheData<T>(string redisCacheKey, string redisCacheIndexKey, decimal score, T data)
//{
// if (score < 0 || data == null || string.IsNullOrWhiteSpace(redisCacheKey) || string.IsNullOrWhiteSpace(redisCacheIndexKey))
// {
// throw new Exception($"{nameof(AddMeterZSetCacheData)} 参数异常,-101");
// }
// // 生成唯一member标识
// var member = data.Serialize();
// // 计算score范围
// decimal dataScore = (long)score << 32;
// //// 事务操作
// //using (var tran = FreeRedisProvider.Instance.Multi())
// //{
// // await tran.ZAddAsync(cacheKey, score,member);
// // await tran.SAddAsync($"cat_index:{categoryId}", member);
// // object[] ret = tran.Exec();
// //}
// using (var pipe = Instance.StartPipe())
// {
// pipe.ZAdd(redisCacheKey, dataScore, member);
// pipe.SAdd(redisCacheIndexKey, member);
// object[] ret = pipe.EndPipe();
// }
// await Task.CompletedTask;
//}
//public async Task<BusPagedResult<T>> GetMeterZSetPagedData<T>(
//string redisCacheKey,
//string redisCacheIndexKey,
//decimal score,
//int pageSize = 10,
//int pageIndex = 1)
//{
// if (score < 0 || string.IsNullOrWhiteSpace(redisCacheKey) || string.IsNullOrWhiteSpace(redisCacheIndexKey))
// {
// throw new Exception($"{nameof(GetMeterZSetPagedData)} 参数异常,-101");
// }
// // 计算score范围
// decimal minScore = (long)score << 32;
// decimal maxScore = ((long)score + 1) << 32;
// // 分页参数
// int start = (pageIndex - 1) * pageSize;
// // 查询主数据
// var members = await Instance.ZRevRangeByScoreAsync(
// redisCacheKey,
// maxScore,
// minScore,
// offset: start,
// count: pageSize
// );
// if (members == null)
// {
// throw new Exception($"{nameof(GetMeterZSetPagedData)} 获取缓存的信息失败,第 {pageIndex + 1} 页数据未返回,-102");
// }
// // 查询总数
// var total = await Instance.ZCountAsync(redisCacheKey, minScore, maxScore);
// return new BusPagedResult<T>
// {
// Items = members.Select(m =>
// BusJsonSerializer.Deserialize<T>(m)!).ToList(),
// TotalCount = total,
// PageIndex = pageIndex,
// PageSize = pageSize
// };
//}
///// <summary>
///// 删除数据示例
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisCacheKey">分类</param>
///// <param name="redisCacheIndexKey"></param>
///// <param name="data"></param>
///// <returns></returns>
//public async Task RemoveMeterZSetData<T>(
//string redisCacheKey,
//string redisCacheIndexKey,
//T data)
//{
// // 查询需要删除的member
// var members = await Instance.SMembersAsync(redisCacheIndexKey);
// var target = members.FirstOrDefault(m =>
// BusJsonSerializer.Deserialize<T>(m) == data);//泛型此处该如何处理?
// if (target != null)
// {
// using (var trans = Instance.Multi())
// {
// trans.ZRem(redisCacheKey, target);
// trans.SRem(redisCacheIndexKey, target);
// trans.Exec();
// }
// }
// await Task.CompletedTask;
//}
public async Task AddMeterZSetCacheData<T>(
string redisCacheKey,
string redisCacheIndexKey,
int categoryId, // 新增分类ID参数
T data,
DateTimeOffset? timestamp = null)
{
// 参数校验增强
if (data == null || string.IsNullOrWhiteSpace(redisCacheKey)
|| string.IsNullOrWhiteSpace(redisCacheIndexKey))
{
throw new ArgumentException("Invalid parameters");
}
// 生成唯一member标识带数据指纹
var member = $"{categoryId}:{Guid.NewGuid()}";
var serializedData = data.Serialize();
// 计算组合score分类ID + 时间戳)
var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
long scoreValue = ((long)categoryId << 32) | (uint)actualTimestamp.Ticks;
//全局索引写入
long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
// 使用事务保证原子性
using (var trans = Instance.Multi())
{
// 主数据存储Hash
trans.HSet(redisCacheKey, member, serializedData);
// 排序索引使用ZSET
trans.ZAdd($"{redisCacheKey}_scores", scoreValue, member);
// 分类索引
trans.SAdd(redisCacheIndexKey, member);
//全局索引
trans.ZAdd("global_data_all", globalScore, member);
var results = trans.Exec();
if (results == null || results.Length <= 0)
throw new Exception("Transaction failed");
}
await Task.CompletedTask;
}
public async Task BatchAddMeterData<T>(
string redisCacheKey,
string indexKey,
IEnumerable<T> items) where T : DeviceCacheBasicModel
{
const int BATCH_SIZE = 1000; // 每批1000条
var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2);
//foreach (var batch in items.Batch(BATCH_SIZE))
//{
// await semaphore.WaitAsync();
// _ = Task.Run(async () =>
// {
// using (var pipe = FreeRedisProvider.Instance.StartPipe())
// {
// foreach (var item in batch)
// {
// var member = $"{item.CategoryId}:{Guid.NewGuid()}";
// long score = ((long)item.CategoryId << 32) | (uint)item.Timestamp.Ticks;
// // Hash主数据
// pipe.HSet(redisCacheKey, member, item.Data.Serialize());
// // 分类索引
// pipe.ZAdd($"{redisCacheKey}_scores", score, member);
// // 全局索引
// pipe.ZAdd("global_data_all", item.Timestamp.ToUnixTimeMilliseconds(), member);
// // 分类快速索引
// pipe.SAdd(indexKey, member);
// }
// pipe.EndPipe();
// }
// semaphore.Release();
// });
//}
await Task.CompletedTask;
}
public async Task UpdateMeterData<T>(
string redisCacheKey,
string oldCategoryIndexKey,
string newCategoryIndexKey,
string memberId, // 唯一标识(格式:"分类ID:GUID"
T newData,
int? newCategoryId = null,
DateTimeOffset? newTimestamp = null)
{
// 参数校验
if (string.IsNullOrWhiteSpace(memberId))
throw new ArgumentException("Invalid member ID");
var luaScript = @"
local mainKey = KEYS[1]
local scoreKey = KEYS[2]
local oldIndex = KEYS[3]
local newIndex = KEYS[4]
local member = ARGV[1]
local newData = ARGV[2]
local newScore = ARGV[3]
--
if redis.call('HEXISTS', mainKey, member) == 0 then
return 0
end
--
redis.call('HSET', mainKey, member, newData)
--
if newScore ~= '' then
--
redis.call('SREM', oldIndex, member)
--
redis.call('ZADD', scoreKey, newScore, member)
--
redis.call('SADD', newIndex, member)
end
return 1
";
// 计算新score当分类或时间变化时
long? newScoreValue = null;
if (newCategoryId.HasValue || newTimestamp.HasValue)
{
var parts = memberId.Split(':');
var oldCategoryId = int.Parse(parts[0]);
var actualCategoryId = newCategoryId ?? oldCategoryId;
var actualTimestamp = newTimestamp ?? DateTimeOffset.UtcNow;
newScoreValue = ((long)actualCategoryId << 32) | (uint)actualTimestamp.Ticks;
}
var result = await Instance.EvalAsync(luaScript,
new[]
{
redisCacheKey,
$"{redisCacheKey}_scores",
oldCategoryIndexKey,
newCategoryIndexKey
},
new[]
{
memberId,
newData.Serialize(),
newScoreValue?.ToString() ?? ""
});
// 如果时间戳变化则更新全局索引
if (newTimestamp.HasValue)
{
long newGlobalScore = newTimestamp.Value.ToUnixTimeMilliseconds();
await Instance.ZAddAsync("global_data_all", newGlobalScore, memberId);
}
if ((int)result == 0)
throw new KeyNotFoundException("指定数据不存在");
}
public async Task<BusPagedResult<T>> GetMeterZSetPagedData<T>(
string redisCacheKey,
string redisCacheIndexKey,
int categoryId,
int pageSize = 10,
int pageIndex = 1,
bool descending = true)
{
// 计算score范围
long minScore = (long)categoryId << 32;
long maxScore = ((long)categoryId + 1) << 32;
// 分页参数计算
int start = (pageIndex - 1) * pageSize;
// 获取排序后的member列表
var members = descending
? await Instance.ZRevRangeByScoreAsync(
$"{redisCacheKey}_scores",
maxScore,
minScore,
start,
pageSize)
: await Instance.ZRangeByScoreAsync(
$"{redisCacheKey}_scores",
minScore,
maxScore,
start,
pageSize);
// 批量获取实际数据
var dataTasks = members.Select(m =>
Instance.HGetAsync<T>(redisCacheKey, m)).ToArray();
await Task.WhenAll(dataTasks);
// 总数统计优化
var total = await Instance.ZCountAsync(
$"{redisCacheKey}_scores",
minScore,
maxScore);
return new BusPagedResult<T>
{
Items = dataTasks.Select(t => t.Result).ToList(),
TotalCount = total,
PageIndex = pageIndex,
PageSize = pageSize
};
}
public async Task RemoveMeterZSetData<T>(
string redisCacheKey,
string redisCacheIndexKey,
string uniqueId) // 改为基于唯一标识删除
{
// 原子操作
var luaScript = @"
local mainKey = KEYS[1]
local scoreKey = KEYS[2]
local indexKey = KEYS[3]
local member = ARGV[1]
redis.call('HDEL', mainKey, member)
redis.call('ZREM', scoreKey, member)
redis.call('SREM', indexKey, member)
return 1
";
var keys = new[]
{
redisCacheKey,
$"{redisCacheKey}_scores",
redisCacheIndexKey
};
var result = await Instance.EvalAsync(luaScript,
keys,
new[] { uniqueId });
if ((int)result != 1)
throw new Exception("删除操作失败");
}
public async Task<GlobalPagedResult<T>> GetGlobalPagedData<T>(
string redisCacheKey,
int pageSize = 10,
long? lastScore = null,
string lastMember = null,
bool descending = true)
{
const string zsetKey = "global_data_all";
// 分页参数处理
var (startScore, excludeMember) = descending
? (lastScore ?? long.MaxValue, lastMember)
: (lastScore ?? 0, lastMember);
// 获取成员列表
string[] members;
if (descending)
{
members = await Instance.ZRevRangeByScoreAsync(
zsetKey,
max: startScore,
min: 0,
offset: 0,
count: pageSize + 1);
}
else
{
members = await Instance.ZRangeByScoreAsync(
zsetKey,
min: startScore,
max: long.MaxValue,
offset: 0,
count: pageSize + 1);
}
// 处理分页结果
bool hasNext = members.Length > pageSize;
var actualMembers = members.Take(pageSize).ToArray();
// 批量获取数据(优化版本)
var dataTasks = actualMembers
.Select(m => Instance.HGetAsync<T>(redisCacheKey, m))
.ToArray();
await Task.WhenAll(dataTasks);
// 获取下一页游标
(long? nextScore, string nextMember) = actualMembers.Any()
? await GetNextCursor(zsetKey, actualMembers.Last(), descending)
: (null, null);
return new GlobalPagedResult<T>
{
Items = dataTasks.Select(t => t.Result).ToList(),
HasNext = hasNext,
NextScore = nextScore,
NextMember = nextMember
};
}
private async Task<(long? score, string member)> GetNextCursor(
string zsetKey,
string lastMember,
bool descending)
{
var score = await Instance.ZScoreAsync(zsetKey, lastMember);
return (score.HasValue ? (long)score.Value : null, lastMember);
}
} }
} }

View File

@ -1,4 +1,5 @@
using System; using JiShe.CollectBus.PagedResult;
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
@ -48,6 +49,6 @@ namespace JiShe.CollectBus.IoTDBProvider
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="options"></param> /// <param name="options"></param>
/// <returns></returns> /// <returns></returns>
Task<PagedResult<T>> QueryAsync<T>(QueryOptions options) where T : IoTEntity, new(); Task<BusPagedResult<T>> QueryAsync<T>(QueryOptions options) where T : IoTEntity, new();
} }
} }

View File

@ -1,22 +1,14 @@
using Apache.IoTDB; using Apache.IoTDB;
using Apache.IoTDB.DataStructure; using Apache.IoTDB.DataStructure;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.IoTDBProvider.Context; using JiShe.CollectBus.IoTDBProvider.Context;
using JiShe.CollectBus.IoTDBProvider.Interface; using JiShe.CollectBus.IoTDBProvider.Interface;
using JiShe.CollectBus.IoTDBProvider.Provider;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection; using System.Reflection;
using System.Text; using System.Text;
using System.Threading.Tasks; using JiShe.CollectBus.PagedResult;
using static Thrift.Protocol.Utilities.TJSONProtocolConstants;
namespace JiShe.CollectBus.IoTDBProvider namespace JiShe.CollectBus.IoTDBProvider
{ {
@ -118,12 +110,12 @@ namespace JiShe.CollectBus.IoTDBProvider
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="options"></param> /// <param name="options"></param>
/// <returns></returns> /// <returns></returns>
public async Task<PagedResult<T>> QueryAsync<T>(QueryOptions options) where T : IoTEntity, new() public async Task<BusPagedResult<T>> QueryAsync<T>(QueryOptions options) where T : IoTEntity, new()
{ {
var query = BuildQuerySQL<T>(options); var query = BuildQuerySQL<T>(options);
var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query);
var result = new PagedResult<T> var result = new BusPagedResult<T>
{ {
TotalCount = await GetTotalCount<T>(options), TotalCount = await GetTotalCount<T>(options),
Items = ParseResults<T>(sessionDataSet, options.PageSize) Items = ParseResults<T>(sessionDataSet, options.PageSize)

View File

@ -1,5 +1,6 @@
using Confluent.Kafka; using Confluent.Kafka;
using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Producer;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;
@ -10,6 +11,10 @@ namespace JiShe.CollectBus.Kafka
{ {
public override void ConfigureServices(ServiceConfigurationContext context) public override void ConfigureServices(ServiceConfigurationContext context)
{ {
// 注册Producer
context.Services.AddTransient(typeof(IProducerService<,>), typeof(ProducerService<,>));
// 注册Consumer
context.Services.AddTransient(typeof(IConsumerService<,>), typeof(ConsumerService<,>));
} }
} }
} }

View File

@ -14,7 +14,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
private readonly ILogger<ConsumerService<TKey, TValue>> _logger; private readonly ILogger<ConsumerService<TKey, TValue>> _logger;
private CancellationTokenSource _cancellationTokenSource; private CancellationTokenSource _cancellationTokenSource;
protected ConsumerService(IConfiguration configuration, ILogger<ConsumerService<TKey, TValue>> logger) public ConsumerService(IConfiguration configuration, ILogger<ConsumerService<TKey, TValue>> logger)
{ {
_logger = logger; _logger = logger;
GetInstance(configuration); GetInstance(configuration);

View File

@ -10,6 +10,7 @@ namespace JiShe.CollectBus.Kafka.Producer
public interface IProducerService<TKey, TValue> public interface IProducerService<TKey, TValue>
{ {
Task ProduceAsync(string topic, TKey key, TValue value); Task ProduceAsync(string topic, TKey key, TValue value);
Task ProduceAsync(TopicPartition topicPartition, TKey key, TValue value);
Task ProduceAsync(string topic, TValue value); Task ProduceAsync(string topic, TValue value);
void Dispose(); void Dispose();
} }

View File

@ -16,7 +16,7 @@ namespace JiShe.CollectBus.Kafka.Producer
private readonly ILogger<ProducerService<TKey, TValue>> _logger; private readonly ILogger<ProducerService<TKey, TValue>> _logger;
protected ProducerService(IConfiguration configuration, ILogger<ProducerService<TKey, TValue>> logger) public ProducerService(IConfiguration configuration, ILogger<ProducerService<TKey, TValue>> logger)
{ {
_logger = logger; _logger = logger;
GetInstance(configuration); GetInstance(configuration);
@ -55,6 +55,17 @@ namespace JiShe.CollectBus.Kafka.Producer
await Instance.ProduceAsync(topic, message); await Instance.ProduceAsync(topic, message);
} }
public async Task ProduceAsync(TopicPartition topicPartition, TKey key, TValue value)
{
var message = new Message<TKey, TValue>
{
Key = key,
Value = value
};
await Instance.ProduceAsync(topicPartition, message);
}
public async Task ProduceAsync(string topic, TValue value) public async Task ProduceAsync(string topic, TValue value)
{ {
var message = new Message<TKey, TValue> var message = new Message<TKey, TValue>