diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index 5a665ff..33e70f9 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -14,12 +14,14 @@ using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Repository.MeterReadingRecord;
+using Mapster;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
+using static FreeSql.Internal.GlobalFilter;
namespace JiShe.CollectBus.ScheduledMeterReading
{
@@ -207,26 +209,30 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{
-//#if DEBUG
-// var timeDensity = "15";
-// //获取缓存中的电表信息
-// var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}*";
+#if DEBUG
+ var timeDensity = "15";
+ string tempCacheMeterInfoKey = $"CollectBus:{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}";
+ //获取缓存中的电表信息
+ var redisKeyList = $"{string.Format(tempCacheMeterInfoKey, SystemType, "JiSheCollectBus", MeterTypeEnum.Ammeter, timeDensity)}*";
-// var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
-// var meterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter);
-// List focusAddressDataLista = new List();
-// foreach (var item in meterInfos)
-// {
-// focusAddressDataLista.Add(item.FocusAddress);
-// }
-
-// DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
-// return;
-//#else
-// var meterInfos = await GetAmmeterInfoList(gatherCode);
-//#endif
+ var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
+ var tempMeterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter);
+ //List focusAddressDataLista = new List();
+ List meterInfos = new List();
+ foreach (var item in tempMeterInfos)
+ {
+ var tempData = item.Adapt();
+ tempData.FocusId = item.FocusID;
+ tempData.MeterId = item.Id;
+ meterInfos.Add(tempData);
+ //focusAddressDataLista.Add(item.FocusAddress);
+ }
+ //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
+#else
var meterInfos = await GetAmmeterInfoList(gatherCode);
+#endif
+
if (meterInfos == null || meterInfos.Count <= 0)
{
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空");
@@ -238,6 +244,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空");
}
+ var timer = Stopwatch.StartNew();
List focusAddressDataList = new List();//用于处理Kafka主题分区数据的分发和处理。
@@ -245,6 +252,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
{
+ var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
+ var redisCacheFocusIndexKey = $"{string.Format(RedisConst.CacheMeterInfoFocusIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
+ var redisCacheScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
+ var redisCacheGlobalIndexKey = $"{string.Format(RedisConst.CacheMeterInfoGlobalIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
+
+ List ammeterInfos = new List();
//将表计信息根据集中器分组,获得集中器号
var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
foreach (var item in meterInfoGroup)
@@ -256,17 +269,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
focusAddressDataList.Add(item.Key);
- var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
+ // var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
#if DEBUG
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
//await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
#else
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
- await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
+ //await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
#endif
- Dictionary keyValuePairs = new Dictionary();
+ //Dictionary keyValuePairs = new Dictionary();
foreach (var ammeter in item)
{
//处理ItemCode
@@ -311,11 +324,18 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
}
- keyValuePairs.TryAdd($"{ammeter.MeterId}", ammeter);
+ ammeterInfos.Add(ammeter);
+ //keyValuePairs.TryAdd($"{ammeter.MeterId}", ammeter);
}
- await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
+ //await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
}
+ await FreeRedisProvider.BatchAddMeterData(
+ redisCacheKey,
+ redisCacheFocusIndexKey,
+ redisCacheScoresIndexKey,
+ redisCacheGlobalIndexKey, ammeterInfos);
+
//在缓存表信息数据的时候,新增下一个时间的自动处理任务,1分钟后执行所有的采集频率任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{
@@ -338,7 +358,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
DeviceGroupBalanceControl.InitializeCache(focusAddressDataList);
}
- _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成");
+ timer.Stop();
+
+ _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成,耗时{timer.ElapsedMilliseconds}毫秒");
}
///
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
index 07f5526..d44fe56 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
@@ -114,10 +114,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
WHERE 1=1 and C.Special = 0 ";
//TODO 记得移除特殊表过滤
- if (!string.IsNullOrWhiteSpace(gatherCode))
- {
- sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
- }
+ //if (!string.IsNullOrWhiteSpace(gatherCode))
+ //{
+ // sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
+ //}
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
.Ado
.QueryAsync(sql);
diff --git a/src/JiShe.CollectBus.Common/Consts/RedisConst.cs b/src/JiShe.CollectBus.Common/Consts/RedisConst.cs
index ea4323b..9377056 100644
--- a/src/JiShe.CollectBus.Common/Consts/RedisConst.cs
+++ b/src/JiShe.CollectBus.Common/Consts/RedisConst.cs
@@ -28,25 +28,44 @@ namespace JiShe.CollectBus.Common.Consts
///
public const string FifteenMinuteAcquisitionTimeInterval = "Fifteen";
+ public const string MeterInfo = "MeterInfo";
///
/// 缓存表计信息,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
///
- public const string CacheMeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}:";
+ public const string CacheMeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:{"{3}"}";
+ ///
+ /// 缓存表计信息集中器索引Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
+ ///
+ public const string CacheMeterInfoFocusIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:FocusIndex:{"{3}"}";
+
+ ///
+ /// 缓存表计信息集中器排序索引ZSET缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
+ ///
+ public const string CacheMeterInfoScoresIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:ScoresIndex:{"{3}"}";
+
+ ///
+ /// 缓存表计信息集中器采集频率分组全局索引ZSet缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
+ ///
+ public const string CacheMeterInfoGlobalIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:GlobalIndex:{"{3}"}";
+
+
+ public const string TaskInfo = "TaskInfo";
///
/// 缓存待下发的指令生产任务数据,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
///
- public const string CacheTasksToBeIssuedKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:TaskInfo:{"{2}"}:{"{3}"}";
+ public const string CacheTasksToBeIssuedKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TaskInfo}:{"{2}"}{"{3}"}";
+ public const string TelemetryPacket = "TelemetryPacket";
///
/// 缓存表计下发指令数据集,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
///
- public const string CacheTelemetryPacketInfoKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:TelemetryPacket:{"{2}"}:{"{3}"}:";
+ public const string CacheTelemetryPacketInfoKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:{"{3}"}";
- ///
- /// 缓存设备平衡关系映射结果,{0}=>系统类型,{1}=>应用服务部署标记
- ///
- public const string CacheDeviceBalanceRelationMapResultKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:RelationMap";
+ /////
+ ///// 缓存设备平衡关系映射结果,{0}=>系统类型,{1}=>应用服务部署标记
+ /////
+ //public const string CacheDeviceBalanceRelationMapResultKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:RelationMap";
public const string CacheAmmeterFocusKey = "CacheAmmeterFocusKey";
}
diff --git a/src/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj b/src/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj
index b22e8ca..05645ef 100644
--- a/src/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj
+++ b/src/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj
@@ -17,6 +17,7 @@
+
diff --git a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfoTemp.cs b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfoTemp.cs
new file mode 100644
index 0000000..fce33fa
--- /dev/null
+++ b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfoTemp.cs
@@ -0,0 +1,152 @@
+using JiShe.CollectBus.Common.Models;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Ammeters
+{
+ public class AmmeterInfoTemp
+ {
+ ///
+ /// 集中器Id
+ ///
+ public int FocusID { get; set; }
+
+ ///
+ /// 表Id
+ ///
+ public int Id { get; set; }
+
+ ///
+ /// 电表名称
+ ///
+ public string Name { get; set; }
+
+ ///
+ /// 集中器地址
+ ///
+ public string FocusAddress { get; set; }
+
+ ///
+ /// 集中器地址
+ ///
+ public string Address { get; set; }
+
+ ///
+ /// 集中器区域代码
+ ///
+ public string AreaCode { get; set; }
+
+ ///
+ /// 电表类别 (1单相、2三相三线、3三相四线),
+ /// 07协议: 开合闸指令(1A开闸断电,1C单相表合闸,1B多相表合闸) 645 2007 表
+ /// 97协议://true(合闸);false(跳闸) 545 1997 没有单相多相 之分 "true" ? "9966" : "3355"
+ ///
+ public int TypeName { get; set; }
+
+ ///
+ /// 跳合闸状态字段: 0 合闸,1 跳闸
+ /// 电表:TripState (0 合闸-通电, 1 断开、跳闸);
+ ///
+ public int TripState { get; set; }
+
+ ///
+ /// 规约 -电表default(30) 1:97协议,30:07协议
+ ///
+ public int? Protocol { get; set; }
+
+ ///
+ /// 一个集中器下的[MeteringCode]必须唯一。 PN
+ ///
+ public int MeteringCode { get; set; }
+
+ ///
+ /// 电表通信地址
+ ///
+ public string AmmerterAddress { get; set; }
+
+ ///
+ /// 波特率 default(2400)
+ ///
+ public int Baudrate { get; set; }
+
+ ///
+ /// MeteringPort 端口就几个可以枚举。
+ ///
+ public int MeteringPort { get; set; }
+
+ ///
+ /// 电表密码
+ ///
+ public string Password { get; set; }
+
+ ///
+ /// 采集时间间隔(分钟,如15)
+ ///
+ public int TimeDensity { get; set; }
+
+ ///
+ /// 该电表方案下采集项,JSON格式,如:["0D_80","0D_80"]
+ ///
+ public string ItemCodes { get; set; }
+
+ ///
+ /// State表状态:
+ /// 0新装(未下发),1运行(档案下发成功时设置状态值1), 2暂停, 100销表(销表后是否重新启用)
+ /// 特定:State -1 已删除
+ ///
+ public int State { get; set; }
+
+ ///
+ /// 是否自动采集(0:主动采集,1:自动采集)
+ ///
+ public int AutomaticReport { get; set; }
+
+ ///
+ /// 该电表方案下采集项编号
+ ///
+ public string DataTypes { get; set; }
+
+ ///
+ /// 品牌型号
+ ///
+ public string BrandType { get; set; }
+
+ ///
+ /// 采集器编号
+ ///
+ public string GatherCode { get; set; }
+
+ ///
+ /// 是否特殊表,1是特殊电表
+ ///
+ public int Special { get; set; }
+
+ ///
+ /// 费率类型,单、多 (SingleRate :单费率(单相表1),多费率(其他0) ,与TypeName字段无关)
+ /// SingleRate ? "单" : "复"
+ /// [SingleRate] --0 复费率 false , 1 单费率 true (与PayPlanID保持一致)
+ ///对应 TB_PayPlan.Type: 1复费率,2单费率
+ ///
+ public bool SingleRate { get; set; }
+
+ ///
+ /// 项目ID
+ ///
+ public int ProjectID { get; set; }
+
+ ///
+ /// 数据库业务ID
+ ///
+ public int DatabaseBusiID { get; set; }
+
+ ///
+ /// 是否异常集中器 0:正常,1异常
+ ///
+ public int AbnormalState { get; set; }
+
+ public DateTime LastTime { get; set; }
+ }
+}