using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using DotNetCore.CAP; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Workers; using MassTransit; using Microsoft.Extensions.Logging; namespace JiShe.CollectBus.ScheduledMeterReading { /// /// 定时采集服务 /// public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService { private readonly ILogger _logger; private readonly ICapPublisher _capBus; public BasicScheduledMeterReadingService(ILogger logger, ICapPublisher capBus) { _capBus = capBus; _logger = logger; } /// /// 系统类型 /// public abstract string SystemType { get; } /// ///电表日冻结采集项 /// protected List DayFreezeCodes = new List() { "0D_3", "0D_4", "0D_161", "0D_162", "0D_163", "0D_164", "0D_165", "0D_166", "0D_167", "0D_168", "0C_149", }; /// /// 电表月冻结采集项 /// protected List MonthFreezeCodes = new List() { "0D_177", "0D_178", "0D_179", "0D_180", "0D_181", "0D_182", "0D_183", "0D_184", "0D_193", "0D_195", }; /// /// 获取采集项列表 /// /// public virtual Task> GetGatherItemByDataTypes() { throw new NotImplementedException($"{nameof(GetGatherItemByDataTypes)}请根据不同系统类型进行实现"); } #region 电表采集处理 /// /// 获取电表信息 /// /// 采集端Code /// public virtual Task> GetAmmeterInfoList(string gatherCode = "") { throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现"); } /// /// 初始化电表缓存数据 /// /// 采集端Code /// public virtual async Task InitAmmeterCacheData(string gatherCode = "") { var meterInfos = await GetAmmeterInfoList(gatherCode); if (meterInfos == null || meterInfos.Count <= 0) { throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空"); } //获取采集项类型数据 var gatherItemInfos = await GetGatherItemByDataTypes(); if (gatherItemInfos == null || gatherItemInfos.Count <= 0) { throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空"); } //根据采集频率分组,获得采集频率分组 var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); foreach (var itemTimeDensity in meterInfoGroupByTimeDensity) { //将表计信息根据集中器分组,获得集中器号 var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList(); foreach (var item in meterInfoGroup) { if (string.IsNullOrWhiteSpace(item.Key)) { continue; } var redisCacheKey = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, itemTimeDensity.Key)}{item.Key}"; Dictionary keyValuePairs = new Dictionary(); foreach (var ammeter in item) { //处理ItemCode if (string.IsNullOrWhiteSpace(ammeter.ItemCodes)) { var itemArr = ammeter.DataTypes.Split(',').ToList(); #region 拼接采集项 List itemCodeList = new List(); foreach (var dataType in itemArr) { var excludeItemCode = "10_98,10_94";//排除透明转发:尖峰平谷时段、跳合闸 var gatherItem = gatherItemInfos.FirstOrDefault(f => f.DataType.Equals(dataType)); if (gatherItem != null) { if (!excludeItemCode.Contains(gatherItem.ItemCode)) { itemCodeList.Add(gatherItem.ItemCode); } } } #endregion #region 特殊电表采集项编号处理 if (itemArr.Exists(e => e.Equals("95"))) //德力西DTS { itemCodeList.Add("10_95"); } //if (itemArr.Exists(e => e.Equals("109")))//WAVE_109 // ammeter.ItemCodes += "10_109,"; #endregion ammeter.ItemCodes = itemCodeList.Serialize();//转换成JSON字符串 if (!string.IsNullOrWhiteSpace(ammeter.ItemCodes)) { ammeter.ItemCodes = ammeter.ItemCodes.Replace("WAVE_109", "10_109"); } } keyValuePairs.TryAdd($"{ammeter.ID}", ammeter); } await FreeRedisProvider.FreeRedis.HSetAsync(redisCacheKey, keyValuePairs); } } _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成"); } /// /// 1分钟采集电表数据 /// /// public virtual async Task AmmeterScheduledMeterOneMinuteReading() { //获取缓存中的电表信息 var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 1)}*"; var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList); if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理时没有获取到缓存信息,-101"); return; } // 解析结果(结果为嵌套数组) Dictionary> meterInfos = await GetMeterCacheData(oneMinutekeyList, 1); if (meterInfos == null || meterInfos.Count <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理时没有获取到缓存信息,-102"); return; } await AmmerterScheduledMeterReadingIssued(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, meterInfos); _logger.LogInformation($"{nameof(AmmeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理完成"); } /// /// 5分钟采集电表数据 /// /// public virtual async Task AmmeterScheduledMeterFiveMinuteReading() { //获取缓存中的电表信息 var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 5)}*"; var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList); if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} 5分钟采集电表数据处理时没有获取到缓存信息,-101"); return; } // 解析结果(结果为嵌套数组) Dictionary> meterInfos = await GetMeterCacheData(oneMinutekeyList, 5); if (meterInfos == null || meterInfos.Count <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} 5分钟采集电表数据处理时没有获取到缓存信息,-102"); return; } await AmmerterScheduledMeterReadingIssued(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, meterInfos); _logger.LogInformation($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} 5分钟采集电表数据处理完成"); } /// /// 15分钟采集电表数据 /// /// public virtual async Task AmmeterScheduledMeterFifteenMinuteReading() { //获取缓存中的电表信息 var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 15)}*"; var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList); if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} 15分钟采集电表数据处理时没有获取到缓存信息,-101"); return; } // 解析结果(结果为嵌套数组) Dictionary> meterInfos = await GetMeterCacheData(oneMinutekeyList, 15); if (meterInfos == null || meterInfos.Count <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} 15分钟采集电表数据处理时没有获取到缓存信息,-102"); return; } await AmmerterScheduledMeterReadingIssued(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, meterInfos); _logger.LogInformation($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} 15分钟采集电表数据处理完成"); } #endregion #region 水表采集处理 /// /// 获取水表信息 /// /// 采集端Code /// public virtual Task> GetWatermeterInfoList(string gatherCode = "") { throw new NotImplementedException($"{nameof(GetWatermeterInfoList)}请根据不同系统类型进行实现"); } /// /// 初始化水表缓存数据 /// /// 采集端Code /// public virtual async Task InitWatermeterCacheData(string gatherCode = "") { var meterInfos = await GetWatermeterInfoList(gatherCode); if (meterInfos == null || meterInfos.Count <= 0) { throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化水表缓存数据时,水表数据为空"); } //获取采集项类型数据 var gatherItemInfos = await GetGatherItemByDataTypes(); if (gatherItemInfos == null || gatherItemInfos.Count <= 0) { throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据时,采集项类型数据为空"); } //根据采集频率分组,获得采集频率分组 var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); foreach (var itemTimeDensity in meterInfoGroupByTimeDensity) { //将表计信息根据集中器分组,获得集中器号 var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList(); foreach (var item in meterInfoGroup) { if (string.IsNullOrWhiteSpace(item.Key)) { continue; } var redisCacheKey = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, itemTimeDensity.Key)}{item.Key}"; Dictionary keyValuePairs = new Dictionary(); foreach (var subItem in item) { keyValuePairs.TryAdd($"{subItem.ID}", subItem); } await FreeRedisProvider.FreeRedis.HSetAsync(redisCacheKey, keyValuePairs); } } _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据完成"); } /// /// 1分钟采集水表数据 /// /// public virtual async Task WatermeterScheduledMeterOneMinuteReading() { //获取缓存中的水表信息 var redisKeyList = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, 1)}*"; var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList); if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) { _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集水表据处理时没有获取到缓存信息,-101"); return; } // 解析结果(结果为嵌套数组) Dictionary> meterInfos = await GetMeterCacheData(oneMinutekeyList, 1); if (meterInfos == null || meterInfos.Count <= 0) { _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集水表数据处理时没有获取到缓存信息,-102"); return; } _logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集水表数据处理完成"); } /// /// 5分钟采集电表数据 /// /// public virtual async Task WatermeterScheduledMeterFiveMinuteReading() { //获取缓存中的水表信息 var redisKeyList = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, 5)}*"; var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList); if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) { _logger.LogError($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} 5分钟采集水表据处理时没有获取到缓存信息,-101"); return; } // 解析结果(结果为嵌套数组) Dictionary> meterInfos = await GetMeterCacheData(oneMinutekeyList, 5); if (meterInfos == null || meterInfos.Count <= 0) { _logger.LogError($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} 5分钟采集水表数据处理时没有获取到缓存信息,-102"); return; } _logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} 5分钟采集水表数据处理完成"); } /// /// 15分钟采集电表数据 /// /// public virtual async Task WatermeterScheduledMeterFifteenMinuteReading() { //获取缓存中的水表信息 var redisKeyList = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, 15)}*"; var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList); if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) { _logger.LogError($"{nameof(WatermeterScheduledMeterFifteenMinuteReading)} 15分钟采集水表据处理时没有获取到缓存信息,-101"); return; } // 解析结果(结果为嵌套数组) Dictionary> meterInfos = await GetMeterCacheData(oneMinutekeyList, 15); if (meterInfos == null || meterInfos.Count <= 0) { _logger.LogError($"{nameof(WatermeterScheduledMeterFifteenMinuteReading)} 15分钟采集水表数据处理时没有获取到缓存信息,-102"); return; } _logger.LogInformation($"{nameof(WatermeterScheduledMeterFifteenMinuteReading)} 15分钟采集水表数据处理完成"); } #endregion #region 公共处理方法 /// /// 批量获取缓存的表计信息 /// /// 表信息数据对象 /// 采集频率对应的缓存Key集合 /// 采集频率,1分钟、5分钟、15分钟 /// private async Task>> GetMeterCacheData(string[] redisKeys, int minute) { //通过lua脚本一次性获取所有缓存内容 var luaScript = @" local results = {} for i, key in ipairs(KEYS) do local data = redis.call('HGETALL', key) results[i] = {key, data} end return results"; var oneMinuteAmmerterResult = await FreeRedisProvider.FreeRedis.EvalAsync(luaScript, redisKeys); //传递 KEYS if (oneMinuteAmmerterResult == null) { _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 定时任务采集表数据处理时没有获取到缓存信息,-102"); return null; } // 解析结果(结果为嵌套数组) var meterInfos = new Dictionary>(); ; if (oneMinuteAmmerterResult is object[] arr) { foreach (object[] item in arr) { string key = (string)item[0];//集中器地址对应的Redis缓存Key object[] fieldsAndValues = (object[])item[1];//缓存Key对应的Hash表数据集合 var redisCacheKey = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, minute)}"; string focusAddress = key.Replace(redisCacheKey, "");//集中器地址 var meterHashs = new Dictionary(); for (int i = 0; i < fieldsAndValues.Length; i += 2) { string meterld = (string)fieldsAndValues[i];//表ID string meterStr = (string)fieldsAndValues[i + 1];//表详情数据 T meterInfo = default; if (!string.IsNullOrWhiteSpace(meterStr)) { meterInfo = meterStr.Deserialize()!; } if (meterInfo != null) { meterHashs[meterld] = meterInfo; } else { _logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 定时任务采集表数据处理时集中器缓存{key}数据的{meterld}处理异常"); } } meterInfos[focusAddress] = meterHashs; } } return meterInfos; } /// /// 电表采集任务指令创建 /// /// 采集频率订阅主题 /// 集中器数据分组 /// private async Task AmmerterScheduledMeterReadingIssued(string eventName, Dictionary> focusGroup) { if (string.IsNullOrWhiteSpace(eventName) || focusGroup == null || focusGroup.Count <= 0) { _logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 电表数据采集指令生成失败,参数异常,-101"); return; } try { //将采集器编号的hash值取模分组 const int TotalShards = 20; var focusHashGroups = new Dictionary>>(); foreach (var (collectorId, ammetersDictionary) in focusGroup) { if (string.IsNullOrWhiteSpace(collectorId)) { _logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 集中器信息分组取模失败,无效Key -102"); continue; } // 计算哈希分组ID int hashGroupId = Math.Abs(collectorId.GetHashCode() % TotalShards); // 获取或创建分组(避免重复查找) if (!focusHashGroups.TryGetValue(hashGroupId, out var group)) { group = new Dictionary>(); focusHashGroups[hashGroupId] = group; } // 将当前集中器数据加入分组 group[collectorId] = ammetersDictionary; } if (focusHashGroups == null) { _logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 集中器信息分组取模失败 -103"); return; } //根据分组创建线程批处理集中器 foreach (var group in focusHashGroups) { _= Task.Run(async () => { await CreatePublishTask(eventName,group.Value); }); } await Task.CompletedTask; } catch (Exception) { throw; } } /// /// 创建发布任务 /// /// /// /// private async Task CreatePublishTask(string eventName, Dictionary> focusGroup) { foreach (var focusInfo in focusGroup) { foreach (var ammeterInfo in focusInfo.Value) { var meter = ammeterInfo.Value; if (string.IsNullOrWhiteSpace(meter.ItemCodes)) { _logger.LogError($"{nameof(CreatePublishTask)} 集中器{meter.FocusAddress}的电表{meter.Name}数据采集指令生成失败,采集项为空,-101"); continue; } //载波的不处理 if (meter.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave) { _logger.LogError($"{nameof(CreatePublishTask)} 集中器{meter.FocusAddress}的电表{meter.Name}数据采集指令生成失败,载波不处理,-102"); continue; } if (meter.State.Equals(2)) { _logger.LogWarning($"{nameof(CreatePublishTask)} {meter.Name} 集中器{meter.FocusAddress}的电表{meter.Name}状态为禁用,不处理"); continue; } //排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器 if (!IsGennerateCmd(meter.LastTime, -1)) { _logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{meter.FocusAddress}的电表{meter.Name},采集时间:{meter.LastTime},已超过1天未在线,不生成指令"); continue; } if (string.IsNullOrWhiteSpace(meter.AreaCode)) { _logger.LogError($"{nameof(CreatePublishTask)} 表ID:{meter.ID},集中器通信区号为空"); continue; } if (string.IsNullOrWhiteSpace(meter.Address)) { _logger.LogError($"{nameof(CreatePublishTask)} 表ID:{meter.ID},集中器通信地址为空"); continue; } if (Convert.ToInt32(meter.Address) > 65535) { _logger.LogError($"{nameof(CreatePublishTask)} 表ID:{meter.ID},集中器通信地址无效,确保大于65535"); continue; } if (meter.MeteringCode <= 0 || meter.MeteringCode > 2033) { _logger.LogError($"{nameof(CreatePublishTask)} 表ID:{meter.ID},非有效测量点号({meter.MeteringCode})"); continue; } List tempCodes = meter.ItemCodes.Deserialize>()!; //TODO:自动上报数据只主动采集1类数据。 if (meter.AutomaticReport.Equals(1)) { var tempSubCodes = new List(); var tempItemCodes = string.Empty; if (meter.ItemCodes.Contains("0C_49")) tempItemCodes += "0C_49,"; if (meter.ItemCodes.Contains("0C_149")) tempItemCodes += "0C_149,"; if (meter.ItemCodes.Contains("10_97")) tempItemCodes += "10_97"; if (string.IsNullOrWhiteSpace(tempItemCodes)) { continue; } else { meter.ItemCodes = tempItemCodes; } } foreach (var tempItem in tempCodes) { //排除已发送日冻结和月冻结采集项配置 if(DayFreezeCodes.Contains(tempItem)) { continue; } if (MonthFreezeCodes.Contains(tempItem)) { continue; } } //排除已发送日冻结和月冻结采集项配置 //if (!isSendDayFreeze) meter.ItemCodes = meter.ItemCodes.Replace("0D_3", "").Replace("0D_4", "") .Replace("0D_161", "").Replace("0D_162", "").Replace("0D_163", "").Replace("0D_164", "") .Replace("0D_165", "").Replace("0D_166", "").Replace("0D_167", "").Replace("0D_168", "").Replace("0C_149", ""); //if (!isSendMonthFreeze) meter.ItemCodes = meter.ItemCodes.Replace("0D_177", "").Replace("0D_178", "").Replace("0D_179", "").Replace("0D_180", "") .Replace("0D_181", "").Replace("0D_181", "").Replace("0D_182", "").Replace("0D_183", "").Replace("0D_184", "") .Replace("0D_193", "").Replace("0D_195", ""); //TODO:特殊表 //var itemCodeArr = itemCode.Split('_'); //var aFN = (AFN)itemCodeArr[0].HexToDec(); //var fn = int.Parse(itemCodeArr[1]); //if (aFN == AFN.请求实时数据) //{ // var bytes = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(address, ammeter.MeterCode.Value, (ATypeOfDataItems)fn); // bytesList.Add(bytes); //} //else if (aFN == AFN.请求历史数据) //{ // var density = (FreezeDensity)input.Density; // var bytes = Build3761SendData.BuildAmmeterReadingIIdataTypeItemsSendCmd(address, ammeter.MeterCode.Value, (IIdataTypeItems)fn, density, 0); // bytesList.Add(bytes); //} } } string deviceNo = ""; string messageHexString = ""; var messageReceivedHeartbeatEvent = new MessageReceivedHeartbeat { //ClientId = client.Id, //ClientIp = client.IP, //ClientPort = client.Port, MessageHexString = messageHexString, DeviceNo = deviceNo, MessageId = NewId.NextGuid().ToString() }; await _capBus.PublishAsync(eventName, messageReceivedHeartbeatEvent); } /// /// 指定时间对比当前时间 /// /// /// /// private bool IsGennerateCmd(DateTime lastTime, int subtrahend = 0) { if (DateTime.Now.AddDays(subtrahend) >= lastTime)//当前时间减去一天,大于等于最后在线时间,不再生成该集中器下表生成采集指令 return false; return true; } #endregion } }