diff --git a/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj b/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj
index 0a28849..0b1f26d 100644
--- a/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj
+++ b/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj
@@ -22,4 +22,8 @@
+
+
+
+
diff --git a/src/JiShe.CollectBus.Application.Contracts/Workers/IScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs
similarity index 92%
rename from src/JiShe.CollectBus.Application.Contracts/Workers/IScheduledMeterReadingService.cs
rename to src/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs
index bd1a090..4f41051 100644
--- a/src/JiShe.CollectBus.Application.Contracts/Workers/IScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs
@@ -1,4 +1,5 @@
using JiShe.CollectBus.Ammeters;
+using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.Watermeter;
using System;
using System.Collections.Generic;
@@ -16,6 +17,12 @@ namespace JiShe.CollectBus.Workers
public interface IScheduledMeterReadingService : IApplicationService
{
+ ///
+ /// 获取采集项列表
+ ///
+ ///
+ Task> GetGatherItemByDataTypes();
+
#region 电表采集处理
///
/// 获取电表信息
diff --git a/src/JiShe.CollectBus.Application.Contracts/Workers/IWorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs
similarity index 100%
rename from src/JiShe.CollectBus.Application.Contracts/Workers/IWorkerSubscriberAppService.cs
rename to src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs
diff --git a/src/JiShe.CollectBus.Application/Consumers/IssuedConsumer.cs b/src/JiShe.CollectBus.Application/Consumers/IssuedConsumer.cs
index 710a268..52da062 100644
--- a/src/JiShe.CollectBus.Application/Consumers/IssuedConsumer.cs
+++ b/src/JiShe.CollectBus.Application/Consumers/IssuedConsumer.cs
@@ -12,7 +12,7 @@ namespace JiShe.CollectBus.Consumers
{
public class IssuedConsumer: IConsumer
{
- private readonly ILogger _logger;
+ private readonly ILogger _logger;
private readonly ITcpService _tcpService;
private readonly IRepository _messageReceivedLoginEventRepository;
private readonly IRepository _messageReceivedHeartbeatEventRepository;
@@ -24,7 +24,7 @@ namespace JiShe.CollectBus.Consumers
///
///
///
- public IssuedConsumer(ILogger logger,
+ public IssuedConsumer(ILogger logger,
ITcpService tcpService,
IRepository messageReceivedLoginEventRepository,
IRepository messageReceivedHeartbeatEventRepository)
diff --git a/src/JiShe.CollectBus.Application/Consumers/WorkerConsumer.cs b/src/JiShe.CollectBus.Application/Consumers/WorkerConsumer.cs
new file mode 100644
index 0000000..5a5652d
--- /dev/null
+++ b/src/JiShe.CollectBus.Application/Consumers/WorkerConsumer.cs
@@ -0,0 +1,40 @@
+using System;
+using System.Threading.Tasks;
+using JiShe.CollectBus.Common.Enums;
+using JiShe.CollectBus.Common.Models;
+using JiShe.CollectBus.MessageIssueds;
+using JiShe.CollectBus.MessageReceiveds;
+using MassTransit;
+using Microsoft.Extensions.Logging;
+using TouchSocket.Sockets;
+using Volo.Abp.Domain.Repositories;
+
+namespace JiShe.CollectBus.Consumers
+{
+ ///
+ /// 定时抄读任务消费者
+ ///
+ public class WorkerConsumer : IConsumer
+ {
+ private readonly ILogger _logger;
+ private readonly ITcpService _tcpService;
+
+ ///
+ /// WorkerConsumer
+ ///
+ ///
+ ///
+ public WorkerConsumer(ILogger logger,
+ ITcpService tcpService)
+ {
+ _logger = logger;
+ _tcpService = tcpService;
+ }
+
+
+ public async Task Consume(ConsumeContext context)
+ {
+ await _tcpService.SendAsync(context.Message.ClientId, context.Message.Message);
+ }
+ }
+}
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
new file mode 100644
index 0000000..c573ab5
--- /dev/null
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -0,0 +1,672 @@
+using DotNetCore.CAP;
+using FreeRedis;
+using JiShe.CollectBus.Ammeters;
+using JiShe.CollectBus.Common.BuildSendDatas;
+using JiShe.CollectBus.Common.Consts;
+using JiShe.CollectBus.Common.Enums;
+using JiShe.CollectBus.Common.Helpers;
+using JiShe.CollectBus.GatherItem;
+using JiShe.CollectBus.MessageReceiveds;
+using JiShe.CollectBus.Protocol.Contracts;
+using JiShe.CollectBus.Watermeter;
+using MassTransit;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.Metrics;
+using System.Linq;
+using System.Net;
+using System.Text;
+using System.Threading.Tasks;
+using TouchSocket.Sockets;
+using Volo.Abp.Application.Services;
+using static FreeSql.Internal.GlobalFilter;
+
+namespace JiShe.CollectBus.Workers
+{
+ ///
+ /// 定时采集服务
+ ///
+ public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
+ {
+ private readonly ILogger _logger;
+ private readonly ICapPublisher _capBus;
+ public BasicScheduledMeterReadingService(ILogger logger, ICapPublisher capBus)
+ {
+ _capBus = capBus;
+ _logger = logger;
+ }
+
+ ///
+ /// 系统类型
+ ///
+ public abstract string SystemType { get; }
+
+ ///
+ ///电表日冻结采集项
+ ///
+ protected List DayFreezeCodes = new List() { "0D_3", "0D_4", "0D_161", "0D_162", "0D_163", "0D_164", "0D_165", "0D_166", "0D_167", "0D_168", "0C_149", };
+
+ ///
+ /// 电表月冻结采集项
+ ///
+ protected List MonthFreezeCodes = new List() { "0D_177", "0D_178", "0D_179", "0D_180", "0D_181", "0D_182", "0D_183", "0D_184", "0D_193", "0D_195", };
+
+ ///
+ /// 获取采集项列表
+ ///
+ ///
+ public virtual Task> GetGatherItemByDataTypes()
+ {
+ throw new NotImplementedException($"{nameof(GetGatherItemByDataTypes)}请根据不同系统类型进行实现");
+ }
+
+ #region 电表采集处理
+
+ ///
+ /// 获取电表信息
+ ///
+ /// 采集端Code
+ ///
+ public virtual Task> GetAmmeterInfoList(string gatherCode = "")
+ {
+ throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现");
+ }
+
+ ///
+ /// 初始化电表缓存数据
+ ///
+ /// 采集端Code
+ ///
+ public virtual async Task InitAmmeterCacheData(string gatherCode = "")
+ {
+ var meterInfos = await GetAmmeterInfoList(gatherCode);
+ if (meterInfos == null || meterInfos.Count <= 0)
+ {
+ throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空");
+ }
+
+ //获取采集项类型数据
+ var gatherItemInfos = await GetGatherItemByDataTypes();
+ if (gatherItemInfos == null || gatherItemInfos.Count <= 0)
+ {
+ throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空");
+ }
+
+ //根据采集频率分组,获得采集频率分组
+ var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
+ foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
+ {
+ //将表计信息根据集中器分组,获得集中器号
+ var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
+ foreach (var item in meterInfoGroup)
+ {
+ if (string.IsNullOrWhiteSpace(item.Key))
+ {
+ continue;
+ }
+
+ var redisCacheKey = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, itemTimeDensity.Key)}{item.Key}";
+ Dictionary keyValuePairs = new Dictionary();
+ foreach (var ammeter in item)
+ {
+ //处理ItemCode
+ if (string.IsNullOrWhiteSpace(ammeter.ItemCodes))
+ {
+ var itemArr = ammeter.DataTypes.Split(',').ToList();
+
+ #region 拼接采集项
+ List itemCodeList = new List();
+ foreach (var dataType in itemArr)
+ {
+ var excludeItemCode = "10_98,10_94";//排除透明转发:尖峰平谷时段、跳合闸
+ var gatherItem = gatherItemInfos.FirstOrDefault(f => f.DataType.Equals(dataType));
+ if (gatherItem != null)
+ {
+ if (!excludeItemCode.Contains(gatherItem.ItemCode))
+ {
+ itemCodeList.Add(gatherItem.ItemCode);
+ }
+ }
+ }
+ #endregion
+
+ #region 特殊电表采集项编号处理
+ if (itemArr.Exists(e => e.Equals("95"))) //德力西DTS
+ {
+ itemCodeList.Add("10_95");
+ }
+ //if (itemArr.Exists(e => e.Equals("109")))//WAVE_109
+ // ammeter.ItemCodes += "10_109,";
+ #endregion
+
+ ammeter.ItemCodes = itemCodeList.Serialize();//转换成JSON字符串
+
+ if (!string.IsNullOrWhiteSpace(ammeter.ItemCodes))
+ {
+ ammeter.ItemCodes = ammeter.ItemCodes.Replace("WAVE_109", "10_109");
+ }
+ }
+
+ keyValuePairs.TryAdd($"{ammeter.ID}", ammeter);
+ }
+ await FreeRedisProvider.FreeRedis.HSetAsync(redisCacheKey, keyValuePairs);
+ }
+ }
+
+ _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成");
+ }
+
+ ///
+ /// 1分钟采集电表数据
+ ///
+ ///
+ public virtual async Task AmmeterScheduledMeterOneMinuteReading()
+ {
+ //获取缓存中的电表信息
+ var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 1)}*";
+ var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList);
+ if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
+ {
+ _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理时没有获取到缓存信息,-101");
+ return;
+ }
+
+ // 解析结果(结果为嵌套数组)
+ Dictionary> meterInfos = await GetMeterCacheData(oneMinutekeyList, 1);
+ if (meterInfos == null || meterInfos.Count <= 0)
+ {
+ _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理时没有获取到缓存信息,-102");
+ return;
+ }
+
+ await AmmerterScheduledMeterReadingIssued(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, meterInfos);
+
+ _logger.LogInformation($"{nameof(AmmeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理完成");
+
+ }
+
+ ///
+ /// 5分钟采集电表数据
+ ///
+ ///
+ public virtual async Task AmmeterScheduledMeterFiveMinuteReading()
+ {
+ //获取缓存中的电表信息
+ var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 5)}*";
+ var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList);
+ if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
+ {
+ _logger.LogError($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} 5分钟采集电表数据处理时没有获取到缓存信息,-101");
+ return;
+ }
+
+ // 解析结果(结果为嵌套数组)
+ Dictionary> meterInfos = await GetMeterCacheData(oneMinutekeyList, 5);
+ if (meterInfos == null || meterInfos.Count <= 0)
+ {
+ _logger.LogError($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} 5分钟采集电表数据处理时没有获取到缓存信息,-102");
+ return;
+ }
+ await AmmerterScheduledMeterReadingIssued(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, meterInfos);
+
+ _logger.LogInformation($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} 5分钟采集电表数据处理完成");
+ }
+
+ ///
+ /// 15分钟采集电表数据
+ ///
+ ///
+ public virtual async Task AmmeterScheduledMeterFifteenMinuteReading()
+ {
+ //获取缓存中的电表信息
+ var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 15)}*";
+ var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList);
+ if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
+ {
+ _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} 15分钟采集电表数据处理时没有获取到缓存信息,-101");
+ return;
+ }
+
+ // 解析结果(结果为嵌套数组)
+ Dictionary> meterInfos = await GetMeterCacheData(oneMinutekeyList, 15);
+ if (meterInfos == null || meterInfos.Count <= 0)
+ {
+ _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} 15分钟采集电表数据处理时没有获取到缓存信息,-102");
+ return;
+ }
+
+ await AmmerterScheduledMeterReadingIssued(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, meterInfos);
+ _logger.LogInformation($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} 15分钟采集电表数据处理完成");
+ }
+ #endregion
+
+
+ #region 水表采集处理
+
+ ///
+ /// 获取水表信息
+ ///
+ /// 采集端Code
+ ///
+ public virtual Task> GetWatermeterInfoList(string gatherCode = "")
+ {
+ throw new NotImplementedException($"{nameof(GetWatermeterInfoList)}请根据不同系统类型进行实现");
+ }
+
+ ///
+ /// 初始化水表缓存数据
+ ///
+ /// 采集端Code
+ ///
+ public virtual async Task InitWatermeterCacheData(string gatherCode = "")
+ {
+ var meterInfos = await GetWatermeterInfoList(gatherCode);
+ if (meterInfos == null || meterInfos.Count <= 0)
+ {
+ throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化水表缓存数据时,水表数据为空");
+ }
+
+ //获取采集项类型数据
+ var gatherItemInfos = await GetGatherItemByDataTypes();
+ if (gatherItemInfos == null || gatherItemInfos.Count <= 0)
+ {
+ throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据时,采集项类型数据为空");
+ }
+
+ //根据采集频率分组,获得采集频率分组
+ var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
+ foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
+ {
+ //将表计信息根据集中器分组,获得集中器号
+ var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
+ foreach (var item in meterInfoGroup)
+ {
+ if (string.IsNullOrWhiteSpace(item.Key))
+ {
+ continue;
+ }
+
+ var redisCacheKey = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, itemTimeDensity.Key)}{item.Key}";
+ Dictionary keyValuePairs = new Dictionary();
+ foreach (var subItem in item)
+ {
+
+ keyValuePairs.TryAdd($"{subItem.ID}", subItem);
+ }
+ await FreeRedisProvider.FreeRedis.HSetAsync(redisCacheKey, keyValuePairs);
+ }
+ }
+ _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据完成");
+ }
+
+ ///
+ /// 1分钟采集水表数据
+ ///
+ ///
+ public virtual async Task WatermeterScheduledMeterOneMinuteReading()
+ {
+ //获取缓存中的水表信息
+ var redisKeyList = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, 1)}*";
+ var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList);
+ if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
+ {
+ _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集水表据处理时没有获取到缓存信息,-101");
+ return;
+ }
+
+ // 解析结果(结果为嵌套数组)
+ Dictionary> meterInfos = await GetMeterCacheData(oneMinutekeyList, 1);
+ if (meterInfos == null || meterInfos.Count <= 0)
+ {
+ _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集水表数据处理时没有获取到缓存信息,-102");
+ return;
+ }
+
+ _logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集水表数据处理完成");
+ }
+
+ ///
+ /// 5分钟采集电表数据
+ ///
+ ///
+ public virtual async Task WatermeterScheduledMeterFiveMinuteReading()
+ {
+
+ //获取缓存中的水表信息
+ var redisKeyList = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, 5)}*";
+ var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList);
+ if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
+ {
+ _logger.LogError($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} 5分钟采集水表据处理时没有获取到缓存信息,-101");
+ return;
+ }
+
+ // 解析结果(结果为嵌套数组)
+ Dictionary> meterInfos = await GetMeterCacheData(oneMinutekeyList, 5);
+ if (meterInfos == null || meterInfos.Count <= 0)
+ {
+ _logger.LogError($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} 5分钟采集水表数据处理时没有获取到缓存信息,-102");
+ return;
+ }
+
+ _logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} 5分钟采集水表数据处理完成");
+ }
+
+ ///
+ /// 15分钟采集电表数据
+ ///
+ ///
+ public virtual async Task WatermeterScheduledMeterFifteenMinuteReading()
+ {
+ //获取缓存中的水表信息
+ var redisKeyList = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, 15)}*";
+ var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList);
+ if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
+ {
+ _logger.LogError($"{nameof(WatermeterScheduledMeterFifteenMinuteReading)} 15分钟采集水表据处理时没有获取到缓存信息,-101");
+ return;
+ }
+
+ // 解析结果(结果为嵌套数组)
+ Dictionary> meterInfos = await GetMeterCacheData(oneMinutekeyList, 15);
+ if (meterInfos == null || meterInfos.Count <= 0)
+ {
+ _logger.LogError($"{nameof(WatermeterScheduledMeterFifteenMinuteReading)} 15分钟采集水表数据处理时没有获取到缓存信息,-102");
+ return;
+ }
+
+ _logger.LogInformation($"{nameof(WatermeterScheduledMeterFifteenMinuteReading)} 15分钟采集水表数据处理完成");
+ }
+ #endregion
+
+
+ #region 公共处理方法
+ ///
+ /// 批量获取缓存的表计信息
+ ///
+ /// 表信息数据对象
+ /// 采集频率对应的缓存Key集合
+ /// 采集频率,1分钟、5分钟、15分钟
+ ///
+ private async Task>> GetMeterCacheData(string[] redisKeys, int minute)
+ {
+ //通过lua脚本一次性获取所有缓存内容
+ var luaScript = @"
+ local results = {}
+ for i, key in ipairs(KEYS) do
+ local data = redis.call('HGETALL', key)
+ results[i] = {key, data}
+ end
+ return results";
+ var oneMinuteAmmerterResult = await FreeRedisProvider.FreeRedis.EvalAsync(luaScript, redisKeys); //传递 KEYS
+ if (oneMinuteAmmerterResult == null)
+ {
+ _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 定时任务采集表数据处理时没有获取到缓存信息,-102");
+ return null;
+ }
+
+ // 解析结果(结果为嵌套数组)
+ var meterInfos = new Dictionary>(); ;
+ if (oneMinuteAmmerterResult is object[] arr)
+ {
+ foreach (object[] item in arr)
+ {
+ string key = (string)item[0];//集中器地址对应的Redis缓存Key
+ object[] fieldsAndValues = (object[])item[1];//缓存Key对应的Hash表数据集合
+ var redisCacheKey = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, minute)}";
+ string focusAddress = key.Replace(redisCacheKey, "");//集中器地址
+
+ var meterHashs = new Dictionary();
+ for (int i = 0; i < fieldsAndValues.Length; i += 2)
+ {
+ string meterld = (string)fieldsAndValues[i];//表ID
+ string meterStr = (string)fieldsAndValues[i + 1];//表详情数据
+
+ T meterInfo = default;
+ if (!string.IsNullOrWhiteSpace(meterStr))
+ {
+ meterInfo = meterStr.Deserialize()!;
+ }
+ if (meterInfo != null)
+ {
+ meterHashs[meterld] = meterInfo;
+ }
+ else
+ {
+ _logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 定时任务采集表数据处理时集中器缓存{key}数据的{meterld}处理异常");
+ }
+ }
+ meterInfos[focusAddress] = meterHashs;
+ }
+ }
+
+ return meterInfos;
+ }
+
+ ///
+ /// 电表采集任务指令创建
+ ///
+ /// 采集频率订阅主题
+ /// 集中器数据分组
+ ///
+ private async Task AmmerterScheduledMeterReadingIssued(string eventName, Dictionary> focusGroup)
+ {
+ if (string.IsNullOrWhiteSpace(eventName) || focusGroup == null || focusGroup.Count <= 0)
+ {
+ _logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 电表数据采集指令生成失败,参数异常,-101");
+ return;
+ }
+ try
+ {
+ //将采集器编号的hash值取模分组
+ const int TotalShards = 20;
+ var focusHashGroups = new Dictionary>>();
+
+ foreach (var (collectorId, ammetersDictionary) in focusGroup)
+ {
+ if (string.IsNullOrWhiteSpace(collectorId))
+ {
+ _logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 集中器信息分组取模失败,无效Key -102");
+ continue;
+ }
+
+ // 计算哈希分组ID
+ int hashGroupId = Math.Abs(collectorId.GetHashCode() % TotalShards);
+
+ // 获取或创建分组(避免重复查找)
+ if (!focusHashGroups.TryGetValue(hashGroupId, out var group))
+ {
+ group = new Dictionary>();
+ focusHashGroups[hashGroupId] = group;
+ }
+
+ // 将当前集中器数据加入分组
+ group[collectorId] = ammetersDictionary;
+ }
+
+ if (focusHashGroups == null)
+ {
+ _logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 集中器信息分组取模失败 -103");
+ return;
+ }
+
+ //根据分组创建线程批处理集中器
+ foreach (var group in focusHashGroups)
+ {
+ _= Task.Run(async () => { await CreatePublishTask(eventName,group.Value); });
+ }
+
+ await Task.CompletedTask;
+ }
+ catch (Exception)
+ {
+
+ throw;
+ }
+ }
+
+ ///
+ /// 创建发布任务
+ ///
+ ///
+ ///
+ ///
+ private async Task CreatePublishTask(string eventName, Dictionary> focusGroup)
+ {
+ foreach (var focusInfo in focusGroup)
+ {
+ foreach (var ammeterInfo in focusInfo.Value)
+ {
+ var meter = ammeterInfo.Value;
+
+ if (string.IsNullOrWhiteSpace(meter.ItemCodes))
+ {
+ _logger.LogError($"{nameof(CreatePublishTask)} 集中器{meter.FocusAddress}的电表{meter.Name}数据采集指令生成失败,采集项为空,-101");
+ continue;
+ }
+
+ //载波的不处理
+ if (meter.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
+ {
+ _logger.LogError($"{nameof(CreatePublishTask)} 集中器{meter.FocusAddress}的电表{meter.Name}数据采集指令生成失败,载波不处理,-102");
+ continue;
+ }
+
+ if (meter.State.Equals(2))
+ {
+ _logger.LogWarning($"{nameof(CreatePublishTask)} {meter.Name} 集中器{meter.FocusAddress}的电表{meter.Name}状态为禁用,不处理");
+ continue;
+ }
+
+ //排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器
+ if (!IsGennerateCmd(meter.LastTime, -1))
+ {
+ _logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{meter.FocusAddress}的电表{meter.Name},采集时间:{meter.LastTime},已超过1天未在线,不生成指令");
+ continue;
+ }
+
+ if (string.IsNullOrWhiteSpace(meter.AreaCode))
+ {
+ _logger.LogError($"{nameof(CreatePublishTask)} 表ID:{meter.ID},集中器通信区号为空");
+ continue;
+ }
+ if (string.IsNullOrWhiteSpace(meter.Address))
+ {
+ _logger.LogError($"{nameof(CreatePublishTask)} 表ID:{meter.ID},集中器通信地址为空");
+ continue;
+ }
+ if (Convert.ToInt32(meter.Address) > 65535)
+ {
+ _logger.LogError($"{nameof(CreatePublishTask)} 表ID:{meter.ID},集中器通信地址无效,确保大于65535");
+ continue;
+ }
+ if (meter.MeteringCode <= 0 || meter.MeteringCode > 2033)
+ {
+ _logger.LogError($"{nameof(CreatePublishTask)} 表ID:{meter.ID},非有效测量点号({meter.MeteringCode})");
+ continue;
+ }
+
+ List tempCodes = meter.ItemCodes.Deserialize>()!;
+
+ //TODO:自动上报数据只主动采集1类数据。
+ if (meter.AutomaticReport.Equals(1))
+ {
+ var tempSubCodes = new List();
+ var tempItemCodes = string.Empty;
+ if (meter.ItemCodes.Contains("0C_49"))
+ tempItemCodes += "0C_49,";
+ if (meter.ItemCodes.Contains("0C_149"))
+ tempItemCodes += "0C_149,";
+ if (meter.ItemCodes.Contains("10_97"))
+ tempItemCodes += "10_97";
+
+ if (string.IsNullOrWhiteSpace(tempItemCodes))
+ {
+ continue;
+ }
+ else
+ {
+ meter.ItemCodes = tempItemCodes;
+ }
+ }
+
+ foreach (var tempItem in tempCodes)
+ {
+ //排除已发送日冻结和月冻结采集项配置
+ if(DayFreezeCodes.Contains(tempItem))
+ {
+ continue;
+ }
+
+ if (MonthFreezeCodes.Contains(tempItem))
+ {
+ continue;
+ }
+
+ }
+ //排除已发送日冻结和月冻结采集项配置
+ //if (!isSendDayFreeze)
+ meter.ItemCodes = meter.ItemCodes.Replace("0D_3", "").Replace("0D_4", "")
+ .Replace("0D_161", "").Replace("0D_162", "").Replace("0D_163", "").Replace("0D_164", "")
+ .Replace("0D_165", "").Replace("0D_166", "").Replace("0D_167", "").Replace("0D_168", "").Replace("0C_149", "");
+
+ //if (!isSendMonthFreeze)
+ meter.ItemCodes = meter.ItemCodes.Replace("0D_177", "").Replace("0D_178", "").Replace("0D_179", "").Replace("0D_180", "")
+ .Replace("0D_181", "").Replace("0D_181", "").Replace("0D_182", "").Replace("0D_183", "").Replace("0D_184", "")
+ .Replace("0D_193", "").Replace("0D_195", "");
+
+
+
+ //TODO:特殊表
+
+
+ //var itemCodeArr = itemCode.Split('_');
+ //var aFN = (AFN)itemCodeArr[0].HexToDec();
+ //var fn = int.Parse(itemCodeArr[1]);
+ //if (aFN == AFN.请求实时数据)
+ //{
+ // var bytes = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(address, ammeter.MeterCode.Value, (ATypeOfDataItems)fn);
+ // bytesList.Add(bytes);
+ //}
+ //else if (aFN == AFN.请求历史数据)
+ //{
+ // var density = (FreezeDensity)input.Density;
+ // var bytes = Build3761SendData.BuildAmmeterReadingIIdataTypeItemsSendCmd(address, ammeter.MeterCode.Value, (IIdataTypeItems)fn, density, 0);
+ // bytesList.Add(bytes);
+ //}
+
+ }
+ }
+
+ string deviceNo = "";
+ string messageHexString = "";
+
+ var messageReceivedHeartbeatEvent = new MessageReceivedHeartbeat
+ {
+ //ClientId = client.Id,
+ //ClientIp = client.IP,
+ //ClientPort = client.Port,
+ MessageHexString = messageHexString,
+ DeviceNo = deviceNo,
+ MessageId = NewId.NextGuid().ToString()
+ };
+ await _capBus.PublishAsync(eventName, messageReceivedHeartbeatEvent);
+ }
+
+ ///
+ /// 指定时间对比当前时间
+ ///
+ ///
+ ///
+ ///
+ private bool IsGennerateCmd(DateTime lastTime, int subtrahend = 0)
+ {
+ if (DateTime.Now.AddDays(subtrahend) >= lastTime)//当前时间减去一天,大于等于最后在线时间,不再生成该集中器下表生成采集指令
+ return false;
+ return true;
+ }
+
+ #endregion
+ }
+}
diff --git a/src/JiShe.CollectBus.Application/Workers/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
similarity index 88%
rename from src/JiShe.CollectBus.Application/Workers/EnergySystemScheduledMeterReadingService.cs
rename to src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
index 07e0264..2d0fd19 100644
--- a/src/JiShe.CollectBus.Application/Workers/EnergySystemScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
@@ -1,10 +1,12 @@
using DotNetCore.CAP;
using FreeRedis;
+using FreeSql;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Devices;
using JiShe.CollectBus.FreeRedisProvider;
using JiShe.CollectBus.FreeSql;
+using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.Watermeter;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
@@ -34,6 +36,25 @@ namespace JiShe.CollectBus.Workers
public sealed override string SystemType => SystemTypeConst.Energy;
+ ///
+ /// 获取采集项列表
+ ///
+ ///
+ public override async Task> GetGatherItemByDataTypes()
+ {
+ try
+ {
+ string sql = $"SELECT DataType,ItemCode FROM TB_GatherItem(NOLOCK) WHERE [State]=0";
+ return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
+ .Ado
+ .QueryAsync(sql, null);
+ }
+ catch
+ {
+ return null;
+ }
+ }
+
///
/// 获取电表信息
///
diff --git a/src/JiShe.CollectBus.Application/Workers/WorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs
similarity index 100%
rename from src/JiShe.CollectBus.Application/Workers/WorkerSubscriberAppService.cs
rename to src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs
diff --git a/src/JiShe.CollectBus.Application/Workers/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/Workers/BasicScheduledMeterReadingService.cs
deleted file mode 100644
index 680354f..0000000
--- a/src/JiShe.CollectBus.Application/Workers/BasicScheduledMeterReadingService.cs
+++ /dev/null
@@ -1,358 +0,0 @@
-using DotNetCore.CAP;
-using FreeRedis;
-using JiShe.CollectBus.Ammeters;
-using JiShe.CollectBus.Common.Consts;
-using JiShe.CollectBus.Common.Helpers;
-using JiShe.CollectBus.Watermeter;
-using Microsoft.Extensions.Logging;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Volo.Abp.Application.Services;
-using static FreeSql.Internal.GlobalFilter;
-
-namespace JiShe.CollectBus.Workers
-{
- ///
- /// 定时采集服务
- ///
- public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
- {
- private readonly ILogger _logger;
- private readonly ICapPublisher _capBus;
- public BasicScheduledMeterReadingService(ILogger logger, ICapPublisher capBus)
- {
- _capBus = capBus;
- _logger = logger;
- }
-
- ///
- /// 系统类型
- ///
- public abstract string SystemType { get; }
-
- #region 电表采集处理
-
- ///
- /// 获取电表信息
- ///
- /// 采集端Code
- ///
- public virtual Task> GetAmmeterInfoList(string gatherCode = "")
- {
- throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现");
- }
-
- ///
- /// 初始化电表缓存数据
- ///
- /// 采集端Code
- ///
- public virtual async Task InitAmmeterCacheData(string gatherCode = "")
- {
- var meterInfos = await GetAmmeterInfoList(gatherCode);
- if (meterInfos == null || meterInfos.Count <= 0)
- {
- throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化电表缓存数据时,电表数据为空");
- }
-
- //根据采集频率分组
- var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
- foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
- {
- //将表计信息根据集中器分组
- var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
- foreach (var item in meterInfoGroup)
- {
- if (string.IsNullOrWhiteSpace(item.Key))
- {
- continue;
- }
-
- var redisCacheKey = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, itemTimeDensity.Key)}{item.Key}";
- Dictionary keyValuePairs = new Dictionary();
- foreach (var subItem in item)
- {
-
- keyValuePairs.TryAdd($"{subItem.ID}", subItem);
- }
- await FreeRedisProvider.FreeRedis.HSetAsync(redisCacheKey, keyValuePairs);
- }
- }
-
- _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成");
- }
-
- ///
- /// 1分钟采集电表数据
- ///
- ///
- public virtual async Task AmmeterScheduledMeterOneMinuteReading()
- {
- //获取缓存中的电表信息
- var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 1)}*";
- var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList);
- if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
- {
- _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理时没有获取到缓存信息,-101");
- return;
- }
-
- // 解析结果(结果为嵌套数组)
- List meterInfos = await GetMeterCacheData(oneMinutekeyList);
- if (meterInfos == null || meterInfos.Count <= 0)
- {
- _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理时没有获取到缓存信息,-102");
- return;
- }
-
- _logger.LogInformation($"{nameof(AmmeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理完成");
-
- }
-
- ///
- /// 5分钟采集电表数据
- ///
- ///
- public virtual async Task AmmeterScheduledMeterFiveMinuteReading()
- {
- //获取缓存中的电表信息
- var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 5)}*";
- var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList);
- if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
- {
- _logger.LogError($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} 5分钟采集电表数据处理时没有获取到缓存信息,-101");
- return;
- }
-
- // 解析结果(结果为嵌套数组)
- List meterInfos = await GetMeterCacheData(oneMinutekeyList);
- if (meterInfos == null || meterInfos.Count <= 0)
- {
- _logger.LogError($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} 5分钟采集电表数据处理时没有获取到缓存信息,-102");
- return;
- }
-
- _logger.LogInformation($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} 5分钟采集电表数据处理完成");
- }
-
- ///
- /// 15分钟采集电表数据
- ///
- ///
- public virtual async Task AmmeterScheduledMeterFifteenMinuteReading()
- {
- //获取缓存中的电表信息
- var redisKeyList = $"{string.Format(FreeRedisConst.CacheAmmeterInfoKey, SystemTypeConst.Energy, 15)}*";
- var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList);
- if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
- {
- _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} 15分钟采集电表数据处理时没有获取到缓存信息,-101");
- return;
- }
-
- // 解析结果(结果为嵌套数组)
- List meterInfos = await GetMeterCacheData(oneMinutekeyList);
- if (meterInfos == null || meterInfos.Count <= 0)
- {
- _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} 15分钟采集电表数据处理时没有获取到缓存信息,-102");
- return;
- }
-
- _logger.LogInformation($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} 15分钟采集电表数据处理完成");
- }
- #endregion
-
-
- #region 水表采集处理
-
- ///
- /// 获取水表信息
- ///
- /// 采集端Code
- ///
- public virtual Task> GetWatermeterInfoList(string gatherCode = "")
- {
- throw new NotImplementedException($"{nameof(GetWatermeterInfoList)}请根据不同系统类型进行实现");
- }
-
- ///
- /// 初始化水表缓存数据
- ///
- /// 采集端Code
- ///
- public virtual async Task InitWatermeterCacheData(string gatherCode = "")
- {
- var meterInfos = await GetWatermeterInfoList(gatherCode);
- if (meterInfos == null || meterInfos.Count <= 0)
- {
- throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化水表缓存数据时,水表数据为空");
- }
- //根据采集频率分组
- var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
- foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
- {
- //将表计信息根据集中器分组
- var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
- foreach (var item in meterInfoGroup)
- {
- if (string.IsNullOrWhiteSpace(item.Key))
- {
- continue;
- }
-
- var redisCacheKey = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, itemTimeDensity.Key)}{item.Key}";
- Dictionary keyValuePairs = new Dictionary();
- foreach (var subItem in item)
- {
-
- keyValuePairs.TryAdd($"{subItem.ID}", subItem);
- }
- await FreeRedisProvider.FreeRedis.HSetAsync(redisCacheKey, keyValuePairs);
- }
- }
- _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据完成");
- }
-
- ///
- /// 1分钟采集水表数据
- ///
- ///
- public virtual async Task WatermeterScheduledMeterOneMinuteReading()
- {
- //获取缓存中的水表信息
- var redisKeyList = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, 1)}*";
- var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList);
- if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
- {
- _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集水表据处理时没有获取到缓存信息,-101");
- return;
- }
-
- // 解析结果(结果为嵌套数组)
- List meterInfos = await GetMeterCacheData(oneMinutekeyList);
- if (meterInfos == null || meterInfos.Count <= 0)
- {
- _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集水表数据处理时没有获取到缓存信息,-102");
- return;
- }
-
- _logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集水表数据处理完成");
- }
-
- ///
- /// 5分钟采集电表数据
- ///
- ///
- public virtual async Task WatermeterScheduledMeterFiveMinuteReading()
- {
-
- //获取缓存中的水表信息
- var redisKeyList = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, 5)}*";
- var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList);
- if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
- {
- _logger.LogError($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} 5分钟采集水表据处理时没有获取到缓存信息,-101");
- return;
- }
-
- // 解析结果(结果为嵌套数组)
- List meterInfos = await GetMeterCacheData(oneMinutekeyList);
- if (meterInfos == null || meterInfos.Count <= 0)
- {
- _logger.LogError($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} 5分钟采集水表数据处理时没有获取到缓存信息,-102");
- return;
- }
-
- _logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} 5分钟采集水表数据处理完成");
- }
-
- ///
- /// 15分钟采集电表数据
- ///
- ///
- public virtual async Task WatermeterScheduledMeterFifteenMinuteReading()
- {
- //获取缓存中的水表信息
- var redisKeyList = $"{string.Format(FreeRedisConst.CacheWatermeterInfoKey, SystemTypeConst.Energy, 15)}*";
- var oneMinutekeyList = await FreeRedisProvider.FreeRedis.KeysAsync(redisKeyList);
- if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
- {
- _logger.LogError($"{nameof(WatermeterScheduledMeterFifteenMinuteReading)} 15分钟采集水表据处理时没有获取到缓存信息,-101");
- return;
- }
-
- // 解析结果(结果为嵌套数组)
- List meterInfos = await GetMeterCacheData(oneMinutekeyList);
- if (meterInfos == null || meterInfos.Count <= 0)
- {
- _logger.LogError($"{nameof(WatermeterScheduledMeterFifteenMinuteReading)} 15分钟采集水表数据处理时没有获取到缓存信息,-102");
- return;
- }
-
- _logger.LogInformation($"{nameof(WatermeterScheduledMeterFifteenMinuteReading)} 15分钟采集水表数据处理完成");
- }
- #endregion
-
-
- #region 公共处理方法
- ///
- /// 批量获取缓存的表计信息
- ///
- ///
- ///
- ///
- private async Task> GetMeterCacheData(string[] redisKeys)
- {
- //通过lua脚本一次性获取所有缓存内容
- var luaScript = @"
- local results = {}
- for i, key in ipairs(KEYS) do
- local data = redis.call('HGETALL', key)
- results[i] = {key, data}
- end
- return results";
- var oneMinuteAmmerterResult = await FreeRedisProvider.FreeRedis.EvalAsync(luaScript, redisKeys); //传递 KEYS
- if (oneMinuteAmmerterResult == null)
- {
- _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集电表数据处理时没有获取到缓存信息,-102");
- return null;
- }
-
- // 解析结果(结果为嵌套数组)
- List meterInfos = new List();
- if (oneMinuteAmmerterResult is object[] arr)
- {
- foreach (object[] item in arr)
- {
- string key = (string)item[0];
- object[] fieldsAndValues = (object[])item[1];
-
- for (int i = 0; i < fieldsAndValues.Length; i += 2)
- {
- string field = (string)fieldsAndValues[i];
- string valueStr = (string)fieldsAndValues[i + 1];
- T value = default;
- if (!string.IsNullOrWhiteSpace(valueStr))
- {
- value = valueStr.Deserialize()!;
- }
- if (value != null)
- {
- meterInfos.Add(value);
- }
- else
- {
- _logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} 1分钟采集电表{key}数据{field}处理异常");
- }
- }
- }
- }
-
- return meterInfos;
- }
- #endregion
- }
-}
diff --git a/src/JiShe.CollectBus.Common/Enums/MeterLinkProtocolEnum.cs b/src/JiShe.CollectBus.Common/Enums/MeterLinkProtocolEnum.cs
new file mode 100644
index 0000000..637acf9
--- /dev/null
+++ b/src/JiShe.CollectBus.Common/Enums/MeterLinkProtocolEnum.cs
@@ -0,0 +1,54 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Common.Enums
+{
+ ///
+ /// 表计连接通讯协议--表计与集中器的通讯协议
+ ///
+ public enum MeterLinkProtocolEnum
+ {
+ ///
+ /// 无
+ ///
+ None = 0,
+
+ ///
+ /// DL/T 645—1997
+ ///
+ DLT_645_1997 = 1,
+
+ ///
+ /// 交流采样装置通信协议(电表)
+ ///
+ ACSamplingDevice = 2,
+
+ ///
+ /// DL/T 645—2007
+ ///
+ DLT_645_2007 = 30,
+
+ ///
+ /// 载波通信
+ ///
+ Carrierwave = 31,
+
+ ///
+ /// CJ/T 188—2018协议(水表)
+ ///
+ CJT_188_2018 = 32,
+
+ ///
+ /// CJ/T 188—2004协议
+ ///
+ CJT_188_2004 = 33,
+
+ ///
+ /// MODBUS-RTU
+ ///
+ MODBUS_RTU = 34,
+ }
+}
diff --git a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs
index 7dce8ac..f62ba92 100644
--- a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs
+++ b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs
@@ -87,7 +87,7 @@ namespace JiShe.CollectBus.Ammeters
public int TimeDensity { get; set; }
///
- /// 该电表方案下采集项,如:0D_80
+ /// 该电表方案下采集项,JSON格式,如:["0D_80","0D_80"]
///
public string ItemCodes { get; set; }
diff --git a/src/JiShe.CollectBus.Domain/GatherItem/GatherItemInfo.cs b/src/JiShe.CollectBus.Domain/GatherItem/GatherItemInfo.cs
new file mode 100644
index 0000000..da16012
--- /dev/null
+++ b/src/JiShe.CollectBus.Domain/GatherItem/GatherItemInfo.cs
@@ -0,0 +1,21 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.GatherItem
+{
+ public class GatherItemInfo
+ {
+ ///
+ /// 数据类型
+ ///
+ public string DataType { get; set; }
+
+ ///
+ /// 采集项编码
+ ///
+ public string ItemCode { get; set; }
+ }
+}
diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
index 3a0390a..56d3fca 100644
--- a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
+++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
@@ -224,7 +224,7 @@ namespace JiShe.CollectBus.Host
{
config.SetListenIPHosts(int.Parse(configuration["TCP:ClientPort"] ?? "10500"))
//.SetTcpDataHandlingAdapter(()=>new StandardFixedHeaderDataHandlingAdapter())
- //.SetGetDefaultNewId(() => Guid.NewGuid().ToString())//定义ClinetId的生成策略
+ //.SetGetDefaultNewId(() => Guid.NewGuid().ToString())//定义ClientId的生成策略
.ConfigurePlugins(a =>
{
a.Add();
diff --git a/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs b/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs
index ee2cddf..844a50b 100644
--- a/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs
+++ b/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs
@@ -14,6 +14,7 @@ namespace JiShe.CollectBus.Protocol.Contracts
public const string SubscriberReceivedHeartbeatEventName = "received.heartbeat.event";
public const string SubscriberReceivedLoginEventName = "received.login.event";
+ #region 电表消息主题
///
/// 1分钟采集电表数据下行消息主题
///
@@ -27,6 +28,18 @@ namespace JiShe.CollectBus.Protocol.Contracts
///
public const string AmmeterSubscriberWorkerFifteenMinuteIssuedEventName = "issued.fifteen.ammeter.event";
+ ///
+ /// 其他采集数据下行消息主题,日冻结,月冻结、集中器版本号、定时阀控等
+ ///
+ public const string AmmeterSubscriberWorkerOtherIssuedEventName = "issued.other.ammeter.event";
+
+ ///
+ /// 电表手动阀控
+ ///
+ public const string AmmeterSubscriberWorkerManualValveControlIssuedEventName = "issued.control.ammeter.event";
+ #endregion
+
+ #region 水表消息主题
///
/// 1分钟采集水表数据下行消息主题
///
@@ -40,5 +53,18 @@ namespace JiShe.CollectBus.Protocol.Contracts
///
public const string WatermeterSubscriberWorkerFifteenMinuteIssuedEventName = "issued.fifteen.watermeter.event";
+ ///
+ /// 其他采集数据下行消息主题,日冻结,月冻结、集中器版本号、定时阀控等
+ ///
+ public const string WatermeterSubscriberWorkerOtherIssuedEventName = "issued.other.watermeter.event";
+
+ ///
+ /// 水表手动阀控
+ ///
+ public const string WatermeterSubscriberWorkerManualValveControlIssuedEventName = "issued.control.watermeter.event";
+ #endregion
+
+
+
}
}