diff --git a/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider/FreeRedisProviderService.cs b/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider/FreeRedisProviderService.cs
new file mode 100644
index 0000000..2556803
--- /dev/null
+++ b/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider/FreeRedisProviderService.cs
@@ -0,0 +1,54 @@
+using FreeRedis;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Text;
+using System.Text.Json;
+using System.Threading.Tasks;
+using Volo.Abp.DependencyInjection;
+
+namespace JiShe.CollectBus.FreeRedisProvider;
+
+public class FreeRedisProviderService : IFreeRedisProviderService, ISingletonDependency
+{
+ private FreeRedisOptions freeRedisOptions;
+
+ ///
+ /// FreeRedis
+ ///
+ public FreeRedisProviderService(IOptions options)
+ {
+ freeRedisOptions = options.Value;
+ }
+
+ public IRedisClient FreeRedis { get => GetClient(); }
+
+ ///
+ /// 获取 FreeRedis 客户端
+ ///
+ ///
+ public IRedisClient GetClient()
+ {
+
+ var redisClinet = new RedisClient(freeRedisOptions.ConnectionString);
+ redisClinet.Serialize = obj => JsonSerializer.Serialize(obj);
+ redisClinet.Deserialize = (json, type) => JsonSerializer.Deserialize(json, type);
+ redisClinet.Notice += (s, e) => Trace.WriteLine(e.Log);
+
+ return redisClinet;
+ }
+
+ ///
+ /// 切换Redis数据库
+ ///
+ ///
+ ///
+ public IRedisClient GetDatabase(int index = 0)
+ {
+ var redisClinet = GetClient();
+ redisClinet.GetDatabase(index);
+ return redisClinet;
+ }
+}
diff --git a/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider/IFreeRedisProviderService.cs b/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider/IFreeRedisProviderService.cs
new file mode 100644
index 0000000..6f0ff63
--- /dev/null
+++ b/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider/IFreeRedisProviderService.cs
@@ -0,0 +1,29 @@
+using FreeRedis;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.FreeRedisProvider;
+
+public interface IFreeRedisProviderService
+{
+ ///
+ /// 默认客户端
+ ///
+ IRedisClient FreeRedis { get; }
+
+ ///
+ /// 获取客户端
+ ///
+ ///
+ IRedisClient GetClient();
+
+ ///
+ /// 切换Redis数据库
+ ///
+ ///
+ ///
+ IRedisClient GetDatabase(int index = 0);
+}
diff --git a/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider/Options/FreeRedisOptions.cs b/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider/Options/FreeRedisOptions.cs
new file mode 100644
index 0000000..266c743
--- /dev/null
+++ b/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider/Options/FreeRedisOptions.cs
@@ -0,0 +1,25 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.FreeRedisProvider;
+
+public class FreeRedisOptions
+{
+ ///
+ /// 连接字符串
+ ///
+ public string? ConnectionString { get; set; }
+
+ ///
+ /// 默认数据库
+ ///
+ public string? DefaultDB { get; set; }
+
+ ///
+ /// HangfireDB
+ ///
+ public string? HangfireDB { get; set; }
+}
\ No newline at end of file
diff --git a/JiShe.CollectBus.FreeRedisProvider/FreeRedisProviderModule.cs b/JiShe.CollectBus.FreeRedisProvider/FreeRedisProviderModule.cs
new file mode 100644
index 0000000..2bf7349
--- /dev/null
+++ b/JiShe.CollectBus.FreeRedisProvider/FreeRedisProviderModule.cs
@@ -0,0 +1,26 @@
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Volo.Abp.Modularity;
+
+namespace JiShe.CollectBus.FreeRedisProvider;
+
+public class FreeRedisProviderModule : AbpModule
+{
+ public override void ConfigureServices(ServiceConfigurationContext context)
+ {
+ var configuration = context.Services.GetConfiguration();
+
+ Configure(options =>
+ {
+ configuration.GetSection("Redis").Bind(options);
+ });
+
+
+ }
+}
+
diff --git a/JiShe.CollectBus.FreeRedisProvider/JiShe.CollectBus.FreeRedisProvider.csproj b/JiShe.CollectBus.FreeRedisProvider/JiShe.CollectBus.FreeRedisProvider.csproj
new file mode 100644
index 0000000..e4f3447
--- /dev/null
+++ b/JiShe.CollectBus.FreeRedisProvider/JiShe.CollectBus.FreeRedisProvider.csproj
@@ -0,0 +1,12 @@
+
+
+
+ net8.0
+ enable
+ enable
+
+
+
+
+
+
diff --git a/JiShe.CollectBus.sln b/JiShe.CollectBus.sln
index 5a41cab..822d02b 100644
--- a/JiShe.CollectBus.sln
+++ b/JiShe.CollectBus.sln
@@ -29,6 +29,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.DbMigrator
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.FreeSql", "src\JiShe.CollectBus.FreeSql\JiShe.CollectBus.FreeSql.csproj", "{FE0457D9-4038-4A17-8808-DCAD06CFC0A0}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.FreeRedisProvider", "JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj", "{920445D9-18D2-4886-9053-6A4CC3B4F3E2}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -83,6 +85,10 @@ Global
{FE0457D9-4038-4A17-8808-DCAD06CFC0A0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FE0457D9-4038-4A17-8808-DCAD06CFC0A0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FE0457D9-4038-4A17-8808-DCAD06CFC0A0}.Release|Any CPU.Build.0 = Release|Any CPU
+ {920445D9-18D2-4886-9053-6A4CC3B4F3E2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {920445D9-18D2-4886-9053-6A4CC3B4F3E2}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {920445D9-18D2-4886-9053-6A4CC3B4F3E2}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {920445D9-18D2-4886-9053-6A4CC3B4F3E2}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -100,6 +106,7 @@ Global
{38C1808B-009A-418B-B17B-AB3626341B5D} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{8BA01C3D-297D-42DF-BD63-EF07202A0A67} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{FE0457D9-4038-4A17-8808-DCAD06CFC0A0} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
+ {920445D9-18D2-4886-9053-6A4CC3B4F3E2} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}
diff --git a/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj b/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj
index 0a28849..6841779 100644
--- a/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj
+++ b/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj
@@ -22,4 +22,8 @@
+
+
+
+
diff --git a/src/JiShe.CollectBus.Application.Contracts/Workers/DTO/Energy/EnergyAmmeterInfoDto.cs b/src/JiShe.CollectBus.Application.Contracts/Workers/DTO/Energy/EnergyAmmeterInfoDto.cs
new file mode 100644
index 0000000..7836840
--- /dev/null
+++ b/src/JiShe.CollectBus.Application.Contracts/Workers/DTO/Energy/EnergyAmmeterInfoDto.cs
@@ -0,0 +1,146 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Workers.DTO.Energy
+{
+ ///
+ /// 能耗电表信息数据模型
+ ///
+ public class EnergyAmmeterInfoDto
+ {
+ ///
+ /// 电表ID
+ ///
+ public int ID { get; set; }
+ ///
+ /// 电表名称
+ ///
+ public string Name { get; set; }
+
+ ///
+ /// 集中器ID
+ ///
+ public int FocusID { get; set; }
+
+ ///
+ /// 集中器地址
+ ///
+ public string Address { get; set; }
+
+ ///
+ /// 集中器区域代码
+ ///
+ public string AreaCode { get; set; }
+
+ ///
+ /// 电表类别 (1单相、2三相三线、3三相四线),
+ /// 07协议: 开合闸指令(1A开闸断电,1C单相表合闸,1B多相表合闸) 645 2007 表
+ /// 97协议://true(合闸);false(跳闸) 545 1997 没有单相多相 之分 "true" ? "9966" : "3355"
+ ///
+ public int TypeName { get; set; }
+ ///
+ /// 跳合闸状态字段: 0 合闸,1 跳闸
+ /// 电表:TripState (0 合闸-通电, 1 断开、跳闸);
+ ///
+ public int TripState { get; set; }
+
+ ///
+ /// 规约 -电表default(30) 1:97协议,30:07协议
+ ///
+ public int? Protocol { get; set; }
+
+ ///
+ /// 一个集中器下的[MeteringCode]必须唯一。 PN
+ ///
+ public int MeteringCode { get; set; }
+
+ ///
+ /// 电表通信地址
+ ///
+ public string AmmerterAddress { get; set; }
+
+ ///
+ /// 波特率 default(2400)
+ ///
+ public int Baudrate { get; set; }
+
+ ///
+ /// MeteringPort 端口就几个可以枚举。
+ ///
+ public int MeteringPort { get; set; }
+
+ ///
+ /// 电表密码
+ ///
+ public string Password { get; set; }
+
+ ///
+ /// 采集时间间隔(分钟,如15)
+ ///
+ public int TimeDensity { get; set; }
+
+ ///
+ /// 该电表方案下采集项,如:0D_80
+ ///
+ public string ItemCodes { get; set; }
+
+ ///
+ /// State表状态:
+ /// 0新装(未下发),1运行(档案下发成功时设置状态值1), 2暂停, 100销表(销表后是否重新启用)
+ /// 特定:State -1 已删除
+ ///
+ public int State { get; set; }
+
+ ///
+ /// 是否自动采集(0:主动采集,1:自动采集)
+ ///
+ public int AutomaticReport { get; set; }
+
+ ///
+ /// 该电表方案下采集项编号
+ ///
+ public string DataTypes { get; set; }
+
+ ///
+ /// 品牌型号
+ ///
+ public string BrandType { get; set; }
+
+ ///
+ /// 采集器编号
+ ///
+ public string GatherCode { get; set; }
+
+ ///
+ /// 是否特殊表
+ ///
+ public int Special { get; set; }
+
+ ///
+ /// 费率类型,单、多 (SingleRate :单费率(单相表1),多费率(其他0) ,与TypeName字段无关)
+ /// SingleRate ? "单" : "复"
+ /// [SingleRate] --0 复费率 false , 1 单费率 true (与PayPlanID保持一致)
+ ///对应 TB_PayPlan.Type: 1复费率,2单费率
+ ///
+ public bool SingleRate { get; set; }
+
+ ///
+ /// 项目ID
+ ///
+ public int ProjectID { get; set; }
+ ///
+ /// 是否异常集中器 0:正常,1异常
+ ///
+ public int AbnormalState { get; set; }
+
+ public DateTime LastTime { get; set; }
+
+ ///
+ /// 集中器地址
+ ///
+ public string FocusAddress { get; set; }
+ }
+}
diff --git a/src/JiShe.CollectBus.Application.Contracts/Workers/IWorkerScheduledService.cs b/src/JiShe.CollectBus.Application.Contracts/Workers/IWorkerScheduledService.cs
new file mode 100644
index 0000000..6ec0577
--- /dev/null
+++ b/src/JiShe.CollectBus.Application.Contracts/Workers/IWorkerScheduledService.cs
@@ -0,0 +1,45 @@
+using JiShe.CollectBus.Ammeters;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Workers
+{
+ ///
+ /// 定时任务基础约束
+ ///
+ interface IWorkerScheduledService
+ {
+ ///
+ /// 获取电表信息
+ ///
+ ///
+ Task> GetAmmeterInfoList();
+
+ ///
+ /// 初始化电表缓存数据
+ ///
+ ///
+ Task InitAmmeterCacheData();
+
+ ///
+ /// 1分钟采集电表数据
+ ///
+ ///
+ Task ScheduledMeterOneMinuteReading();
+
+ ///
+ /// 5分钟采集电表数据
+ ///
+ ///
+ Task ScheduledMeterFiveMinuteReading();
+
+ ///
+ /// 15分钟采集电表数据
+ ///
+ ///
+ Task ScheduledMeterFifteenMinuteReading();
+ }
+}
diff --git a/src/JiShe.CollectBus.Application.Contracts/Workers/IWorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application.Contracts/Workers/IWorkerSubscriberAppService.cs
new file mode 100644
index 0000000..c7442de
--- /dev/null
+++ b/src/JiShe.CollectBus.Application.Contracts/Workers/IWorkerSubscriberAppService.cs
@@ -0,0 +1,31 @@
+using System.Threading.Tasks;
+using JiShe.CollectBus.Common.Models;
+using JiShe.CollectBus.MessageReceiveds;
+using Volo.Abp.Application.Services;
+
+namespace JiShe.CollectBus.Subscribers
+{
+ ///
+ /// 定时抄读任务消息订阅
+ ///
+ public interface IWorkerSubscriberAppService : IApplicationService
+ {
+ ///
+ /// 1分钟采集电表数据下行消息消费订阅
+ ///
+ ///
+ Task ScheduledMeterOneMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
+
+ ///
+ /// 5分钟采集电表数据下行消息消费订阅
+ ///
+ ///
+ Task ScheduledMeterFiveMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
+
+ ///
+ /// 15分钟采集电表数据下行消息消费订阅
+ ///
+ ///
+ Task ScheduledMeterFifteenMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
+ }
+}
diff --git a/src/JiShe.CollectBus.Application/CollectBusAppService.cs b/src/JiShe.CollectBus.Application/CollectBusAppService.cs
index 4a45327..c607ec4 100644
--- a/src/JiShe.CollectBus.Application/CollectBusAppService.cs
+++ b/src/JiShe.CollectBus.Application/CollectBusAppService.cs
@@ -1,6 +1,12 @@
-using JiShe.CollectBus.FreeSql;
+using FreeRedis;
+using FreeSql;
+using JiShe.CollectBus.FreeRedisProvider;
+using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.Localization;
+using JiShe.CollectBus.Workers.DTO.Energy;
using Microsoft.AspNetCore.Mvc;
+using System.Collections.Generic;
+using System.Threading.Tasks;
using Volo.Abp.Application.Services;
namespace JiShe.CollectBus;
@@ -9,10 +15,30 @@ namespace JiShe.CollectBus;
public abstract class CollectBusAppService : ApplicationService
{
public IFreeSqlProvider SqlProvider => LazyServiceProvider.LazyGetRequiredService();
+ protected IRedisClient? FreeRedis => LazyServiceProvider.LazyGetService()?.FreeRedis;
protected CollectBusAppService()
{
LocalizationResource = typeof(CollectBusResource);
ObjectMapperContext = typeof(CollectBusApplicationModule);
}
+
+
+ #region 能耗相关
+ ///
+ /// 查找当前采集器下所有电表
+ ///
+ /// 采集端Code
+ ///
+ protected async Task> GetAmmetersByGatherCode(string gatherCode = "V4-Gather-8890")
+ {
+ string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime
+ FROM TB_GatherInfo(NOLOCK) AS A
+ INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
+ INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
+ INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0
+ WHERE A.GatherCode = {gatherCode}";
+ return await SqlProvider.Instance.Change(DbEnum.EnergyDB).Select(sql).ToListAsync();
+ }
+ #endregion
}
diff --git a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs
index ab38baa..694f807 100644
--- a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs
+++ b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs
@@ -10,6 +10,7 @@ using System.Reflection;
using JiShe.CollectBus.FreeSql;
using System;
using Volo.Abp.AspNetCore.Mvc.AntiForgery;
+using JiShe.CollectBus.FreeRedisProvider;
namespace JiShe.CollectBus;
@@ -19,6 +20,7 @@ namespace JiShe.CollectBus;
typeof(AbpDddApplicationModule),
typeof(AbpAutoMapperModule),
typeof(AbpBackgroundWorkersModule),
+ typeof(FreeRedisProviderModule),
typeof(CollectBusFreeSqlModule)
)]
public class CollectBusApplicationModule : AbpModule
diff --git a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj
index 8bd63d9..7160628 100644
--- a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj
+++ b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj
@@ -23,6 +23,7 @@
+
diff --git a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs
index b40273a..1c5a6b6 100644
--- a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs
+++ b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs
@@ -53,14 +53,17 @@ namespace JiShe.CollectBus.Plugins
var aFn = (int?)hexStringList.GetAnalyzeValue(CommandChunkEnum.AFN);
var fn = (int?)hexStringList.GetAnalyzeValue(CommandChunkEnum.FN);
var aTuple = (Tuple)hexStringList.GetAnalyzeValue(CommandChunkEnum.A);
- if (aFn.HasValue && fn.HasValue)
+ if (aFn.HasValue && fn.HasValue && aTuple != null && !string.IsNullOrWhiteSpace(aTuple.Item1))
{
+ string oldClinetId = client.Id;
+ await client.ResetIdAsync(aTuple.Item1);
+
if ((AFN)aFn == AFN.链路接口检测)
{
switch (fn)
{
case 1:
- await OnTcpLoginReceived(client, messageHexString, aTuple.Item1);
+ await OnTcpLoginReceived(client, messageHexString, aTuple.Item1, oldClinetId);
break;
case 3:
await OnTcpHeartbeatReceived(client, messageHexString, aTuple.Item1);
@@ -113,7 +116,16 @@ namespace JiShe.CollectBus.Plugins
await e.InvokeNext();
}
- private async Task OnTcpLoginReceived(ITcpSessionClient client, string messageHexString, string deviceNo)
+
+ ///
+ /// 登录帧处理
+ ///
+ ///
+ ///
+ /// 集中器编号
+ /// TCP首次连接时的Id
+ ///
+ private async Task OnTcpLoginReceived(ITcpSessionClient client, string messageHexString, string deviceNo,string oldClinetId)
{
var messageReceivedLoginEvent = new MessageReceivedLogin
{
diff --git a/src/JiShe.CollectBus.Application/Workers/EpiCollectWorker.cs b/src/JiShe.CollectBus.Application/Workers/EpiCollectWorker.cs
index bbbee85..bc1badc 100644
--- a/src/JiShe.CollectBus.Application/Workers/EpiCollectWorker.cs
+++ b/src/JiShe.CollectBus.Application/Workers/EpiCollectWorker.cs
@@ -22,7 +22,7 @@ namespace JiShe.CollectBus.Workers
public override Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken())
{
- _logger.LogInformation("Executed MyLogWorker..!");
+ _logger.LogError("Executed MyLogWorker..!");
return Task.CompletedTask;
}
}
diff --git a/src/JiShe.CollectBus.Application/Workers/ScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/Workers/ScheduledMeterReadingService.cs
new file mode 100644
index 0000000..9673490
--- /dev/null
+++ b/src/JiShe.CollectBus.Application/Workers/ScheduledMeterReadingService.cs
@@ -0,0 +1,27 @@
+using FreeRedis;
+using JiShe.CollectBus.FreeRedisProvider;
+using Microsoft.Extensions.DependencyInjection;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Volo.Abp.DependencyInjection;
+
+namespace JiShe.CollectBus.Workers
+{
+ ///
+ /// 定时采集服务
+ ///
+ public class ScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
+ {
+ ///
+ /// 初始化能耗电表数据
+ ///
+ ///
+ public async Task InitEnergyAmmeterData()
+ {
+ var ammerterList = await GetAmmetersByGatherCode();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/JiShe.CollectBus.Application/Workers/WorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application/Workers/WorkerSubscriberAppService.cs
new file mode 100644
index 0000000..1ddaeea
--- /dev/null
+++ b/src/JiShe.CollectBus.Application/Workers/WorkerSubscriberAppService.cs
@@ -0,0 +1,103 @@
+using System;
+using System.Threading.Tasks;
+using DeviceDetectorNET.Parser.Device;
+using DotNetCore.CAP;
+using JiShe.CollectBus.Common.Enums;
+using JiShe.CollectBus.Common.Models;
+using JiShe.CollectBus.Devices;
+using JiShe.CollectBus.MessageReceiveds;
+using JiShe.CollectBus.Protocol.Contracts;
+using JiShe.CollectBus.Protocol.Contracts.Interfaces;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using TouchSocket.Sockets;
+using Volo.Abp.Caching;
+using Volo.Abp.Domain.Repositories;
+
+namespace JiShe.CollectBus.Subscribers
+{
+ ///
+ /// 定时抄读任务消息消费订阅
+ ///
+ public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService,ICapSubscribe
+ {
+ private readonly ILogger _logger;
+ private readonly ITcpService _tcpService;
+ private readonly IServiceProvider _serviceProvider;
+
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The logger.
+ /// The TCP service.
+ /// The service provider.
+ public WorkerSubscriberAppService(ILogger logger,
+ ITcpService tcpService, IServiceProvider serviceProvider)
+ {
+ _logger = logger;
+ _tcpService = tcpService;
+ _serviceProvider = serviceProvider;
+ }
+
+ ///
+ /// 一分钟定时抄读任务消息消费订阅
+ ///
+ ///
+ ///
+ [CapSubscribe(ProtocolConst.SubscriberWorkerOneMinuteIssuedEventName)]
+ public async Task ScheduledMeterOneMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
+ {
+ _logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理");
+ var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin");
+ if (protocolPlugin == null)
+ {
+ _logger.LogError("【1分钟采集电表数据下行消息消费队列开始处理】协议不存在!");
+ }
+ else
+ {
+ await _tcpService.SendAsync(receivedMessage.ClientId, receivedMessage.Message);
+ }
+ }
+
+ ///
+ /// 5分钟采集电表数据下行消息消费订阅
+ ///
+ ///
+ ///
+ [CapSubscribe(ProtocolConst.SubscriberWorkerOneMinuteIssuedEventName)]
+ public async Task ScheduledMeterFiveMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
+ {
+ _logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理");
+ var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin");
+ if (protocolPlugin == null)
+ {
+ _logger.LogError("【5分钟采集电表数据下行消息消费队列开始处理】协议不存在!");
+ }
+ else
+ {
+ await _tcpService.SendAsync(receivedMessage.ClientId, receivedMessage.Message);
+ }
+ }
+
+ ///
+ /// 15分钟采集电表数据下行消息消费订阅
+ ///
+ ///
+ ///
+ [CapSubscribe(ProtocolConst.SubscriberWorkerOneMinuteIssuedEventName)]
+ public async Task ScheduledMeterFifteenMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
+ {
+ _logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
+ var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin");
+ if (protocolPlugin == null)
+ {
+ _logger.LogError("【15分钟采集电表数据下行消息消费队列开始处理】协议不存在!");
+ }
+ else
+ {
+ await _tcpService.SendAsync(receivedMessage.ClientId, receivedMessage.Message);
+ }
+ }
+ }
+}
diff --git a/src/JiShe.CollectBus.Common/Consts/FreeRedisConst.cs b/src/JiShe.CollectBus.Common/Consts/FreeRedisConst.cs
new file mode 100644
index 0000000..96661d8
--- /dev/null
+++ b/src/JiShe.CollectBus.Common/Consts/FreeRedisConst.cs
@@ -0,0 +1,20 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Common.Consts
+{
+ public class FreeRedisConst
+ {
+ ///
+ /// 缓存基础目录
+ ///
+ public const string CacheBasicDirectoryKey = "CollectBus:";
+ ///
+ /// 缓存电表信息
+ ///
+ public const string CacheAmmeterInfoKey = $"{CacheBasicDirectoryKey}AmmeterInfo:";
+ }
+}
diff --git a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs
index e9bc4b5..aabbd32 100644
--- a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs
+++ b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs
@@ -8,80 +8,115 @@ namespace JiShe.CollectBus.Ammeters
{
public class AmmeterInfo
{
+ ///
+ /// 电表ID
+ ///
public int ID { get; set; }
+ ///
+ /// 电表名称
+ ///
public string Name { get; set; }
+
+ ///
+ /// 集中器ID
+ ///
public int FocusID { get; set; }
+
+ ///
+ /// 集中器地址
+ ///
public string Address { get; set; }
+
+ ///
+ /// 集中器区域代码
+ ///
public string AreaCode { get; set; }
+
///
/// 电表类别 (1单相、2三相三线、3三相四线),
/// 07协议: 开合闸指令(1A开闸断电,1C单相表合闸,1B多相表合闸) 645 2007 表
/// 97协议://true(合闸);false(跳闸) 545 1997 没有单相多相 之分 "true" ? "9966" : "3355"
///
public int TypeName { get; set; }
+
///
/// 跳合闸状态字段: 0 合闸,1 跳闸
/// 电表:TripState (0 合闸-通电, 1 断开、跳闸);
///
public int TripState { get; set; }
+
///
/// 规约 -电表default(30) 1:97协议,30:07协议
///
public int? Protocol { get; set; }
+
///
/// 一个集中器下的[MeteringCode]必须唯一。 PN
///
public int MeteringCode { get; set; }
+
///
/// 电表通信地址
///
public string AmmerterAddress { get; set; }
+
///
/// 波特率 default(2400)
///
public int Baudrate { get; set; }
+
///
/// MeteringPort 端口就几个可以枚举。
///
public int MeteringPort { get; set; }
+
///
/// 电表密码
///
public string Password { get; set; }
+
///
/// 采集时间间隔(分钟,如15)
///
public int TimeDensity { get; set; }
+
///
/// 该电表方案下采集项,如:0D_80
///
public string ItemCodes { get; set; }
+
///
/// State表状态:
/// 0新装(未下发),1运行(档案下发成功时设置状态值1), 2暂停, 100销表(销表后是否重新启用)
/// 特定:State -1 已删除
///
public int State { get; set; }
+
///
/// 是否自动采集(0:主动采集,1:自动采集)
///
public int AutomaticReport { get; set; }
+
///
/// 该电表方案下采集项编号
///
public string DataTypes { get; set; }
+
///
/// 品牌型号
///
public string BrandType { get; set; }
+
///
/// 采集器编号
///
public string GatherCode { get; set; }
+
///
/// 是否特殊表
///
public int Special { get; set; }
+
///
/// 费率类型,单、多 (SingleRate :单费率(单相表1),多费率(其他0) ,与TypeName字段无关)
/// SingleRate ? "单" : "复"
@@ -89,11 +124,21 @@ namespace JiShe.CollectBus.Ammeters
///对应 TB_PayPlan.Type: 1复费率,2单费率
///
public bool SingleRate { get; set; }
+
+ ///
+ /// 项目ID
+ ///
public int ProjectID { get; set; }
///
/// 是否异常集中器 0:正常,1异常
///
public int AbnormalState { get; set; }
+
public DateTime LastTime { get; set; }
+
+ ///
+ /// 集中器地址
+ ///
+ public string FocusAddress { get; set; }
}
}
diff --git a/src/JiShe.CollectBus.Domain/Devices/Device.cs b/src/JiShe.CollectBus.Domain/Devices/Device.cs
index 8dcf886..4fb015a 100644
--- a/src/JiShe.CollectBus.Domain/Devices/Device.cs
+++ b/src/JiShe.CollectBus.Domain/Devices/Device.cs
@@ -27,16 +27,34 @@ namespace JiShe.CollectBus.Devices
Status = status;
}
+ ///
+ /// 集中器编号,在集中器登录时解析获取,并会更新为当前TCP连接的最新ClientId
+ ///
public string Number { get; set; }
+ ///
+ /// 首次上线时间
+ ///
public DateTime FirstOnlineTime { get; set; }
+ ///
+ /// 最后上线时间
+ ///
public DateTime LastOnlineTime { get; set; }
+ ///
+ /// TCP客户端首次连接ID,在登录解析成功以后会被Number集中器编号覆盖
+ ///
public string ClientId { get; set; }
+ ///
+ /// TCP客户端断线时间,用于计算是否断线
+ ///
public DateTime? LastOfflineTime { get; set; }
+ ///
+ /// 设备状态
+ ///
public DeviceStatus Status { get; set; }
public void UpdateByLoginAndHeartbeat(string clientId)
diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
index 1f306f8..699ba43 100644
--- a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
+++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
@@ -222,8 +222,9 @@ namespace JiShe.CollectBus.Host
{
context.Services.AddTcpService(config =>
{
- config.SetListenIPHosts(int.Parse(configuration["TCP:ClientPort"] ?? "10500"))
+ config.SetListenIPHosts(int.Parse(configuration["TCP:ClientPort"] ?? "32580"))
//.SetTcpDataHandlingAdapter(()=>new StandardFixedHeaderDataHandlingAdapter())
+ .SetGetDefaultNewId(() => Guid.NewGuid().ToString())//定义ClinetId的生成策略
.ConfigurePlugins(a =>
{
a.Add();
diff --git a/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs b/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs
index 25b160c..431fc4c 100644
--- a/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs
+++ b/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs
@@ -14,5 +14,18 @@ namespace JiShe.CollectBus.Protocol.Contracts
public const string SubscriberReceivedHeartbeatEventName = "received.heartbeat.event";
public const string SubscriberReceivedLoginEventName = "received.login.event";
+ ///
+ /// 1分钟采集电表数据下行消息主题
+ ///
+ public const string SubscriberWorkerOneMinuteIssuedEventName = "issued.oneminute.event";
+ ///
+ /// 5分钟采集电表数据下行消息主题
+ ///
+ public const string SubscriberWorkerFiveMinuteIssuedEventName = "issued.fiveminute.event";
+ ///
+ /// 15分钟采集电表数据下行消息主题
+ ///
+ public const string SubscriberWorkerFifteenMinuteIssuedEventName = "issued.fifteenminute.event";
+
}
}