From e564d8d7e675b081f95f8a7efdf5b0360dc336a9 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Fri, 14 Mar 2025 14:24:38 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1=E4=BC=98?= =?UTF-8?q?=E5=8C=96202503141424?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...he.CollectBus.Application.Contracts.csproj | 4 + .../IScheduledMeterReadingService.cs | 7 + .../IWorkerSubscriberAppService.cs | 0 .../Consumers/IssuedConsumer.cs | 4 +- .../Consumers/WorkerConsumer.cs | 40 ++ .../BasicScheduledMeterReadingService.cs | 672 ++++++++++++++++++ ...nergySystemScheduledMeterReadingService.cs | 21 + .../WorkerSubscriberAppService.cs | 0 .../BasicScheduledMeterReadingService.cs | 358 ---------- .../Enums/MeterLinkProtocolEnum.cs | 54 ++ .../Ammeters/AmmeterInfo.cs | 2 +- .../GatherItem/GatherItemInfo.cs | 21 + .../CollectBusHostModule.Configure.cs | 2 +- .../ProtocolConst.cs | 26 + 14 files changed, 849 insertions(+), 362 deletions(-) rename src/JiShe.CollectBus.Application.Contracts/{Workers => ScheduledMeterReading}/IScheduledMeterReadingService.cs (92%) rename src/JiShe.CollectBus.Application.Contracts/{Workers => Subscribers}/IWorkerSubscriberAppService.cs (100%) create mode 100644 src/JiShe.CollectBus.Application/Consumers/WorkerConsumer.cs create mode 100644 src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs rename src/JiShe.CollectBus.Application/{Workers => ScheduledMeterReading}/EnergySystemScheduledMeterReadingService.cs (88%) rename src/JiShe.CollectBus.Application/{Workers => Subscribers}/WorkerSubscriberAppService.cs (100%) delete mode 100644 src/JiShe.CollectBus.Application/Workers/BasicScheduledMeterReadingService.cs create mode 100644 src/JiShe.CollectBus.Common/Enums/MeterLinkProtocolEnum.cs create mode 100644 src/JiShe.CollectBus.Domain/GatherItem/GatherItemInfo.cs diff --git a/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj b/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj index 0a28849..0b1f26d 100644 --- a/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj +++ b/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj @@ -22,4 +22,8 @@ + + + + diff --git a/src/JiShe.CollectBus.Application.Contracts/Workers/IScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs similarity index 92% rename from src/JiShe.CollectBus.Application.Contracts/Workers/IScheduledMeterReadingService.cs rename to src/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs index bd1a090..4f41051 100644 --- a/src/JiShe.CollectBus.Application.Contracts/Workers/IScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs @@ -1,4 +1,5 @@ using JiShe.CollectBus.Ammeters; +using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.Watermeter; using System; using System.Collections.Generic; @@ -16,6 +17,12 @@ namespace JiShe.CollectBus.Workers public interface IScheduledMeterReadingService : IApplicationService { + /// + /// 获取采集项列表 + /// + /// + Task> GetGatherItemByDataTypes(); + #region 电表采集处理 /// /// 获取电表信息 diff --git a/src/JiShe.CollectBus.Application.Contracts/Workers/IWorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs similarity index 100% rename from src/JiShe.CollectBus.Application.Contracts/Workers/IWorkerSubscriberAppService.cs rename to src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs diff --git a/src/JiShe.CollectBus.Application/Consumers/IssuedConsumer.cs b/src/JiShe.CollectBus.Application/Consumers/IssuedConsumer.cs index 710a268..52da062 100644 --- a/src/JiShe.CollectBus.Application/Consumers/IssuedConsumer.cs +++ b/src/JiShe.CollectBus.Application/Consumers/IssuedConsumer.cs @@ -12,7 +12,7 @@ namespace JiShe.CollectBus.Consumers { public class IssuedConsumer: IConsumer { - private readonly ILogger _logger; + private readonly ILogger _logger; private readonly ITcpService _tcpService; private readonly IRepository _messageReceivedLoginEventRepository; private readonly IRepository _messageReceivedHeartbeatEventRepository; @@ -24,7 +24,7 @@ namespace JiShe.CollectBus.Consumers /// /// /// - public IssuedConsumer(ILogger logger, + public IssuedConsumer(ILogger logger, ITcpService tcpService, IRepository messageReceivedLoginEventRepository, IRepository messageReceivedHeartbeatEventRepository) diff --git a/src/JiShe.CollectBus.Application/Consumers/WorkerConsumer.cs b/src/JiShe.CollectBus.Application/Consumers/WorkerConsumer.cs new file mode 100644 index 0000000..5a5652d --- /dev/null +++ b/src/JiShe.CollectBus.Application/Consumers/WorkerConsumer.cs @@ -0,0 +1,40 @@ +using System; +using System.Threading.Tasks; +using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Models; +using JiShe.CollectBus.MessageIssueds; +using JiShe.CollectBus.MessageReceiveds; +using MassTransit; +using Microsoft.Extensions.Logging; +using TouchSocket.Sockets; +using Volo.Abp.Domain.Repositories; + +namespace JiShe.CollectBus.Consumers +{ + /// + /// 定时抄读任务消费者 + /// + public class WorkerConsumer : IConsumer + { + private readonly ILogger _logger; + private readonly ITcpService _tcpService; + + /// + /// WorkerConsumer + /// + /// + /// + public WorkerConsumer(ILogger logger, + ITcpService tcpService) + { + _logger = logger; + _tcpService = tcpService; + } + + + public async Task Consume(ConsumeContext context) + { + await _tcpService.SendAsync(context.Message.ClientId, context.Message.Message); + } + } +} diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs new file mode 100644 index 0000000..c573ab5 --- /dev/null +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -0,0 +1,672 @@ +using DotNetCore.CAP; +using FreeRedis; +using JiShe.CollectBus.Ammeters; +using JiShe.CollectBus.Common.BuildSendDatas; +using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Helpers; +using JiShe.CollectBus.GatherItem; +using JiShe.CollectBus.MessageReceiveds; +using JiShe.CollectBus.Protocol.Contracts; +using JiShe.CollectBus.Watermeter; +using MassTransit; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Diagnostics.Metrics; +using System.Linq; +using System.Net; +using System.Text; +using System.Threading.Tasks; +using TouchSocket.Sockets; +using Volo.Abp.Application.Services; +using static FreeSql.Internal.GlobalFilter; + +namespace JiShe.CollectBus.Workers +{ + /// + /// 定时采集服务 + /// + public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService + { + private readonly ILogger _logger; + private readonly ICapPublisher _capBus; + public BasicScheduledMeterReadingService(ILogger logger, ICapPublisher capBus) + { + _capBus = capBus; + _logger = logger; + } + + /// + /// 系统类型 + /// + public abstract string SystemType { get; } + + /// + ///电表日冻结采集项 + /// + protected List DayFreezeCodes = new List() { "0D_3", "0D_4", "0D_161", "0D_162", "0D_163", "0D_164", "0D_165", "0D_166", "0D_167", "0D_168", "0C_149", }; + + /// + /// 电表月冻结采集项 + /// + protected List MonthFreezeCodes = new List() { "0D_177", "0D_178", "0D_179", "0D_180", "0D_181", "0D_182", "0D_183", "0D_184", "0D_193", "0D_195", }; + + /// + /// 获取采集项列表 + /// + /// + public virtual Task> GetGatherItemByDataTypes() + { + throw new NotImplementedException($"{nameof(GetGatherItemByDataTypes)}请根据不同系统类型进行实现"); + } + + #region 电表采集处理 + + /// + /// 获取电表信息 + /// + /// 采集端Code + /// + public virtual Task> GetAmmeterInfoList(string gatherCode = "") + { + throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现"); + } + + /// + /// 初始化电表缓存数据 + /// + /// 采集端Code + /// + public virtual async Task InitAmmeterCacheData(string gatherCode = "") + { + var meterInfos = await GetAmmeterInfoList(gatherCode); + if (meterInfos == null || meterInfos.Count <= 0) + { + throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空"); + } + + //获取采集项类型数据 + var gatherItemInfos = await GetGatherItemByDataTypes(); + if (gatherItemInfos == null || gatherItemInfos.Count <= 0) + { + throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空"); + } + + //根据采集频率分组,获得采集频率分组 + var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); + foreach (var itemTimeDensity in meterInfoGroupByTimeDensity) + { + //将表计信息根据集中器分组,获得集中器号 + var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList(); + foreach (var item in meterInfoGroup) + { + if (string.IsNullOrWhiteSpace(item.Key)) + { + continue; + } + + var redisCacheKey = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, itemTimeDensity.Key)}{item.Key}"; + Dictionary keyValuePairs = new Dictionary(); + foreach (var ammeter in item) + { + //处理ItemCode + if (string.IsNullOrWhiteSpace(ammeter.ItemCodes)) + { + var itemArr = ammeter.DataTypes.Split(',').ToList(); + + #region 拼接采集项 + List itemCodeList = new List(); + foreach (var dataType in itemArr) + { + var excludeItemCode = "10_98,10_94";//排除透明转发:尖峰平谷时段、跳合闸 + var gatherItem = gatherItemInfos.FirstOrDefault(f => f.DataType.Equals(dataType)); + if (gatherItem != null) + { + if (!excludeItemCode.Contains(gatherItem.ItemCode)) + { + itemCodeList.Add(gatherItem.ItemCode); + } + } + } + #endregion + + #region 特殊电表采集项编号处理 + if (itemArr.Exists(e => e.Equals("95"))) //德力西DTS + { + itemCodeList.Add("10_95"); + } + //if (itemArr.Exists(e => e.Equals("109")))//WAVE_109 + // ammeter.ItemCodes += "10_109,"; + #endregion + + ammeter.ItemCodes = itemCodeList.Serialize();//转换成JSON字符串 + + if (!string.IsNullOrWhiteSpace(ammeter.ItemCodes)) + { + ammeter.ItemCodes = ammeter.ItemCodes.Replace("WAVE_109", "10_109"); + } + } + + keyValuePairs.TryAdd($"{ammeter.ID}", ammeter); + } + await FreeRedisProvider.FreeRedis.HSetAsync(redisCacheKey, keyValuePairs); + } + } + + _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成"); + } + + /// + /// 1分钟采集电表数据 + /// + /// + public virtual async Task AmmeterScheduledMeterOneMinuteReading() + { + //获取缓存中的电表信息 + var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 1)}*"; + var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList); + if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) + { + _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理时没有获取到缓存信息,-101"); + return; + } + + // 解析结果(结果为嵌套数组) + Dictionary> meterInfos = await GetMeterCacheData(oneMinutekeyList, 1); + if (meterInfos == null || meterInfos.Count <= 0) + { + _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理时没有获取到缓存信息,-102"); + return; + } + + await AmmerterScheduledMeterReadingIssued(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, meterInfos); + + _logger.LogInformation($"{nameof(AmmeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理完成"); + + } + + /// + /// 5分钟采集电表数据 + /// + /// + public virtual async Task AmmeterScheduledMeterFiveMinuteReading() + { + //获取缓存中的电表信息 + var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 5)}*"; + var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList); + if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) + { + _logger.LogError($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} 5分钟采集电表数据处理时没有获取到缓存信息,-101"); + return; + } + + // 解析结果(结果为嵌套数组) + Dictionary> meterInfos = await GetMeterCacheData(oneMinutekeyList, 5); + if (meterInfos == null || meterInfos.Count <= 0) + { + _logger.LogError($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} 5分钟采集电表数据处理时没有获取到缓存信息,-102"); + return; + } + await AmmerterScheduledMeterReadingIssued(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, meterInfos); + + _logger.LogInformation($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} 5分钟采集电表数据处理完成"); + } + + /// + /// 15分钟采集电表数据 + /// + /// + public virtual async Task AmmeterScheduledMeterFifteenMinuteReading() + { + //获取缓存中的电表信息 + var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 15)}*"; + var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList); + if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) + { + _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} 15分钟采集电表数据处理时没有获取到缓存信息,-101"); + return; + } + + // 解析结果(结果为嵌套数组) + Dictionary> meterInfos = await GetMeterCacheData(oneMinutekeyList, 15); + if (meterInfos == null || meterInfos.Count <= 0) + { + _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} 15分钟采集电表数据处理时没有获取到缓存信息,-102"); + return; + } + + await AmmerterScheduledMeterReadingIssued(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, meterInfos); + _logger.LogInformation($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} 15分钟采集电表数据处理完成"); + } + #endregion + + + #region 水表采集处理 + + /// + /// 获取水表信息 + /// + /// 采集端Code + /// + public virtual Task> GetWatermeterInfoList(string gatherCode = "") + { + throw new NotImplementedException($"{nameof(GetWatermeterInfoList)}请根据不同系统类型进行实现"); + } + + /// + /// 初始化水表缓存数据 + /// + /// 采集端Code + /// + public virtual async Task InitWatermeterCacheData(string gatherCode = "") + { + var meterInfos = await GetWatermeterInfoList(gatherCode); + if (meterInfos == null || meterInfos.Count <= 0) + { + throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化水表缓存数据时,水表数据为空"); + } + + //获取采集项类型数据 + var gatherItemInfos = await GetGatherItemByDataTypes(); + if (gatherItemInfos == null || gatherItemInfos.Count <= 0) + { + throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据时,采集项类型数据为空"); + } + + //根据采集频率分组,获得采集频率分组 + var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); + foreach (var itemTimeDensity in meterInfoGroupByTimeDensity) + { + //将表计信息根据集中器分组,获得集中器号 + var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList(); + foreach (var item in meterInfoGroup) + { + if (string.IsNullOrWhiteSpace(item.Key)) + { + continue; + } + + var redisCacheKey = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, itemTimeDensity.Key)}{item.Key}"; + Dictionary keyValuePairs = new Dictionary(); + foreach (var subItem in item) + { + + keyValuePairs.TryAdd($"{subItem.ID}", subItem); + } + await FreeRedisProvider.FreeRedis.HSetAsync(redisCacheKey, keyValuePairs); + } + } + _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据完成"); + } + + /// + /// 1分钟采集水表数据 + /// + /// + public virtual async Task WatermeterScheduledMeterOneMinuteReading() + { + //获取缓存中的水表信息 + var redisKeyList = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, 1)}*"; + var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList); + if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) + { + _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集水表据处理时没有获取到缓存信息,-101"); + return; + } + + // 解析结果(结果为嵌套数组) + Dictionary> meterInfos = await GetMeterCacheData(oneMinutekeyList, 1); + if (meterInfos == null || meterInfos.Count <= 0) + { + _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集水表数据处理时没有获取到缓存信息,-102"); + return; + } + + _logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集水表数据处理完成"); + } + + /// + /// 5分钟采集电表数据 + /// + /// + public virtual async Task WatermeterScheduledMeterFiveMinuteReading() + { + + //获取缓存中的水表信息 + var redisKeyList = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, 5)}*"; + var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList); + if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) + { + _logger.LogError($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} 5分钟采集水表据处理时没有获取到缓存信息,-101"); + return; + } + + // 解析结果(结果为嵌套数组) + Dictionary> meterInfos = await GetMeterCacheData(oneMinutekeyList, 5); + if (meterInfos == null || meterInfos.Count <= 0) + { + _logger.LogError($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} 5分钟采集水表数据处理时没有获取到缓存信息,-102"); + return; + } + + _logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} 5分钟采集水表数据处理完成"); + } + + /// + /// 15分钟采集电表数据 + /// + /// + public virtual async Task WatermeterScheduledMeterFifteenMinuteReading() + { + //获取缓存中的水表信息 + var redisKeyList = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, 15)}*"; + var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList); + if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) + { + _logger.LogError($"{nameof(WatermeterScheduledMeterFifteenMinuteReading)} 15分钟采集水表据处理时没有获取到缓存信息,-101"); + return; + } + + // 解析结果(结果为嵌套数组) + Dictionary> meterInfos = await GetMeterCacheData(oneMinutekeyList, 15); + if (meterInfos == null || meterInfos.Count <= 0) + { + _logger.LogError($"{nameof(WatermeterScheduledMeterFifteenMinuteReading)} 15分钟采集水表数据处理时没有获取到缓存信息,-102"); + return; + } + + _logger.LogInformation($"{nameof(WatermeterScheduledMeterFifteenMinuteReading)} 15分钟采集水表数据处理完成"); + } + #endregion + + + #region 公共处理方法 + /// + /// 批量获取缓存的表计信息 + /// + /// 表信息数据对象 + /// 采集频率对应的缓存Key集合 + /// 采集频率,1分钟、5分钟、15分钟 + /// + private async Task>> GetMeterCacheData(string[] redisKeys, int minute) + { + //通过lua脚本一次性获取所有缓存内容 + var luaScript = @" + local results = {} + for i, key in ipairs(KEYS) do + local data = redis.call('HGETALL', key) + results[i] = {key, data} + end + return results"; + var oneMinuteAmmerterResult = await FreeRedisProvider.FreeRedis.EvalAsync(luaScript, redisKeys); //传递 KEYS + if (oneMinuteAmmerterResult == null) + { + _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 定时任务采集表数据处理时没有获取到缓存信息,-102"); + return null; + } + + // 解析结果(结果为嵌套数组) + var meterInfos = new Dictionary>(); ; + if (oneMinuteAmmerterResult is object[] arr) + { + foreach (object[] item in arr) + { + string key = (string)item[0];//集中器地址对应的Redis缓存Key + object[] fieldsAndValues = (object[])item[1];//缓存Key对应的Hash表数据集合 + var redisCacheKey = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, minute)}"; + string focusAddress = key.Replace(redisCacheKey, "");//集中器地址 + + var meterHashs = new Dictionary(); + for (int i = 0; i < fieldsAndValues.Length; i += 2) + { + string meterld = (string)fieldsAndValues[i];//表ID + string meterStr = (string)fieldsAndValues[i + 1];//表详情数据 + + T meterInfo = default; + if (!string.IsNullOrWhiteSpace(meterStr)) + { + meterInfo = meterStr.Deserialize()!; + } + if (meterInfo != null) + { + meterHashs[meterld] = meterInfo; + } + else + { + _logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 定时任务采集表数据处理时集中器缓存{key}数据的{meterld}处理异常"); + } + } + meterInfos[focusAddress] = meterHashs; + } + } + + return meterInfos; + } + + /// + /// 电表采集任务指令创建 + /// + /// 采集频率订阅主题 + /// 集中器数据分组 + /// + private async Task AmmerterScheduledMeterReadingIssued(string eventName, Dictionary> focusGroup) + { + if (string.IsNullOrWhiteSpace(eventName) || focusGroup == null || focusGroup.Count <= 0) + { + _logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 电表数据采集指令生成失败,参数异常,-101"); + return; + } + try + { + //将采集器编号的hash值取模分组 + const int TotalShards = 20; + var focusHashGroups = new Dictionary>>(); + + foreach (var (collectorId, ammetersDictionary) in focusGroup) + { + if (string.IsNullOrWhiteSpace(collectorId)) + { + _logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 集中器信息分组取模失败,无效Key -102"); + continue; + } + + // 计算哈希分组ID + int hashGroupId = Math.Abs(collectorId.GetHashCode() % TotalShards); + + // 获取或创建分组(避免重复查找) + if (!focusHashGroups.TryGetValue(hashGroupId, out var group)) + { + group = new Dictionary>(); + focusHashGroups[hashGroupId] = group; + } + + // 将当前集中器数据加入分组 + group[collectorId] = ammetersDictionary; + } + + if (focusHashGroups == null) + { + _logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 集中器信息分组取模失败 -103"); + return; + } + + //根据分组创建线程批处理集中器 + foreach (var group in focusHashGroups) + { + _= Task.Run(async () => { await CreatePublishTask(eventName,group.Value); }); + } + + await Task.CompletedTask; + } + catch (Exception) + { + + throw; + } + } + + /// + /// 创建发布任务 + /// + /// + /// + /// + private async Task CreatePublishTask(string eventName, Dictionary> focusGroup) + { + foreach (var focusInfo in focusGroup) + { + foreach (var ammeterInfo in focusInfo.Value) + { + var meter = ammeterInfo.Value; + + if (string.IsNullOrWhiteSpace(meter.ItemCodes)) + { + _logger.LogError($"{nameof(CreatePublishTask)} 集中器{meter.FocusAddress}的电表{meter.Name}数据采集指令生成失败,采集项为空,-101"); + continue; + } + + //载波的不处理 + if (meter.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave) + { + _logger.LogError($"{nameof(CreatePublishTask)} 集中器{meter.FocusAddress}的电表{meter.Name}数据采集指令生成失败,载波不处理,-102"); + continue; + } + + if (meter.State.Equals(2)) + { + _logger.LogWarning($"{nameof(CreatePublishTask)} {meter.Name} 集中器{meter.FocusAddress}的电表{meter.Name}状态为禁用,不处理"); + continue; + } + + //排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器 + if (!IsGennerateCmd(meter.LastTime, -1)) + { + _logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{meter.FocusAddress}的电表{meter.Name},采集时间:{meter.LastTime},已超过1天未在线,不生成指令"); + continue; + } + + if (string.IsNullOrWhiteSpace(meter.AreaCode)) + { + _logger.LogError($"{nameof(CreatePublishTask)} 表ID:{meter.ID},集中器通信区号为空"); + continue; + } + if (string.IsNullOrWhiteSpace(meter.Address)) + { + _logger.LogError($"{nameof(CreatePublishTask)} 表ID:{meter.ID},集中器通信地址为空"); + continue; + } + if (Convert.ToInt32(meter.Address) > 65535) + { + _logger.LogError($"{nameof(CreatePublishTask)} 表ID:{meter.ID},集中器通信地址无效,确保大于65535"); + continue; + } + if (meter.MeteringCode <= 0 || meter.MeteringCode > 2033) + { + _logger.LogError($"{nameof(CreatePublishTask)} 表ID:{meter.ID},非有效测量点号({meter.MeteringCode})"); + continue; + } + + List tempCodes = meter.ItemCodes.Deserialize>()!; + + //TODO:自动上报数据只主动采集1类数据。 + if (meter.AutomaticReport.Equals(1)) + { + var tempSubCodes = new List(); + var tempItemCodes = string.Empty; + if (meter.ItemCodes.Contains("0C_49")) + tempItemCodes += "0C_49,"; + if (meter.ItemCodes.Contains("0C_149")) + tempItemCodes += "0C_149,"; + if (meter.ItemCodes.Contains("10_97")) + tempItemCodes += "10_97"; + + if (string.IsNullOrWhiteSpace(tempItemCodes)) + { + continue; + } + else + { + meter.ItemCodes = tempItemCodes; + } + } + + foreach (var tempItem in tempCodes) + { + //排除已发送日冻结和月冻结采集项配置 + if(DayFreezeCodes.Contains(tempItem)) + { + continue; + } + + if (MonthFreezeCodes.Contains(tempItem)) + { + continue; + } + + } + //排除已发送日冻结和月冻结采集项配置 + //if (!isSendDayFreeze) + meter.ItemCodes = meter.ItemCodes.Replace("0D_3", "").Replace("0D_4", "") + .Replace("0D_161", "").Replace("0D_162", "").Replace("0D_163", "").Replace("0D_164", "") + .Replace("0D_165", "").Replace("0D_166", "").Replace("0D_167", "").Replace("0D_168", "").Replace("0C_149", ""); + + //if (!isSendMonthFreeze) + meter.ItemCodes = meter.ItemCodes.Replace("0D_177", "").Replace("0D_178", "").Replace("0D_179", "").Replace("0D_180", "") + .Replace("0D_181", "").Replace("0D_181", "").Replace("0D_182", "").Replace("0D_183", "").Replace("0D_184", "") + .Replace("0D_193", "").Replace("0D_195", ""); + + + + //TODO:特殊表 + + + //var itemCodeArr = itemCode.Split('_'); + //var aFN = (AFN)itemCodeArr[0].HexToDec(); + //var fn = int.Parse(itemCodeArr[1]); + //if (aFN == AFN.请求实时数据) + //{ + // var bytes = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(address, ammeter.MeterCode.Value, (ATypeOfDataItems)fn); + // bytesList.Add(bytes); + //} + //else if (aFN == AFN.请求历史数据) + //{ + // var density = (FreezeDensity)input.Density; + // var bytes = Build3761SendData.BuildAmmeterReadingIIdataTypeItemsSendCmd(address, ammeter.MeterCode.Value, (IIdataTypeItems)fn, density, 0); + // bytesList.Add(bytes); + //} + + } + } + + string deviceNo = ""; + string messageHexString = ""; + + var messageReceivedHeartbeatEvent = new MessageReceivedHeartbeat + { + //ClientId = client.Id, + //ClientIp = client.IP, + //ClientPort = client.Port, + MessageHexString = messageHexString, + DeviceNo = deviceNo, + MessageId = NewId.NextGuid().ToString() + }; + await _capBus.PublishAsync(eventName, messageReceivedHeartbeatEvent); + } + + /// + /// 指定时间对比当前时间 + /// + /// + /// + /// + private bool IsGennerateCmd(DateTime lastTime, int subtrahend = 0) + { + if (DateTime.Now.AddDays(subtrahend) >= lastTime)//当前时间减去一天,大于等于最后在线时间,不再生成该集中器下表生成采集指令 + return false; + return true; + } + + #endregion + } +} diff --git a/src/JiShe.CollectBus.Application/Workers/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs similarity index 88% rename from src/JiShe.CollectBus.Application/Workers/EnergySystemScheduledMeterReadingService.cs rename to src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 07e0264..2d0fd19 100644 --- a/src/JiShe.CollectBus.Application/Workers/EnergySystemScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -1,10 +1,12 @@ using DotNetCore.CAP; using FreeRedis; +using FreeSql; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Devices; using JiShe.CollectBus.FreeRedisProvider; using JiShe.CollectBus.FreeSql; +using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.Watermeter; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; @@ -34,6 +36,25 @@ namespace JiShe.CollectBus.Workers public sealed override string SystemType => SystemTypeConst.Energy; + /// + /// 获取采集项列表 + /// + /// + public override async Task> GetGatherItemByDataTypes() + { + try + { + string sql = $"SELECT DataType,ItemCode FROM TB_GatherItem(NOLOCK) WHERE [State]=0"; + return await SqlProvider.Instance.Change(DbEnum.EnergyDB) + .Ado + .QueryAsync(sql, null); + } + catch + { + return null; + } + } + /// /// 获取电表信息 /// diff --git a/src/JiShe.CollectBus.Application/Workers/WorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs similarity index 100% rename from src/JiShe.CollectBus.Application/Workers/WorkerSubscriberAppService.cs rename to src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs diff --git a/src/JiShe.CollectBus.Application/Workers/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/Workers/BasicScheduledMeterReadingService.cs deleted file mode 100644 index 680354f..0000000 --- a/src/JiShe.CollectBus.Application/Workers/BasicScheduledMeterReadingService.cs +++ /dev/null @@ -1,358 +0,0 @@ -using DotNetCore.CAP; -using FreeRedis; -using JiShe.CollectBus.Ammeters; -using JiShe.CollectBus.Common.Consts; -using JiShe.CollectBus.Common.Helpers; -using JiShe.CollectBus.Watermeter; -using Microsoft.Extensions.Logging; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Volo.Abp.Application.Services; -using static FreeSql.Internal.GlobalFilter; - -namespace JiShe.CollectBus.Workers -{ - /// - /// 定时采集服务 - /// - public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService - { - private readonly ILogger _logger; - private readonly ICapPublisher _capBus; - public BasicScheduledMeterReadingService(ILogger logger, ICapPublisher capBus) - { - _capBus = capBus; - _logger = logger; - } - - /// - /// 系统类型 - /// - public abstract string SystemType { get; } - - #region 电表采集处理 - - /// - /// 获取电表信息 - /// - /// 采集端Code - /// - public virtual Task> GetAmmeterInfoList(string gatherCode = "") - { - throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现"); - } - - /// - /// 初始化电表缓存数据 - /// - /// 采集端Code - /// - public virtual async Task InitAmmeterCacheData(string gatherCode = "") - { - var meterInfos = await GetAmmeterInfoList(gatherCode); - if (meterInfos == null || meterInfos.Count <= 0) - { - throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化电表缓存数据时,电表数据为空"); - } - - //根据采集频率分组 - var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); - foreach (var itemTimeDensity in meterInfoGroupByTimeDensity) - { - //将表计信息根据集中器分组 - var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList(); - foreach (var item in meterInfoGroup) - { - if (string.IsNullOrWhiteSpace(item.Key)) - { - continue; - } - - var redisCacheKey = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, itemTimeDensity.Key)}{item.Key}"; - Dictionary keyValuePairs = new Dictionary(); - foreach (var subItem in item) - { - - keyValuePairs.TryAdd($"{subItem.ID}", subItem); - } - await FreeRedisProvider.FreeRedis.HSetAsync(redisCacheKey, keyValuePairs); - } - } - - _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成"); - } - - /// - /// 1分钟采集电表数据 - /// - /// - public virtual async Task AmmeterScheduledMeterOneMinuteReading() - { - //获取缓存中的电表信息 - var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 1)}*"; - var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList); - if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) - { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理时没有获取到缓存信息,-101"); - return; - } - - // 解析结果(结果为嵌套数组) - List meterInfos = await GetMeterCacheData(oneMinutekeyList); - if (meterInfos == null || meterInfos.Count <= 0) - { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理时没有获取到缓存信息,-102"); - return; - } - - _logger.LogInformation($"{nameof(AmmeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理完成"); - - } - - /// - /// 5分钟采集电表数据 - /// - /// - public virtual async Task AmmeterScheduledMeterFiveMinuteReading() - { - //获取缓存中的电表信息 - var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 5)}*"; - var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList); - if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) - { - _logger.LogError($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} 5分钟采集电表数据处理时没有获取到缓存信息,-101"); - return; - } - - // 解析结果(结果为嵌套数组) - List meterInfos = await GetMeterCacheData(oneMinutekeyList); - if (meterInfos == null || meterInfos.Count <= 0) - { - _logger.LogError($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} 5分钟采集电表数据处理时没有获取到缓存信息,-102"); - return; - } - - _logger.LogInformation($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} 5分钟采集电表数据处理完成"); - } - - /// - /// 15分钟采集电表数据 - /// - /// - public virtual async Task AmmeterScheduledMeterFifteenMinuteReading() - { - //获取缓存中的电表信息 - var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 15)}*"; - var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList); - if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) - { - _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} 15分钟采集电表数据处理时没有获取到缓存信息,-101"); - return; - } - - // 解析结果(结果为嵌套数组) - List meterInfos = await GetMeterCacheData(oneMinutekeyList); - if (meterInfos == null || meterInfos.Count <= 0) - { - _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} 15分钟采集电表数据处理时没有获取到缓存信息,-102"); - return; - } - - _logger.LogInformation($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} 15分钟采集电表数据处理完成"); - } - #endregion - - - #region 水表采集处理 - - /// - /// 获取水表信息 - /// - /// 采集端Code - /// - public virtual Task> GetWatermeterInfoList(string gatherCode = "") - { - throw new NotImplementedException($"{nameof(GetWatermeterInfoList)}请根据不同系统类型进行实现"); - } - - /// - /// 初始化水表缓存数据 - /// - /// 采集端Code - /// - public virtual async Task InitWatermeterCacheData(string gatherCode = "") - { - var meterInfos = await GetWatermeterInfoList(gatherCode); - if (meterInfos == null || meterInfos.Count <= 0) - { - throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化水表缓存数据时,水表数据为空"); - } - //根据采集频率分组 - var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); - foreach (var itemTimeDensity in meterInfoGroupByTimeDensity) - { - //将表计信息根据集中器分组 - var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList(); - foreach (var item in meterInfoGroup) - { - if (string.IsNullOrWhiteSpace(item.Key)) - { - continue; - } - - var redisCacheKey = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, itemTimeDensity.Key)}{item.Key}"; - Dictionary keyValuePairs = new Dictionary(); - foreach (var subItem in item) - { - - keyValuePairs.TryAdd($"{subItem.ID}", subItem); - } - await FreeRedisProvider.FreeRedis.HSetAsync(redisCacheKey, keyValuePairs); - } - } - _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据完成"); - } - - /// - /// 1分钟采集水表数据 - /// - /// - public virtual async Task WatermeterScheduledMeterOneMinuteReading() - { - //获取缓存中的水表信息 - var redisKeyList = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, 1)}*"; - var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList); - if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) - { - _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集水表据处理时没有获取到缓存信息,-101"); - return; - } - - // 解析结果(结果为嵌套数组) - List meterInfos = await GetMeterCacheData(oneMinutekeyList); - if (meterInfos == null || meterInfos.Count <= 0) - { - _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集水表数据处理时没有获取到缓存信息,-102"); - return; - } - - _logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集水表数据处理完成"); - } - - /// - /// 5分钟采集电表数据 - /// - /// - public virtual async Task WatermeterScheduledMeterFiveMinuteReading() - { - - //获取缓存中的水表信息 - var redisKeyList = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, 5)}*"; - var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList); - if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) - { - _logger.LogError($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} 5分钟采集水表据处理时没有获取到缓存信息,-101"); - return; - } - - // 解析结果(结果为嵌套数组) - List meterInfos = await GetMeterCacheData(oneMinutekeyList); - if (meterInfos == null || meterInfos.Count <= 0) - { - _logger.LogError($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} 5分钟采集水表数据处理时没有获取到缓存信息,-102"); - return; - } - - _logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} 5分钟采集水表数据处理完成"); - } - - /// - /// 15分钟采集电表数据 - /// - /// - public virtual async Task WatermeterScheduledMeterFifteenMinuteReading() - { - //获取缓存中的水表信息 - var redisKeyList = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, 15)}*"; - var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList); - if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) - { - _logger.LogError($"{nameof(WatermeterScheduledMeterFifteenMinuteReading)} 15分钟采集水表据处理时没有获取到缓存信息,-101"); - return; - } - - // 解析结果(结果为嵌套数组) - List meterInfos = await GetMeterCacheData(oneMinutekeyList); - if (meterInfos == null || meterInfos.Count <= 0) - { - _logger.LogError($"{nameof(WatermeterScheduledMeterFifteenMinuteReading)} 15分钟采集水表数据处理时没有获取到缓存信息,-102"); - return; - } - - _logger.LogInformation($"{nameof(WatermeterScheduledMeterFifteenMinuteReading)} 15分钟采集水表数据处理完成"); - } - #endregion - - - #region 公共处理方法 - /// - /// 批量获取缓存的表计信息 - /// - /// - /// - /// - private async Task> GetMeterCacheData(string[] redisKeys) - { - //通过lua脚本一次性获取所有缓存内容 - var luaScript = @" - local results = {} - for i, key in ipairs(KEYS) do - local data = redis.call('HGETALL', key) - results[i] = {key, data} - end - return results"; - var oneMinuteAmmerterResult = await FreeRedisProvider.FreeRedis.EvalAsync(luaScript, redisKeys); //传递 KEYS - if (oneMinuteAmmerterResult == null) - { - _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理时没有获取到缓存信息,-102"); - return null; - } - - // 解析结果(结果为嵌套数组) - List meterInfos = new List(); - if (oneMinuteAmmerterResult is object[] arr) - { - foreach (object[] item in arr) - { - string key = (string)item[0]; - object[] fieldsAndValues = (object[])item[1]; - - for (int i = 0; i < fieldsAndValues.Length; i += 2) - { - string field = (string)fieldsAndValues[i]; - string valueStr = (string)fieldsAndValues[i + 1]; - T value = default; - if (!string.IsNullOrWhiteSpace(valueStr)) - { - value = valueStr.Deserialize()!; - } - if (value != null) - { - meterInfos.Add(value); - } - else - { - _logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集电表{key}数据{field}处理异常"); - } - } - } - } - - return meterInfos; - } - #endregion - } -} diff --git a/src/JiShe.CollectBus.Common/Enums/MeterLinkProtocolEnum.cs b/src/JiShe.CollectBus.Common/Enums/MeterLinkProtocolEnum.cs new file mode 100644 index 0000000..637acf9 --- /dev/null +++ b/src/JiShe.CollectBus.Common/Enums/MeterLinkProtocolEnum.cs @@ -0,0 +1,54 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Common.Enums +{ + /// + /// 表计连接通讯协议--表计与集中器的通讯协议 + /// + public enum MeterLinkProtocolEnum + { + /// + /// 无 + /// + None = 0, + + /// + /// DL/T 645—1997 + /// + DLT_645_1997 = 1, + + /// + /// 交流采样装置通信协议(电表) + /// + ACSamplingDevice = 2, + + /// + /// DL/T 645—2007 + /// + DLT_645_2007 = 30, + + /// + /// 载波通信 + /// + Carrierwave = 31, + + /// + /// CJ/T 188—2018协议(水表) + /// + CJT_188_2018 = 32, + + /// + /// CJ/T 188—2004协议 + /// + CJT_188_2004 = 33, + + /// + /// MODBUS-RTU + /// + MODBUS_RTU = 34, + } +} diff --git a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs index 7dce8ac..f62ba92 100644 --- a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs +++ b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs @@ -87,7 +87,7 @@ namespace JiShe.CollectBus.Ammeters public int TimeDensity { get; set; } /// - /// 该电表方案下采集项,如:0D_80 + /// 该电表方案下采集项,JSON格式,如:["0D_80","0D_80"] /// public string ItemCodes { get; set; } diff --git a/src/JiShe.CollectBus.Domain/GatherItem/GatherItemInfo.cs b/src/JiShe.CollectBus.Domain/GatherItem/GatherItemInfo.cs new file mode 100644 index 0000000..da16012 --- /dev/null +++ b/src/JiShe.CollectBus.Domain/GatherItem/GatherItemInfo.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.GatherItem +{ + public class GatherItemInfo + { + /// + /// 数据类型 + /// + public string DataType { get; set; } + + /// + /// 采集项编码 + /// + public string ItemCode { get; set; } + } +} diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs index 3a0390a..56d3fca 100644 --- a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs +++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs @@ -224,7 +224,7 @@ namespace JiShe.CollectBus.Host { config.SetListenIPHosts(int.Parse(configuration["TCP:ClientPort"] ?? "10500")) //.SetTcpDataHandlingAdapter(()=>new StandardFixedHeaderDataHandlingAdapter()) - //.SetGetDefaultNewId(() => Guid.NewGuid().ToString())//定义ClinetId的生成策略 + //.SetGetDefaultNewId(() => Guid.NewGuid().ToString())//定义ClientId的生成策略 .ConfigurePlugins(a => { a.Add(); diff --git a/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs b/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs index ee2cddf..844a50b 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs @@ -14,6 +14,7 @@ namespace JiShe.CollectBus.Protocol.Contracts public const string SubscriberReceivedHeartbeatEventName = "received.heartbeat.event"; public const string SubscriberReceivedLoginEventName = "received.login.event"; + #region 电表消息主题 /// /// 1分钟采集电表数据下行消息主题 /// @@ -27,6 +28,18 @@ namespace JiShe.CollectBus.Protocol.Contracts /// public const string AmmeterSubscriberWorkerFifteenMinuteIssuedEventName = "issued.fifteen.ammeter.event"; + /// + /// 其他采集数据下行消息主题,日冻结,月冻结、集中器版本号、定时阀控等 + /// + public const string AmmeterSubscriberWorkerOtherIssuedEventName = "issued.other.ammeter.event"; + + /// + /// 电表手动阀控 + /// + public const string AmmeterSubscriberWorkerManualValveControlIssuedEventName = "issued.control.ammeter.event"; + #endregion + + #region 水表消息主题 /// /// 1分钟采集水表数据下行消息主题 /// @@ -40,5 +53,18 @@ namespace JiShe.CollectBus.Protocol.Contracts /// public const string WatermeterSubscriberWorkerFifteenMinuteIssuedEventName = "issued.fifteen.watermeter.event"; + /// + /// 其他采集数据下行消息主题,日冻结,月冻结、集中器版本号、定时阀控等 + /// + public const string WatermeterSubscriberWorkerOtherIssuedEventName = "issued.other.watermeter.event"; + + /// + /// 水表手动阀控 + /// + public const string WatermeterSubscriberWorkerManualValveControlIssuedEventName = "issued.control.watermeter.event"; + #endregion + + + } }