From 9da1745573270a2a145c4300f085321c17a327d8 Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Mon, 24 Mar 2025 20:54:31 +0800
Subject: [PATCH 01/18] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
...er.cs => ScheduledMeterReadingConsumer.cs} | 12 ++--
.../JiShe.CollectBus.Application.csproj | 3 +-
.../BasicScheduledMeterReadingService.cs | 7 ++-
...nergySystemScheduledMeterReadingService.cs | 62 ++++++++++++++-----
.../CollectBusHostModule.Configure.cs | 10 +++
.../CollectBusHostModule.cs | 2 +-
6 files changed, 71 insertions(+), 25 deletions(-)
rename src/JiShe.CollectBus.Application/Consumers/{WorkerConsumer.cs => ScheduledMeterReadingConsumer.cs} (52%)
diff --git a/src/JiShe.CollectBus.Application/Consumers/WorkerConsumer.cs b/src/JiShe.CollectBus.Application/Consumers/ScheduledMeterReadingConsumer.cs
similarity index 52%
rename from src/JiShe.CollectBus.Application/Consumers/WorkerConsumer.cs
rename to src/JiShe.CollectBus.Application/Consumers/ScheduledMeterReadingConsumer.cs
index 53ad836..cdc731a 100644
--- a/src/JiShe.CollectBus.Application/Consumers/WorkerConsumer.cs
+++ b/src/JiShe.CollectBus.Application/Consumers/ScheduledMeterReadingConsumer.cs
@@ -2,6 +2,7 @@
using System.Threading.Tasks;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models;
+using JiShe.CollectBus.IotSystems.MessageIssueds;
using MassTransit;
using Microsoft.Extensions.Logging;
using TouchSocket.Sockets;
@@ -12,9 +13,9 @@ namespace JiShe.CollectBus.Consumers
///
/// 定时抄读任务消费者
///
- public class WorkerConsumer : IConsumer
+ public class ScheduledMeterReadingConsumer : IConsumer
{
- private readonly ILogger _logger;
+ private readonly ILogger _logger;
private readonly ITcpService _tcpService;
///
@@ -22,7 +23,7 @@ namespace JiShe.CollectBus.Consumers
///
///
///
- public WorkerConsumer(ILogger logger,
+ public ScheduledMeterReadingConsumer(ILogger logger,
ITcpService tcpService)
{
_logger = logger;
@@ -30,9 +31,10 @@ namespace JiShe.CollectBus.Consumers
}
- public async Task Consume(ConsumeContext context)
+ public async Task Consume(ConsumeContext context)
{
- await _tcpService.SendAsync(context.Message.ClientId, context.Message.Message);
+ _logger.LogError($"{nameof(ScheduledMeterReadingConsumer)} 集中器的消息消费{context.Message.FocusAddress}");
+ await _tcpService.SendAsync(context.Message.FocusAddress, context.Message.MessageHexString);
}
}
}
diff --git a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj
index 94c4faf..078728a 100644
--- a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj
+++ b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj
@@ -15,7 +15,8 @@
-
+
+
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index bb3c743..14e0a6a 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -26,6 +26,7 @@ using JiShe.CollectBus.Repository.MeterReadingRecord;
using JiShe.CollectBus.Workers;
using MassTransit;
using MassTransit.Internals.GraphValidation;
+using MassTransit.Transports;
using Microsoft.Extensions.Logging;
using Volo.Abp.Domain.Repositories;
using static FreeSql.Internal.GlobalFilter;
@@ -41,10 +42,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
private readonly ICapPublisher _capBus;
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
-
public BasicScheduledMeterReadingService(
ILogger logger,
- ICapPublisher capBus,
+ ICapPublisher capBus,
IMeterReadingRecordRepository meterReadingRecordsRepository)
{
_capBus = capBus;
@@ -432,8 +432,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
+
_ = _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500) ,ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
+ //await _massTransitBus.Publish(tempMsg);
+
meterTaskInfosList.Add(ammerterItem.Value);
}
}
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
index 24507fe..61cc2e5 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
@@ -12,6 +12,7 @@ using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Repository;
using JiShe.CollectBus.Repository.MeterReadingRecord;
+using MassTransit;
using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.Logging;
using Volo.Abp.Domain.Repositories;
@@ -27,10 +28,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
{
- public EnergySystemScheduledMeterReadingService(ILogger logger,
- ICapPublisher capBus, IMeterReadingRecordRepository _meterReadingRecordsRepository) :base(logger, capBus, _meterReadingRecordsRepository)
+ public EnergySystemScheduledMeterReadingService(ILogger logger,
+ ICapPublisher capBus, IMeterReadingRecordRepository meterReadingRecordsRepository) : base(logger, capBus, meterReadingRecordsRepository)
{
-
+
}
public sealed override string SystemType => SystemTypeConst.Energy;
@@ -63,21 +64,50 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//[Route($"ammeter/list")]
public override async Task> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890")
{
- string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
- FROM TB_GatherInfo(NOLOCK) AS A
- INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
- INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
- INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0
- WHERE 1=1 and C.Special = 0 ";
- //TODO 记得移除特殊表过滤
- if (!string.IsNullOrWhiteSpace(gatherCode))
+ List ammeterInfos = new List();
+ ammeterInfos.Add(new AmmeterInfo()
{
- sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
- }
- return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
- .Ado
- .QueryAsync(sql);
+ Baudrate = 2400,
+ FocusAddress = "402440506",
+ Name = "三相电表",
+ FocusID = 1,
+ DatabaseBusiID = 1,
+ MeteringCode = 2,
+ AmmerterAddress = "402410040506",
+ ID = 9980,
+ TypeName = 3,
+ });
+ ammeterInfos.Add(new AmmeterInfo()
+ {
+ Baudrate = 2400,
+ FocusAddress = "542400504",
+ Name = "单相电表",
+ FocusID = 1,
+ DatabaseBusiID = 1,
+ MeteringCode = 2,
+ AmmerterAddress = "542410000504",
+ ID = 9981,
+ TypeName = 1,
+ });
+
+ return ammeterInfos;
+
+ //string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
+ // FROM TB_GatherInfo(NOLOCK) AS A
+ // INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
+ // INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
+ // INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0
+ // WHERE 1=1 and C.Special = 0 ";
+ ////TODO 记得移除特殊表过滤
+
+ //if (!string.IsNullOrWhiteSpace(gatherCode))
+ //{
+ // sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
+ //}
+ //return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
+ // .Ado
+ // .QueryAsync(sql);
}
///
diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
index 6ffa1d0..374eaa5 100644
--- a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
+++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
@@ -20,6 +20,7 @@ using JiShe.CollectBus.Plugins;
using JiShe.CollectBus.Consumers;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
+using JiShe.CollectBus.IotSystems.MessageIssueds;
namespace JiShe.CollectBus.Host
@@ -302,6 +303,9 @@ namespace JiShe.CollectBus.Host
.SetTimeLimitStart(BatchTimeLimitStart.FromLast)
.SetConcurrencyLimit(10));
});
+
+ rider.AddConsumer();
+
rider.UsingKafka((c, cfg) =>
{
cfg.Host(configuration.GetConnectionString("Kafka"));
@@ -329,6 +333,12 @@ namespace JiShe.CollectBus.Host
configurator.ConfigureConsumer(c);
configurator.ConfigureConsumeTopology = false;
});
+
+ //cfg.TopicEndpoint(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, ProtocolConst.SubscriberGroup, configurator =>
+ //{
+ // configurator.ConfigureConsumer(c);
+ // configurator.ConfigureConsumeTopology = false;
+ //});
});
});
});
diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs
index 933641b..6f24b1e 100644
--- a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs
+++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs
@@ -25,7 +25,7 @@ namespace JiShe.CollectBus.Host
typeof(AbpAspNetCoreSerilogModule),
typeof(AbpSwashbuckleModule),
typeof(CollectBusApplicationModule),
- typeof(CollectBusMongoDbModule),
+ typeof(CollectBusMongoDbModule),
typeof(AbpCachingStackExchangeRedisModule),
typeof(AbpBackgroundWorkersHangfireModule)
)]
From af9b0d8a77b08857014ae1ba419b27208eabf555 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=99=88=E7=9B=8A?=
Date: Mon, 24 Mar 2025 21:55:22 +0800
Subject: [PATCH 02/18] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E8=A1=A8=E5=88=86?=
=?UTF-8?q?=E7=89=87=E7=AD=96=E7=95=A5=E7=BB=9F=E4=B8=80=E5=87=BA=E5=8F=A3?=
=?UTF-8?q?=E5=B0=81=E8=A3=85?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../Subscribers/SubscriberAppService.cs | 53 +++++++++++++++----
.../Extensions/DateTimeExtensions.cs | 10 ++++
.../ShardingStrategy/DayShardingStrategy.cs | 9 ++--
.../Abstracts/BaseProtocolPlugin.cs | 2 +-
.../Interfaces/IProtocolPlugin.cs | 2 +-
5 files changed, 59 insertions(+), 17 deletions(-)
diff --git a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
index fc05bc7..7dbe586 100644
--- a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
+++ b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
@@ -1,7 +1,10 @@
using System;
+using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
using JiShe.CollectBus.Common.Enums;
+using JiShe.CollectBus.Common.Extensions;
+using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
@@ -9,6 +12,7 @@ using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
+using JiShe.CollectBus.Repository.MeterReadingRecord;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using TouchSocket.Sockets;
@@ -16,16 +20,16 @@ using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Subscribers
{
- public class SubscriberAppService : CollectBusAppService, ISubscriberAppService,ICapSubscribe
+ public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, ICapSubscribe
{
private readonly ILogger _logger;
private readonly ITcpService _tcpService;
private readonly IServiceProvider _serviceProvider;
private readonly IRepository _messageReceivedLoginEventRepository;
private readonly IRepository _messageReceivedHeartbeatEventRepository;
- private readonly IRepository _messageReceivedEventRepository;
+ private readonly IRepository _messageReceivedEventRepository;
private readonly IRepository _deviceRepository;
- private readonly IRepository _meterReadingRecordsRepository;
+ private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
///
/// Initializes a new instance of the class.
@@ -38,12 +42,12 @@ namespace JiShe.CollectBus.Subscribers
/// The message received event repository.
/// The device repository.
/// The device repository.
- public SubscriberAppService(ILogger logger,
- ITcpService tcpService, IServiceProvider serviceProvider,
- IRepository messageReceivedLoginEventRepository,
- IRepository messageReceivedHeartbeatEventRepository,
- IRepository messageReceivedEventRepository,
- IRepository deviceRepository, IRepository meterReadingRecordsRepository)
+ public SubscriberAppService(ILogger logger,
+ ITcpService tcpService, IServiceProvider serviceProvider,
+ IRepository messageReceivedLoginEventRepository,
+ IRepository messageReceivedHeartbeatEventRepository,
+ IRepository messageReceivedEventRepository,
+ IRepository deviceRepository, IMeterReadingRecordRepository meterReadingRecordsRepository)
{
_logger = logger;
_tcpService = tcpService;
@@ -79,7 +83,7 @@ namespace JiShe.CollectBus.Subscribers
throw new ArgumentOutOfRangeException();
}
var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo);
- if (device!=null)
+ if (device != null)
{
await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message);
}
@@ -96,7 +100,34 @@ namespace JiShe.CollectBus.Subscribers
else
{
//todo 会根据不同的协议进行解析,然后做业务处理
- TB3761FN fN = await protocolPlugin.AnalyzeAsync(receivedMessage);
+ TB3761 fN = await protocolPlugin.AnalyzeAsync(receivedMessage);
+ if(fN == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return;
+ }
+ var tb3761FN = fN.FnList.FirstOrDefault();
+ if (tb3761FN == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return;
+ }
+
+ //todo 查找是否有下发任务
+
+
+
+ await _meterReadingRecordsRepository.InsertAsync(new MeterReadingRecords()
+ {
+ ReceivedMessageHexString = receivedMessage.MessageHexString,
+ AFN = fN.Afn,
+ Fn = tb3761FN.Fn,
+ Pn = 0,
+ FocusAddress = "",
+ MeterAddress = "",
+ DataResult = tb3761FN.Text,
+ });
+
//await _messageReceivedEventRepository.InsertAsync(receivedMessage);
}
}
diff --git a/src/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs b/src/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs
index b12778f..904d4b9 100644
--- a/src/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs
+++ b/src/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs
@@ -167,5 +167,15 @@ namespace JiShe.CollectBus.Common.Extensions
)
);
}
+
+ ///
+ /// 获取数据表分片策略
+ ///
+ ///
+ ///
+ public static string GetDataTableShardingStrategy(this DateTime dateTime)
+ {
+ return $"{dateTime:yyyyMMddHHmm}";
+ }
}
}
diff --git a/src/JiShe.CollectBus.MongoDB/ShardingStrategy/DayShardingStrategy.cs b/src/JiShe.CollectBus.MongoDB/ShardingStrategy/DayShardingStrategy.cs
index 75157e5..f26136d 100644
--- a/src/JiShe.CollectBus.MongoDB/ShardingStrategy/DayShardingStrategy.cs
+++ b/src/JiShe.CollectBus.MongoDB/ShardingStrategy/DayShardingStrategy.cs
@@ -1,4 +1,5 @@
-using System;
+using JiShe.CollectBus.Common.Extensions;
+using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@@ -21,7 +22,7 @@ namespace JiShe.CollectBus.ShardingStrategy
public string GetCollectionName(DateTime dateTime)
{
var baseName = typeof(TEntity).Name;
- return $"{baseName}_{dateTime:yyyyMMddHHmm}";
+ return $"{baseName}_{dateTime.GetDataTableShardingStrategy()}";
}
///
@@ -31,7 +32,7 @@ namespace JiShe.CollectBus.ShardingStrategy
public string GetCurrentCollectionName()
{
var baseName = typeof(TEntity).Name;
- return $"{baseName}_{DateTime.Now:yyyyMMddHHmm}";
+ return $"{baseName}_{DateTime.Now.GetDataTableShardingStrategy()}";
}
///
@@ -49,7 +50,7 @@ namespace JiShe.CollectBus.ShardingStrategy
while (current <= end)
{
- months.Add($"{baseName}_{current:yyyyMMddHHmm}");
+ months.Add($"{baseName}_{current.GetDataTableShardingStrategy()}");
current = current.AddMonths(1);
}
diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
index 609dcdb..4638983 100644
--- a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
+++ b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
@@ -55,7 +55,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
//await _protocolInfoCache.Get()
}
- public abstract Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) where T : TB3761FN;
+ public abstract Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) where T : TB3761;
///
/// 登录帧解析
diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
index 5ad92c1..2f48cd2 100644
--- a/src/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
+++ b/src/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
@@ -14,7 +14,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Interfaces
Task AddAsync();
- Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) where T : TB3761FN;
+ Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) where T : TB3761;
Task LoginAsync(MessageReceivedLogin messageReceived);
From a9b8323a3ad453658cb3a4f2220d025947bf3330 Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Thu, 27 Mar 2025 08:38:19 +0800
Subject: [PATCH 03/18] =?UTF-8?q?=E6=9A=82=E5=AD=98=E4=BB=A3=E7=A0=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../Consumers/ReceivedConsumer.cs | 2 +-
.../JiShe.CollectBus.Application.csproj | 6 +-
.../Plugins/CloseMonitor.cs | 14 +--
.../Plugins/ServerMonitor.cs | 4 +-
.../Plugins/TcpMonitor.cs | 114 +++++++++++-------
.../Plugins/UdpMonitor.cs | 3 +-
.../BasicScheduledMeterReadingService.cs | 2 +-
...nergySystemScheduledMeterReadingService.cs | 46 +++----
.../Subscribers/SubscriberAppService.cs | 16 ++-
.../IotSystems/Devices/Device.cs | 6 +
.../JiShe.CollectBus.Host.csproj | 4 +-
src/JiShe.CollectBus.Host/appsettings.json | 6 +-
.../Abstracts/BaseProtocolPlugin.cs | 8 +-
13 files changed, 138 insertions(+), 93 deletions(-)
diff --git a/src/JiShe.CollectBus.Application/Consumers/ReceivedConsumer.cs b/src/JiShe.CollectBus.Application/Consumers/ReceivedConsumer.cs
index 9309c55..4f7b5eb 100644
--- a/src/JiShe.CollectBus.Application/Consumers/ReceivedConsumer.cs
+++ b/src/JiShe.CollectBus.Application/Consumers/ReceivedConsumer.cs
@@ -49,7 +49,7 @@ namespace JiShe.CollectBus.Consumers
var list = new List();
foreach (var contextItem in context.Message)
{
- await protocolPlugin.AnalyzeAsync(contextItem.Message);
+ await protocolPlugin.AnalyzeAsync(contextItem.Message);
list.Add(contextItem.Message);
}
await _messageReceivedEventRepository.InsertManyAsync(list);
diff --git a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj
index 078728a..6b71b26 100644
--- a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj
+++ b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj
@@ -15,14 +15,14 @@
-
+
-
-
+
+
diff --git a/src/JiShe.CollectBus.Application/Plugins/CloseMonitor.cs b/src/JiShe.CollectBus.Application/Plugins/CloseMonitor.cs
index 19b66d8..f8651c7 100644
--- a/src/JiShe.CollectBus.Application/Plugins/CloseMonitor.cs
+++ b/src/JiShe.CollectBus.Application/Plugins/CloseMonitor.cs
@@ -7,10 +7,10 @@ using TouchSocket.Sockets;
namespace JiShe.CollectBus.Plugins
{
- public partial class TcpCloseMonitor(ILogger logger) : PluginBase
+ public partial class TcpCloseMonitor(ILogger logger) : PluginBase, ITcpReceivedPlugin
{
- [GeneratorPlugin(typeof(ITcpReceivedPlugin))]
- public async Task OnTcpReceived(ITcpSessionClient client, ReceivedDataEventArgs e)
+
+ public async Task OnTcpReceived(ITcpSession client, ReceivedDataEventArgs e)
{
try
{
@@ -19,21 +19,21 @@ namespace JiShe.CollectBus.Plugins
catch (CloseException ex)
{
logger.LogInformation("拦截到CloseException");
- client.Close(ex.Message);
+ await client.CloseAsync(ex.Message);
}
catch (Exception exx)
{
- // ignored
+
}
finally
{
+
}
}
}
- public partial class UdpCloseMonitor(ILogger logger) : PluginBase
+ public partial class UdpCloseMonitor(ILogger logger) : PluginBase, IUdpReceivedPlugin
{
- [GeneratorPlugin(typeof(IUdpReceivedPlugin))]
public Task OnUdpReceived(IUdpSessionBase client, UdpReceivedDataEventArgs e)
{
throw new NotImplementedException();
diff --git a/src/JiShe.CollectBus.Application/Plugins/ServerMonitor.cs b/src/JiShe.CollectBus.Application/Plugins/ServerMonitor.cs
index 39174db..2b3c15a 100644
--- a/src/JiShe.CollectBus.Application/Plugins/ServerMonitor.cs
+++ b/src/JiShe.CollectBus.Application/Plugins/ServerMonitor.cs
@@ -5,9 +5,8 @@ using TouchSocket.Sockets;
namespace JiShe.CollectBus.Plugins
{
- public partial class ServerMonitor(ILogger logger) : PluginBase
+ public partial class ServerMonitor(ILogger logger) : PluginBase, IServerStartedPlugin, IServerStopedPlugin
{
- [GeneratorPlugin(typeof(IServerStartedPlugin))]
public Task OnServerStarted(IServiceBase sender, ServiceStateEventArgs e)
{
switch (sender)
@@ -32,7 +31,6 @@ namespace JiShe.CollectBus.Plugins
return e.InvokeNext();
}
- [GeneratorPlugin(typeof(IServerStopedPlugin))]
public Task OnServerStoped(IServiceBase sender,ServiceStateEventArgs e)
{
logger.LogInformation("服务已停止");
diff --git a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs
index 8ff138a..b3b43d3 100644
--- a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs
+++ b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs
@@ -1,5 +1,7 @@
using System;
+using System.Runtime.CompilerServices;
using System.Threading.Tasks;
+using DeviceDetectorNET.Parser.Device;
using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.Enums;
@@ -20,7 +22,7 @@ using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Plugins
{
- public partial class TcpMonitor : PluginBase, ITransientDependency
+ public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin
{
private readonly ICapPublisher _capBus;
private readonly ILogger _logger;
@@ -34,9 +36,9 @@ namespace JiShe.CollectBus.Plugins
///
///
///
- public TcpMonitor(ICapPublisher capBus,
- ILogger logger,
- IRepository deviceRepository,
+ public TcpMonitor(ICapPublisher capBus,
+ ILogger logger,
+ IRepository deviceRepository,
IDistributedCache ammeterInfoCache)
{
_capBus = capBus;
@@ -45,8 +47,7 @@ namespace JiShe.CollectBus.Plugins
_ammeterInfoCache = ammeterInfoCache;
}
- [GeneratorPlugin(typeof(ITcpReceivedPlugin))]
- public async Task OnTcpReceived(ITcpSessionClient client, ReceivedDataEventArgs e)
+ public async Task OnTcpReceived(ITcpSession client, ReceivedDataEventArgs e)
{
var messageHexString = Convert.ToHexString(e.ByteBlock.Span);
var hexStringList = messageHexString.StringToPairs();
@@ -55,15 +56,21 @@ namespace JiShe.CollectBus.Plugins
var aTuple = (Tuple)hexStringList.GetAnalyzeValue(CommandChunkEnum.A);
if (aFn.HasValue && fn.HasValue && aTuple != null && !string.IsNullOrWhiteSpace(aTuple.Item1))
{
+ var tcpSessionClient = (ITcpSessionClient)client;
+
+
if ((AFN)aFn == AFN.链路接口检测)
{
switch (fn)
{
case 1:
- await OnTcpLoginReceived(client, messageHexString, aTuple.Item1);
+ await OnTcpLoginReceived(tcpSessionClient, messageHexString, aTuple.Item1);
break;
case 3:
- await OnTcpHeartbeatReceived(client, messageHexString, aTuple.Item1);
+ //心跳帧有两种情况:
+ //1. 集中器先有登录帧,再有心跳帧
+ //2. 集中器没有登录帧,只有心跳帧
+ await OnTcpHeartbeatReceived(tcpSessionClient, messageHexString, aTuple.Item1);
break;
default:
_logger.LogError($"指令初步解析失败,指令内容:{messageHexString}");
@@ -72,7 +79,7 @@ namespace JiShe.CollectBus.Plugins
}
else
{
- await OnTcpNormalReceived(client, messageHexString, aTuple.Item1);
+ await OnTcpNormalReceived(tcpSessionClient, messageHexString, aTuple.Item1);
}
}
else
@@ -83,24 +90,31 @@ namespace JiShe.CollectBus.Plugins
await e.InvokeNext();
}
- [GeneratorPlugin(typeof(ITcpConnectingPlugin))]
- public async Task OnTcpConnecting(ITcpSessionClient client, ConnectingEventArgs e)
+ //[GeneratorPlugin(typeof(ITcpConnectingPlugin))]
+ public async Task OnTcpConnecting(ITcpSession client, ConnectingEventArgs e)
{
- _logger.LogInformation($"[TCP] ID:{client.Id} IP:{client.GetIPPort()}正在连接中...");
+ var tcpSessionClient = (ITcpSessionClient)client;
+
+ _logger.LogInformation($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}正在连接中...");
await e.InvokeNext();
}
- [GeneratorPlugin(typeof(ITcpConnectedPlugin))]
- public async Task OnTcpConnected(ITcpSessionClient client, ConnectedEventArgs e)
+ //[GeneratorPlugin(typeof(ITcpConnectedPlugin))]
+ public async Task OnTcpConnected(ITcpSession client, ConnectedEventArgs e)
{
- _logger.LogInformation($"[TCP] ID:{client.Id} IP:{client.GetIPPort()}已连接");
+ var tcpSessionClient = (ITcpSessionClient)client;
+
+
+ _logger.LogInformation($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}已连接");
await e.InvokeNext();
}
- [GeneratorPlugin(typeof(ITcpClosedPlugin))]
- public async Task OnTcpClosed(ITcpSessionClient client, ClosedEventArgs e)
+ //[GeneratorPlugin(typeof(ITcpClosedPlugin))]//ITcpSessionClient
+ public async Task OnTcpClosed(ITcpSession client, ClosedEventArgs e)
{
- var entity = await _deviceRepository.FindAsync(a=>a.ClientId == client.Id);
+
+ var tcpSessionClient = (ITcpSessionClient)client;
+ var entity = await _deviceRepository.FindAsync(a => a.ClientId == tcpSessionClient.Id);
if (entity != null)
{
entity.UpdateByOnClosed();
@@ -108,7 +122,7 @@ namespace JiShe.CollectBus.Plugins
}
else
{
- _logger.LogWarning($"[TCP] ID:{client.Id} IP:{client.GetIPPort()}已关闭连接,但采集程序检索失败");
+ _logger.LogWarning($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}已关闭连接,但采集程序检索失败");
}
await e.InvokeNext();
@@ -123,9 +137,24 @@ namespace JiShe.CollectBus.Plugins
///
private async Task OnTcpLoginReceived(ITcpSessionClient client, string messageHexString, string deviceNo)
{
+ string oldClientId = $"{client.Id}";
+
+ await client.ResetIdAsync(deviceNo);
+
+ var entity = await _deviceRepository.FindAsync(a => a.Number == deviceNo);
+ if (entity == null)
+ {
+ await _deviceRepository.InsertAsync(new Device(deviceNo, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online));
+ }
+ else
+ {
+ entity.UpdateByLoginAndHeartbeat(oldClientId);
+ await _deviceRepository.UpdateAsync(entity);
+ }
+
var messageReceivedLoginEvent = new MessageReceivedLogin
{
- ClientId = client.Id,
+ ClientId = deviceNo,
ClientIp = client.IP,
ClientPort = client.Port,
MessageHexString = messageHexString,
@@ -133,23 +162,36 @@ namespace JiShe.CollectBus.Plugins
MessageId = NewId.NextGuid().ToString()
};
await _capBus.PublishAsync(ProtocolConst.SubscriberReceivedLoginEventName, messageReceivedLoginEvent);
- var entity = await _deviceRepository.FindAsync(a => a.Number == deviceNo);
- if (entity == null)
- {
- await _deviceRepository.InsertAsync(new Device(deviceNo, client.Id,DateTime.Now, DateTime.Now, DeviceStatus.Online));
- }
- else
- {
- entity.UpdateByLoginAndHeartbeat(client.Id);
- await _deviceRepository.UpdateAsync(entity);
- }
}
private async Task OnTcpHeartbeatReceived(ITcpSessionClient client, string messageHexString, string deviceNo)
{
+ string clientId = deviceNo;
+ string oldClientId = $"{client.Id}";
+
+ var entity = await _deviceRepository.FindAsync(a => a.Number == deviceNo);
+ if (entity == null) //没有登录帧的设备,只有心跳帧
+ {
+ await client.ResetIdAsync(clientId);
+ await _deviceRepository.InsertAsync(new Device(deviceNo, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online));
+ }
+ else
+ {
+ if (clientId != oldClientId)
+ {
+ entity.UpdateByLoginAndHeartbeat(oldClientId);
+ }
+ else
+ {
+ entity.UpdateByLoginAndHeartbeat();
+ }
+
+ await _deviceRepository.UpdateAsync(entity);
+ }
+
var messageReceivedHeartbeatEvent = new MessageReceivedHeartbeat
{
- ClientId = client.Id,
+ ClientId = clientId,
ClientIp = client.IP,
ClientPort = client.Port,
MessageHexString = messageHexString,
@@ -157,16 +199,6 @@ namespace JiShe.CollectBus.Plugins
MessageId = NewId.NextGuid().ToString()
};
await _capBus.PublishAsync(ProtocolConst.SubscriberReceivedHeartbeatEventName, messageReceivedHeartbeatEvent);
- var entity = await _deviceRepository.FindAsync(a => a.Number == deviceNo);
- if (entity == null)
- {
- await _deviceRepository.InsertAsync(new Device(deviceNo, client.Id, DateTime.Now, DateTime.Now, DeviceStatus.Online));
- }
- else
- {
- entity.UpdateByLoginAndHeartbeat(client.Id);
- await _deviceRepository.UpdateAsync(entity);
- }
}
private async Task OnTcpNormalReceived(ITcpSessionClient client, string messageHexString, string deviceNo)
diff --git a/src/JiShe.CollectBus.Application/Plugins/UdpMonitor.cs b/src/JiShe.CollectBus.Application/Plugins/UdpMonitor.cs
index f975a03..b6a576c 100644
--- a/src/JiShe.CollectBus.Application/Plugins/UdpMonitor.cs
+++ b/src/JiShe.CollectBus.Application/Plugins/UdpMonitor.cs
@@ -5,9 +5,8 @@ using TouchSocket.Sockets;
namespace JiShe.CollectBus.Plugins
{
- public partial class UdpMonitor : PluginBase
+ public partial class UdpMonitor : PluginBase, IUdpReceivedPlugin
{
- [GeneratorPlugin(typeof(IUdpReceivedPlugin))]
public async Task OnUdpReceived(IUdpSessionBase client, UdpReceivedDataEventArgs e)
{
var udpSession = client as UdpSession;
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index 14e0a6a..b1ea2d6 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -200,7 +200,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
foreach (var ammeter in item)
{
//处理ItemCode
- if (string.IsNullOrWhiteSpace(ammeter.ItemCodes))
+ if (string.IsNullOrWhiteSpace(ammeter.ItemCodes) && !string.IsNullOrWhiteSpace(ammeter.DataTypes))
{
var itemArr = ammeter.DataTypes.Split(',').ToList();
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
index 61cc2e5..70cecfa 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
@@ -70,44 +70,48 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
Baudrate = 2400,
FocusAddress = "402440506",
- Name = "三相电表",
- FocusID = 1,
+ Name = "张家祠工务(三相电表)",
+ FocusID = 95780,
DatabaseBusiID = 1,
- MeteringCode = 2,
+ MeteringCode = 1,
AmmerterAddress = "402410040506",
- ID = 9980,
+ ID = 127035,
TypeName = 3,
+ DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
+ TimeDensity = 15,
});
ammeterInfos.Add(new AmmeterInfo()
{
Baudrate = 2400,
FocusAddress = "542400504",
- Name = "单相电表",
- FocusID = 1,
+ Name = "五号配(长芦二所四排)(单相电表)",
+ FocusID = 69280,
DatabaseBusiID = 1,
MeteringCode = 2,
AmmerterAddress = "542410000504",
- ID = 9981,
+ ID = 95594,
TypeName = 1,
+ DataTypes = "581,589,592,597,601",
+ TimeDensity = 15,
});
return ammeterInfos;
- //string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
- // FROM TB_GatherInfo(NOLOCK) AS A
- // INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
- // INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
- // INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0
- // WHERE 1=1 and C.Special = 0 ";
- ////TODO 记得移除特殊表过滤
+ string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
+ FROM TB_GatherInfo(NOLOCK) AS A
+ INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
+ INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
+ INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0
+ WHERE 1=1 and C.Special = 0 ";
+ //TODO 记得移除特殊表过滤
- //if (!string.IsNullOrWhiteSpace(gatherCode))
- //{
- // sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
- //}
- //return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
- // .Ado
- // .QueryAsync(sql);
+ if (!string.IsNullOrWhiteSpace(gatherCode))
+ {
+ sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
+ }
+ return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
+ .Ado
+ .QueryAsync(sql);
}
///
diff --git a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
index 7dbe586..b2f7400 100644
--- a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
+++ b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
@@ -1,6 +1,7 @@
using System;
using System.Linq;
using System.Threading.Tasks;
+using DeviceDetectorNET.Parser.Device;
using DotNetCore.CAP;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
@@ -62,6 +63,8 @@ namespace JiShe.CollectBus.Subscribers
[CapSubscribe(ProtocolConst.SubscriberIssuedEventName)]
public async Task IssuedEvent(IssuedEventMessage issuedEventMessage)
{
+ _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 回复下发内容IssuedEvent:{issuedEventMessage.MessageId}");
+
switch (issuedEventMessage.Type)
{
case IssuedEventType.Heartbeat:
@@ -82,11 +85,14 @@ namespace JiShe.CollectBus.Subscribers
default:
throw new ArgumentOutOfRangeException();
}
- var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo);
- if (device != null)
- {
- await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message);
- }
+
+ //var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo);
+ //if (device != null)
+ //{
+ // await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message);
+ //}
+
+ await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
}
[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)]
diff --git a/src/JiShe.CollectBus.Domain/IotSystems/Devices/Device.cs b/src/JiShe.CollectBus.Domain/IotSystems/Devices/Device.cs
index c31dbc9..a18c9d8 100644
--- a/src/JiShe.CollectBus.Domain/IotSystems/Devices/Device.cs
+++ b/src/JiShe.CollectBus.Domain/IotSystems/Devices/Device.cs
@@ -64,6 +64,12 @@ namespace JiShe.CollectBus.IotSystems.Devices
Status = DeviceStatus.Online;
}
+ public void UpdateByLoginAndHeartbeat()
+ {
+ LastOnlineTime = DateTime.Now;
+ Status = DeviceStatus.Online;
+ }
+
public void UpdateByOnClosed()
{
LastOfflineTime = DateTime.Now;
diff --git a/src/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj b/src/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj
index 0b178ee..b70752b 100644
--- a/src/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj
+++ b/src/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj
@@ -25,8 +25,8 @@
-
-
+
+
diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json
index 50c459b..69bdcc1 100644
--- a/src/JiShe.CollectBus.Host/appsettings.json
+++ b/src/JiShe.CollectBus.Host/appsettings.json
@@ -5,11 +5,11 @@
"Serilog.Sinks.File"
],
"MinimumLevel": {
- "Default": "Warning",
+ "Default": "Information",
"Override": {
- "Microsoft": "Information",
+ "Microsoft": "Warning",
"Volo.Abp": "Warning",
- "Hangfire": "Information",
+ "Hangfire": "Warning",
"DotNetCore.CAP": "Warning",
"Serilog.AspNetCore": "Information",
"Microsoft.EntityFrameworkCore": "Warning",
diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
index 4638983..978a056 100644
--- a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
+++ b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
@@ -602,7 +602,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
///
///
///
- public virtual TB3761FN AnalyzeReadingDataAsync(MessageReceived messageReceived,
+ public virtual TB3761 AnalyzeReadingDataAsync(MessageReceived messageReceived,
Action? sendAction = null)
{
var hexStringList = messageReceived.MessageHexString.StringToPairs();
@@ -664,7 +664,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
}
}
- return tb3761Fn;
+ return tb3761;
}
///
@@ -673,7 +673,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
///
///
///
- public virtual TB3761FN AnalyzeReadingTdcDataAsync(MessageReceived messageReceived,
+ public virtual TB3761 AnalyzeReadingTdcDataAsync(MessageReceived messageReceived,
Action? sendAction = null)
{
@@ -717,7 +717,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
}
}
- return tb3761Fn;
+ return tb3761;
//var freezeDensity = (FreezeDensity)Convert.ToInt32(hexDatas.Skip(5).Take(1));
//var addMinute = 0;
//switch (freezeDensity)
From 5a294b437c97721f1704315c1c07bdbe29f895da Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Tue, 1 Apr 2025 22:50:34 +0800
Subject: [PATCH 04/18] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../Plugins/TcpMonitor.cs | 27 ++++++++++----
.../BasicScheduledMeterReadingService.cs | 35 +++++++++++++------
...nergySystemScheduledMeterReadingService.cs | 2 +-
.../Subscribers/SubscriberAppService.cs | 7 ++--
.../CollectBusHostModule.Configure.cs | 24 ++++++++-----
.../CollectBusHostModule.cs | 4 +--
.../Abstracts/BaseProtocolPlugin.cs | 12 ++++---
7 files changed, 73 insertions(+), 38 deletions(-)
diff --git a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs
index b3b43d3..4801404 100644
--- a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs
+++ b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs
@@ -24,7 +24,7 @@ namespace JiShe.CollectBus.Plugins
{
public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin
{
- private readonly ICapPublisher _capBus;
+ private readonly IPublishEndpoint _producerBus;
private readonly ILogger _logger;
private readonly IRepository _deviceRepository;
private readonly IDistributedCache _ammeterInfoCache;
@@ -32,16 +32,16 @@ namespace JiShe.CollectBus.Plugins
///
///
///
- ///
+ ///
///
///
///
- public TcpMonitor(ICapPublisher capBus,
+ public TcpMonitor(IPublishEndpoint producerBus,
ILogger logger,
IRepository deviceRepository,
IDistributedCache ammeterInfoCache)
{
- _capBus = capBus;
+ _producerBus = producerBus;
_logger = logger;
_deviceRepository = deviceRepository;
_ammeterInfoCache = ammeterInfoCache;
@@ -161,7 +161,9 @@ namespace JiShe.CollectBus.Plugins
DeviceNo = deviceNo,
MessageId = NewId.NextGuid().ToString()
};
- await _capBus.PublishAsync(ProtocolConst.SubscriberReceivedLoginEventName, messageReceivedLoginEvent);
+ //await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedLoginEventName, messageReceivedLoginEvent);
+
+ await _producerBus.Publish( messageReceivedLoginEvent);
}
private async Task OnTcpHeartbeatReceived(ITcpSessionClient client, string messageHexString, string deviceNo)
@@ -198,12 +200,13 @@ namespace JiShe.CollectBus.Plugins
DeviceNo = deviceNo,
MessageId = NewId.NextGuid().ToString()
};
- await _capBus.PublishAsync(ProtocolConst.SubscriberReceivedHeartbeatEventName, messageReceivedHeartbeatEvent);
+ //await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedHeartbeatEventName, messageReceivedHeartbeatEvent);
+ await _producerBus.Publish(messageReceivedHeartbeatEvent);
}
private async Task OnTcpNormalReceived(ITcpSessionClient client, string messageHexString, string deviceNo)
{
- await _capBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
+ await _producerBus.Publish(new MessageReceived
{
ClientId = client.Id,
ClientIp = client.IP,
@@ -212,6 +215,16 @@ namespace JiShe.CollectBus.Plugins
DeviceNo = deviceNo,
MessageId = NewId.NextGuid().ToString()
});
+
+ //await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
+ //{
+ // ClientId = client.Id,
+ // ClientIp = client.IP,
+ // ClientPort = client.Port,
+ // MessageHexString = messageHexString,
+ // DeviceNo = deviceNo,
+ // MessageId = NewId.NextGuid().ToString()
+ //});
}
}
}
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index b1ea2d6..f65e51f 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -39,15 +39,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
{
private readonly ILogger _logger;
- private readonly ICapPublisher _capBus;
+ private readonly IPublishEndpoint _producerBus;
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
public BasicScheduledMeterReadingService(
ILogger logger,
- ICapPublisher capBus,
+ IPublishEndpoint producerBus,
IMeterReadingRecordRepository meterReadingRecordsRepository)
{
- _capBus = capBus;
+ _producerBus = producerBus;
_logger = logger;
_meterReadingRecordsRepository = meterReadingRecordsRepository;
}
@@ -298,8 +298,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
- _ = _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
-
+ //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
+ _= _producerBus.Publish(tempMsg);
+
+
meterTaskInfosList.Add(ammerterItem.Value);
}
}
@@ -363,7 +365,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
- _= _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
+ //_= _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
+
+ _ = _producerBus.Publish(tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
@@ -433,9 +437,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
TimeDensity = timeDensity.ToString(),
};
- _ = _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500) ,ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
+ //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500) ,ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
- //await _massTransitBus.Publish(tempMsg);
+ _ = _producerBus.Publish(tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
@@ -823,7 +827,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
- await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
+ //await _producerBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
+
+ _ = _producerBus.Publish(tempMsg);
+
meterTaskInfosList.Add(ammerterItem.Value);
}
@@ -889,7 +896,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
- await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
+ //await _producerBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
+
+ _ = _producerBus.Publish(tempMsg);
+
meterTaskInfosList.Add(ammerterItem.Value);
}
@@ -954,7 +964,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
- await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
+ //await _producerBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
+
+ _ = _producerBus.Publish(tempMsg);
+
meterTaskInfosList.Add(ammerterItem.Value);
}
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
index 70cecfa..78276e9 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
@@ -29,7 +29,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
public EnergySystemScheduledMeterReadingService(ILogger logger,
- ICapPublisher capBus, IMeterReadingRecordRepository meterReadingRecordsRepository) : base(logger, capBus, meterReadingRecordsRepository)
+ IPublishEndpoint producerBus, IMeterReadingRecordRepository meterReadingRecordsRepository) : base(logger, producerBus, meterReadingRecordsRepository)
{
}
diff --git a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
index b2f7400..242b216 100644
--- a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
+++ b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
@@ -62,19 +62,18 @@ namespace JiShe.CollectBus.Subscribers
[CapSubscribe(ProtocolConst.SubscriberIssuedEventName)]
public async Task IssuedEvent(IssuedEventMessage issuedEventMessage)
- {
- _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 回复下发内容IssuedEvent:{issuedEventMessage.MessageId}");
-
+ {
switch (issuedEventMessage.Type)
{
case IssuedEventType.Heartbeat:
- _logger.LogInformation($"IssuedEvent:{issuedEventMessage.MessageId}");
+ _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}");
var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId);
heartbeatEntity.AckTime = Clock.Now;
heartbeatEntity.IsAck = true;
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
break;
case IssuedEventType.Login:
+ _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}");
var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId);
loginEntity.AckTime = Clock.Now;
loginEntity.IsAck = true;
diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
index 374eaa5..f0a02c9 100644
--- a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
+++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
@@ -21,6 +21,7 @@ using JiShe.CollectBus.Consumers;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MessageIssueds;
+using Confluent.Kafka;
namespace JiShe.CollectBus.Host
@@ -279,9 +280,13 @@ namespace JiShe.CollectBus.Host
/// The configuration.
public void ConfigureMassTransit(ServiceConfigurationContext context, IConfiguration configuration)
{
+
+ //context.Services.AddSingleton();
+
context.Services.AddMassTransit(x =>
{
x.UsingInMemory();
+
x.AddConfigureEndpointsCallback((c, name, cfg) =>
{
@@ -308,37 +313,38 @@ namespace JiShe.CollectBus.Host
rider.UsingKafka((c, cfg) =>
{
+
cfg.Host(configuration.GetConnectionString("Kafka"));
cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedHeartbeatEventName, ProtocolConst.SubscriberGroup, configurator =>
{
configurator.ConfigureConsumer(c);
- configurator.ConfigureConsumeTopology = false;
+ configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
});
cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedLoginEventName, ProtocolConst.SubscriberGroup, configurator =>
{
configurator.ConfigureConsumer(c);
- configurator.ConfigureConsumeTopology = false;
+ configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
});
cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedEventName, ProtocolConst.SubscriberGroup, configurator =>
{
configurator.ConfigureConsumer(c);
- configurator.ConfigureConsumeTopology = false;
+ configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
});
cfg.TopicEndpoint(ProtocolConst.SubscriberIssuedEventName, ProtocolConst.SubscriberGroup, configurator =>
{
configurator.ConfigureConsumer(c);
- configurator.ConfigureConsumeTopology = false;
+ configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
});
- //cfg.TopicEndpoint(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, ProtocolConst.SubscriberGroup, configurator =>
- //{
- // configurator.ConfigureConsumer(c);
- // configurator.ConfigureConsumeTopology = false;
- //});
+ cfg.TopicEndpoint(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, ProtocolConst.SubscriberGroup, configurator =>
+ {
+ configurator.ConfigureConsumer(c);
+ configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
+ });
});
});
});
diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs
index 6f24b1e..263be23 100644
--- a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs
+++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs
@@ -43,8 +43,8 @@ namespace JiShe.CollectBus.Host
ConfigureNetwork(context, configuration);
ConfigureJwtAuthentication(context, configuration);
ConfigureHangfire(context);
- ConfigureCap(context, configuration);
- //ConfigureMassTransit(context, configuration);
+ //ConfigureCap(context, configuration);
+ ConfigureMassTransit(context, configuration);
ConfigureAuditLog(context);
ConfigureCustom(context, configuration);
}
diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
index 978a056..2fbe138 100644
--- a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
+++ b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
@@ -11,12 +11,13 @@ using JiShe.CollectBus.Protocol.Contracts.AnalysisData;
using Microsoft.Extensions.DependencyInjection;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.Protocols;
+using MassTransit;
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
{
public abstract class BaseProtocolPlugin : IProtocolPlugin
{
- private readonly ICapPublisher _capBus;
+ private readonly IPublishEndpoint _producerBus;
private readonly ILogger _logger;
private readonly IRepository _protocolInfoRepository;
@@ -36,7 +37,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
_logger = serviceProvider.GetRequiredService>();
_protocolInfoRepository = serviceProvider.GetRequiredService>();
- _capBus = serviceProvider.GetRequiredService();
+ _producerBus = serviceProvider.GetRequiredService();
}
public abstract ProtocolInfo Info { get; }
@@ -86,7 +87,8 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
- await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
+ //await _producerBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
+ await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
}
///
@@ -124,7 +126,9 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
Fn = 1
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
- await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
+ //await _producerBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
+
+ await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
}
}
From 838ef197e27663f5ad6bfc655257e0e4fcbc2979 Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Wed, 2 Apr 2025 09:42:04 +0800
Subject: [PATCH 05/18] =?UTF-8?q?=E5=B0=9D=E8=AF=95=E8=87=AA=E5=8A=A8?=
=?UTF-8?q?=E5=88=9B=E5=BB=BA=E4=B8=BB=E9=A2=98?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../CollectBusHostModule.Configure.cs | 55 ++++++++++++-------
.../Extensions/ProtocolConstExtensions.cs | 29 ++++++++++
2 files changed, 64 insertions(+), 20 deletions(-)
create mode 100644 src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs
diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
index f0a02c9..3516bbb 100644
--- a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
+++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
@@ -22,6 +22,7 @@ using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using Confluent.Kafka;
+using MassTransit.SqlTransport.Topology;
namespace JiShe.CollectBus.Host
@@ -273,6 +274,11 @@ namespace JiShe.CollectBus.Host
}
+ ///
+ /// Configures the mass transit.
+ ///
+ /// The context.
+ /// The configuration.
///
/// Configures the mass transit.
///
@@ -280,20 +286,29 @@ namespace JiShe.CollectBus.Host
/// The configuration.
public void ConfigureMassTransit(ServiceConfigurationContext context, IConfiguration configuration)
{
+ var consumerConfig = new ConsumerConfig { GroupId = ProtocolConst.SubscriberGroup };
+ var producerConfig = new ProducerConfig();
- //context.Services.AddSingleton();
-
- context.Services.AddMassTransit(x =>
+ context.Services
+ .ConfigureKafkaTestOptions(options =>
{
- x.UsingInMemory();
-
- x.AddConfigureEndpointsCallback((c, name, cfg) =>
- {
- cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
- cfg.UseMessageRetry(r => r.Immediate(5));
- cfg.UseInMemoryOutbox(c);
- });
+#if DEBUG
+ options.CleanTopicsOnStart = true;// 测试时,每次启动都删除topic,生产环境不需要
+#endif
+ options.CreateTopicsIfNotExists = true;
+ options.TopicNames = ProtocolConstExtensions.GetAllTopicNames();
+ })
+ .AddMassTransit(x =>
+ {
+ x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
+
+ //x.AddConfigureEndpointsCallback((c, name, cfg) =>
+ //{
+ // cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
+ // cfg.UseMessageRetry(r => r.Immediate(5));
+ // cfg.UseInMemoryOutbox(c);
+ //});
x.AddRider(rider =>
{
@@ -310,37 +325,37 @@ namespace JiShe.CollectBus.Host
});
rider.AddConsumer();
-
+
rider.UsingKafka((c, cfg) =>
{
-
- cfg.Host(configuration.GetConnectionString("Kafka"));
+ List hosts = new List() { "121.42.242.91:29092", "121.42.242.91:39092", "121.42.242.91:49092" };
+ cfg.Host(hosts);
- cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedHeartbeatEventName, ProtocolConst.SubscriberGroup, configurator =>
+ cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedHeartbeatEventName, consumerConfig, configurator =>
{
- configurator.ConfigureConsumer(c);
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
+ configurator.ConfigureConsumer(c);
});
- cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedLoginEventName, ProtocolConst.SubscriberGroup, configurator =>
+ cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedLoginEventName, consumerConfig, configurator =>
{
configurator.ConfigureConsumer(c);
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
});
- cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedEventName, ProtocolConst.SubscriberGroup, configurator =>
+ cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedEventName, consumerConfig, configurator =>
{
configurator.ConfigureConsumer(c);
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
});
- cfg.TopicEndpoint(ProtocolConst.SubscriberIssuedEventName, ProtocolConst.SubscriberGroup, configurator =>
+ cfg.TopicEndpoint(ProtocolConst.SubscriberIssuedEventName, consumerConfig, configurator =>
{
configurator.ConfigureConsumer(c);
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
});
- cfg.TopicEndpoint(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, ProtocolConst.SubscriberGroup, configurator =>
+ cfg.TopicEndpoint(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, consumerConfig, configurator =>
{
configurator.ConfigureConsumer(c);
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs b/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs
new file mode 100644
index 0000000..9455144
--- /dev/null
+++ b/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs
@@ -0,0 +1,29 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Protocol.Contracts
+{
+ public class ProtocolConstExtensions
+ {
+ ///
+ /// 自动获取 ProtocolConst 类中所有 Kafka 主题名称
+ /// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量)
+ ///
+ public static List GetAllTopicNames()
+ {
+ return typeof(ProtocolConst)
+ .GetFields(BindingFlags.Public | BindingFlags.Static)
+ .Where(f =>
+ f.IsLiteral &&
+ !f.IsInitOnly &&
+ f.FieldType == typeof(string) &&
+ f.Name.EndsWith("EventName")) // 通过命名规则过滤主题字段
+ .Select(f => (string)f.GetRawConstantValue()!)
+ .ToList();
+ }
+ }
+}
From c57bd15b92819204cb8807e08c4b98c039222fc5 Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Wed, 2 Apr 2025 14:06:40 +0800
Subject: [PATCH 06/18] =?UTF-8?q?=E5=B0=81=E8=A3=85IoTDBProvider?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
JiShe.CollectBus.sln | 7 +
.../Attribute/ATTRIBUTEColumnAttribute.cs | 16 ++
.../Attribute/FIELDColumnAttribute.cs | 16 ++
.../Attribute/TAGColumnAttribute.cs | 16 ++
.../CollectBusIoTDBModule.cs | 19 ++
.../DeviceMetadata.cs | 24 ++
.../DevicePathBuilder.cs | 37 +++
.../IIoTDBProvider.cs | 38 +++
.../IoTDBProvider.cs | 262 ++++++++++++++++++
.../IoTEntity.cs | 19 ++
.../JiShe.CollectBus.IoTDBProvider.csproj | 12 +
.../Options/IoTDBOptions.cs | 32 +++
.../Options/PagedResult.cs | 25 ++
.../Options/QueryCondition.cs | 27 ++
.../Options/QueryOptions.cs | 27 ++
15 files changed, 577 insertions(+)
create mode 100644 src/JiShe.CollectBus.IoTDBProvider/Attribute/ATTRIBUTEColumnAttribute.cs
create mode 100644 src/JiShe.CollectBus.IoTDBProvider/Attribute/FIELDColumnAttribute.cs
create mode 100644 src/JiShe.CollectBus.IoTDBProvider/Attribute/TAGColumnAttribute.cs
create mode 100644 src/JiShe.CollectBus.IoTDBProvider/CollectBusIoTDBModule.cs
create mode 100644 src/JiShe.CollectBus.IoTDBProvider/DeviceMetadata.cs
create mode 100644 src/JiShe.CollectBus.IoTDBProvider/DevicePathBuilder.cs
create mode 100644 src/JiShe.CollectBus.IoTDBProvider/IIoTDBProvider.cs
create mode 100644 src/JiShe.CollectBus.IoTDBProvider/IoTDBProvider.cs
create mode 100644 src/JiShe.CollectBus.IoTDBProvider/IoTEntity.cs
create mode 100644 src/JiShe.CollectBus.IoTDBProvider/JiShe.CollectBus.IoTDBProvider.csproj
create mode 100644 src/JiShe.CollectBus.IoTDBProvider/Options/IoTDBOptions.cs
create mode 100644 src/JiShe.CollectBus.IoTDBProvider/Options/PagedResult.cs
create mode 100644 src/JiShe.CollectBus.IoTDBProvider/Options/QueryCondition.cs
create mode 100644 src/JiShe.CollectBus.IoTDBProvider/Options/QueryOptions.cs
diff --git a/JiShe.CollectBus.sln b/JiShe.CollectBus.sln
index 674eca4..0a7a6e7 100644
--- a/JiShe.CollectBus.sln
+++ b/JiShe.CollectBus.sln
@@ -31,6 +31,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.FreeSql",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.FreeRedisProvider", "src\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj", "{C06C4082-638F-2996-5FED-7784475766C1}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.IoTDBProvider", "src\JiShe.CollectBus.IoTDBProvider\JiShe.CollectBus.IoTDBProvider.csproj", "{A3F3C092-0A25-450B-BF6A-5983163CBEF5}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -89,6 +91,10 @@ Global
{C06C4082-638F-2996-5FED-7784475766C1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C06C4082-638F-2996-5FED-7784475766C1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C06C4082-638F-2996-5FED-7784475766C1}.Release|Any CPU.Build.0 = Release|Any CPU
+ {A3F3C092-0A25-450B-BF6A-5983163CBEF5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {A3F3C092-0A25-450B-BF6A-5983163CBEF5}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {A3F3C092-0A25-450B-BF6A-5983163CBEF5}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {A3F3C092-0A25-450B-BF6A-5983163CBEF5}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -107,6 +113,7 @@ Global
{8BA01C3D-297D-42DF-BD63-EF07202A0A67} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{FE0457D9-4038-4A17-8808-DCAD06CFC0A0} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{C06C4082-638F-2996-5FED-7784475766C1} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
+ {A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}
diff --git a/src/JiShe.CollectBus.IoTDBProvider/Attribute/ATTRIBUTEColumnAttribute.cs b/src/JiShe.CollectBus.IoTDBProvider/Attribute/ATTRIBUTEColumnAttribute.cs
new file mode 100644
index 0000000..42820bc
--- /dev/null
+++ b/src/JiShe.CollectBus.IoTDBProvider/Attribute/ATTRIBUTEColumnAttribute.cs
@@ -0,0 +1,16 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.IoTDBProvider
+{
+ ///
+ /// Column分类标记特性(TAG字段)
+ ///
+ [AttributeUsage(AttributeTargets.Property)]
+ public class TAGColumnAttribute : Attribute
+ {
+ }
+}
diff --git a/src/JiShe.CollectBus.IoTDBProvider/Attribute/FIELDColumnAttribute.cs b/src/JiShe.CollectBus.IoTDBProvider/Attribute/FIELDColumnAttribute.cs
new file mode 100644
index 0000000..2c48a65
--- /dev/null
+++ b/src/JiShe.CollectBus.IoTDBProvider/Attribute/FIELDColumnAttribute.cs
@@ -0,0 +1,16 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.IoTDBProvider
+{
+ ///
+ /// Column分类标记特性(FIELD字段)
+ ///
+ [AttributeUsage(AttributeTargets.Property)]
+ public class FIELDColumnAttribute : Attribute
+ {
+ }
+}
diff --git a/src/JiShe.CollectBus.IoTDBProvider/Attribute/TAGColumnAttribute.cs b/src/JiShe.CollectBus.IoTDBProvider/Attribute/TAGColumnAttribute.cs
new file mode 100644
index 0000000..28f6a46
--- /dev/null
+++ b/src/JiShe.CollectBus.IoTDBProvider/Attribute/TAGColumnAttribute.cs
@@ -0,0 +1,16 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.IoTDBProvider
+{
+ ///
+ /// Column分类标记特性(ATTRIBUTE字段)
+ ///
+ [AttributeUsage(AttributeTargets.Property)]
+ public class ATTRIBUTEColumnAttribute : Attribute
+ {
+ }
+}
diff --git a/src/JiShe.CollectBus.IoTDBProvider/CollectBusIoTDBModule.cs b/src/JiShe.CollectBus.IoTDBProvider/CollectBusIoTDBModule.cs
new file mode 100644
index 0000000..5a8902c
--- /dev/null
+++ b/src/JiShe.CollectBus.IoTDBProvider/CollectBusIoTDBModule.cs
@@ -0,0 +1,19 @@
+using Microsoft.Extensions.DependencyInjection;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Volo.Abp.Modularity;
+
+namespace JiShe.CollectBus.IoTDBProvider
+{
+ public class CollectBusIoTDBModule : AbpModule
+ {
+ public override void ConfigureServices(ServiceConfigurationContext context)
+ {
+ context.Services.Configure(context.Services.GetConfiguration().GetSection(nameof(IoTDBOptions)));
+ context.Services.AddSingleton();
+ }
+ }
+}
diff --git a/src/JiShe.CollectBus.IoTDBProvider/DeviceMetadata.cs b/src/JiShe.CollectBus.IoTDBProvider/DeviceMetadata.cs
new file mode 100644
index 0000000..dea65fb
--- /dev/null
+++ b/src/JiShe.CollectBus.IoTDBProvider/DeviceMetadata.cs
@@ -0,0 +1,24 @@
+using Apache.IoTDB;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.IoTDBProvider
+{
+ ///
+ /// 设备元数据
+ ///
+ public class DeviceMetadata
+ {
+ public List Measurements { get; } = new();
+ public List Tags { get; } = new();
+
+ public List GetDataTypes()
+ {
+ // 根据实际类型映射TSDataType
+ return Measurements.Select(_ => TSDataType.TEXT).ToList();
+ }
+ }
+}
diff --git a/src/JiShe.CollectBus.IoTDBProvider/DevicePathBuilder.cs b/src/JiShe.CollectBus.IoTDBProvider/DevicePathBuilder.cs
new file mode 100644
index 0000000..ef6d7e4
--- /dev/null
+++ b/src/JiShe.CollectBus.IoTDBProvider/DevicePathBuilder.cs
@@ -0,0 +1,37 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.IoTDBProvider
+{
+ ///
+ /// 设备路径构建器
+ ///
+ public static class DevicePathBuilder
+ {
+ ///
+ /// 构建存储组路径
+ ///
+ ///
+ ///
+ public static string BuildStorageGroupPath() where T : IoTEntity
+ {
+ var type = typeof(T);
+ return $"root.{type.GetProperty("SystemName")?.Name}.{type.GetProperty("ProjectCode")?.Name}";
+ }
+
+ ///
+ /// 构建设备路径
+ ///
+ ///
+ ///
+ ///
+ public static string BuildDevicePath(T entity) where T : IoTEntity
+ {
+ return $"root.{entity.SystemName}.{entity.ProjectCode}.{entity.DeviceId}";
+ }
+ }
+
+}
diff --git a/src/JiShe.CollectBus.IoTDBProvider/IIoTDBProvider.cs b/src/JiShe.CollectBus.IoTDBProvider/IIoTDBProvider.cs
new file mode 100644
index 0000000..026a95a
--- /dev/null
+++ b/src/JiShe.CollectBus.IoTDBProvider/IIoTDBProvider.cs
@@ -0,0 +1,38 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.IoTDBProvider
+{
+ ///
+ /// IoTDB数据源
+ ///
+ public interface IIoTDBProvider
+ {
+ ///
+ /// 插入数据
+ ///
+ ///
+ ///
+ ///
+ Task InsertAsync(T entity) where T : IoTEntity;
+
+ ///
+ /// 批量插入数据
+ ///
+ ///
+ ///
+ ///
+ Task BatchInsertAsync(IEnumerable entities) where T : IoTEntity;
+
+ ///
+ /// 查询数据
+ ///
+ ///
+ ///
+ ///
+ Task> QueryAsync(QueryOptions options) where T : IoTEntity, new();
+ }
+}
diff --git a/src/JiShe.CollectBus.IoTDBProvider/IoTDBProvider.cs b/src/JiShe.CollectBus.IoTDBProvider/IoTDBProvider.cs
new file mode 100644
index 0000000..f2a5546
--- /dev/null
+++ b/src/JiShe.CollectBus.IoTDBProvider/IoTDBProvider.cs
@@ -0,0 +1,262 @@
+using Apache.IoTDB;
+using Apache.IoTDB.DataStructure;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.IoTDBProvider
+{
+ ///
+ /// IoTDB数据源
+ ///
+ public class IoTDBProvider : IIoTDBProvider, IDisposable
+ {
+ private readonly IoTDBOptions _options;
+ private readonly SessionPool _sessionPool;
+ private static readonly ConcurrentDictionary _metadataCache = new();
+
+ public IoTDBProvider(IOptions options)
+ {
+ _options = options.Value;
+ _sessionPool = new SessionPool(
+ _options.ClusterList,
+ _options.UserName,
+ _options.Password,
+ _options.PoolSize);
+ _sessionPool.Open(false).Wait();
+ }
+
+ ///
+ /// 获取设备元数据
+ ///
+ ///
+ ///
+ private DeviceMetadata GetMetadata() where T : IoTEntity
+ {
+ return _metadataCache.GetOrAdd(typeof(T), type =>
+ {
+ var metadata = new DeviceMetadata();
+ foreach (var prop in type.GetProperties())
+ {
+ var attr = prop.GetCustomAttribute();
+ if (attr != null)
+ {
+ metadata.Tags.Add(prop.Name);
+ }
+ else if (prop.Name != nameof(IoTEntity.Timestamp))
+ {
+ metadata.Measurements.Add(prop.Name);
+ }
+ }
+ return metadata;
+ });
+ }
+
+ ///
+ /// 插入数据
+ ///
+ ///
+ ///
+ ///
+ public async Task InsertAsync(T entity) where T : IoTEntity
+ {
+ var metadata = GetMetadata();
+ var storageGroup = DevicePathBuilder.BuildStorageGroupPath();
+ await EnsureStorageGroupCreated(storageGroup);
+
+ var tablet = BuildTablet(new[] { entity }, metadata);
+ await _sessionPool.InsertAlignedTabletAsync(tablet);
+ }
+
+ ///
+ /// 批量插入数据
+ ///
+ ///
+ ///
+ ///
+ public async Task BatchInsertAsync(IEnumerable entities) where T : IoTEntity
+ {
+ var metadata = GetMetadata();
+ var storageGroup = DevicePathBuilder.BuildStorageGroupPath();
+ await EnsureStorageGroupCreated(storageGroup);
+
+ var batchSize = 1000;
+ var batches = entities.Chunk(batchSize);
+
+ foreach (var batch in batches)
+ {
+ var tablet = BuildTablet(batch, metadata);
+ await _sessionPool.InsertAlignedTabletAsync(tablet);
+ }
+ }
+
+ ///
+ /// 构建表模型
+ ///
+ ///
+ ///
+ ///
+ ///
+ private Tablet BuildTablet(IEnumerable entities, DeviceMetadata metadata) where T : IoTEntity
+ {
+ var devicePath = DevicePathBuilder.BuildDevicePath(entities.First());
+ var timestamps = new List();
+ var values = new List>();
+
+ foreach (var entity in entities)
+ {
+ timestamps.Add(entity.Timestamp);
+ var rowValues = new List
public static class DevicePathBuilder
{
- ///
- /// 构建存储组路径
- ///
- ///
- ///
- public static string BuildStorageGroupPath() where T : IoTEntity
- {
- var type = typeof(T);
- return $"root.{type.GetProperty("SystemName")?.Name}.{type.GetProperty("ProjectCode")?.Name}";
- }
-
///
/// 构建设备路径
///
///
///
///
- public static string BuildDevicePath(T entity) where T : IoTEntity
+ public static string GetDeviceId(T entity) where T : IoTEntity
{
return $"root.{entity.SystemName}.{entity.ProjectCode}.{entity.DeviceId}";
}
+
+
+ ///
+ /// 获取表名称
+ ///
+ ///
+ ///
+ ///
+ public static string GetTableName() where T : IoTEntity
+ {
+ var type = typeof(T);
+ return $"{type.Name}";
+ }
}
}
diff --git a/src/JiShe.CollectBus.IoTDBProvider/IoTDBProvider.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs
similarity index 70%
rename from src/JiShe.CollectBus.IoTDBProvider/IoTDBProvider.cs
rename to src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs
index f2a5546..449884a 100644
--- a/src/JiShe.CollectBus.IoTDBProvider/IoTDBProvider.cs
+++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs
@@ -17,17 +17,20 @@ namespace JiShe.CollectBus.IoTDBProvider
public class IoTDBProvider : IIoTDBProvider, IDisposable
{
private readonly IoTDBOptions _options;
- private readonly SessionPool _sessionPool;
+ private readonly TableSessionPool _sessionPool;
private static readonly ConcurrentDictionary _metadataCache = new();
public IoTDBProvider(IOptions options)
{
_options = options.Value;
- _sessionPool = new SessionPool(
- _options.ClusterList,
- _options.UserName,
- _options.Password,
- _options.PoolSize);
+
+ _sessionPool = new TableSessionPool.Builder()
+ .SetNodeUrls(_options.ClusterList)
+ .SetUsername(_options.UserName)
+ .SetPassword(_options.Password)
+ .SetFetchSize(_options.PoolSize)
+ .Build();
+
_sessionPool.Open(false).Wait();
}
@@ -43,14 +46,27 @@ namespace JiShe.CollectBus.IoTDBProvider
var metadata = new DeviceMetadata();
foreach (var prop in type.GetProperties())
{
- var attr = prop.GetCustomAttribute();
- if (attr != null)
+ //标签列
+ var attrTAG = prop.GetCustomAttribute();
+ if (attrTAG != null)
{
- metadata.Tags.Add(prop.Name);
+ metadata.ColumnCategories.Add(ColumnCategory.TAG);
}
- else if (prop.Name != nameof(IoTEntity.Timestamp))
+
+ //属性列
+ var attrATTRIBUTE = prop.GetCustomAttribute();
+ if (attrATTRIBUTE != null)
{
+ metadata.ColumnCategories.Add(ColumnCategory.ATTRIBUTE);
+ }
+
+ //数据列
+ var attrFIELD = prop.GetCustomAttribute();
+ if (attrFIELD != null)
+ {
+ metadata.ColumnCategories.Add(ColumnCategory.FIELD);
metadata.Measurements.Add(prop.Name);
+ metadata.DataTypes.Add(GetDataTypeFromStr(prop.PropertyType.Name));
}
}
return metadata;
@@ -66,11 +82,9 @@ namespace JiShe.CollectBus.IoTDBProvider
public async Task InsertAsync(T entity) where T : IoTEntity
{
var metadata = GetMetadata();
- var storageGroup = DevicePathBuilder.BuildStorageGroupPath();
- await EnsureStorageGroupCreated(storageGroup);
var tablet = BuildTablet(new[] { entity }, metadata);
- await _sessionPool.InsertAlignedTabletAsync(tablet);
+ await _sessionPool.InsertAsync(tablet);
}
///
@@ -81,9 +95,7 @@ namespace JiShe.CollectBus.IoTDBProvider
///
public async Task BatchInsertAsync(IEnumerable entities) where T : IoTEntity
{
- var metadata = GetMetadata();
- var storageGroup = DevicePathBuilder.BuildStorageGroupPath();
- await EnsureStorageGroupCreated(storageGroup);
+ var metadata = GetMetadata();
var batchSize = 1000;
var batches = entities.Chunk(batchSize);
@@ -91,7 +103,7 @@ namespace JiShe.CollectBus.IoTDBProvider
foreach (var batch in batches)
{
var tablet = BuildTablet(batch, metadata);
- await _sessionPool.InsertAlignedTabletAsync(tablet);
+ await _sessionPool.InsertAsync(tablet);
}
}
@@ -104,34 +116,33 @@ namespace JiShe.CollectBus.IoTDBProvider
///
private Tablet BuildTablet(IEnumerable entities, DeviceMetadata metadata) where T : IoTEntity
{
- var devicePath = DevicePathBuilder.BuildDevicePath(entities.First());
+ var deviceId = DevicePathBuilder.GetDeviceId(entities.First());
var timestamps = new List();
var values = new List>();
foreach (var entity in entities)
{
- timestamps.Add(entity.Timestamp);
+ timestamps.Add(entity.Timestamps);
var rowValues = new List
diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
index 3516bbb..40f498e 100644
--- a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
+++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
@@ -290,15 +290,15 @@ namespace JiShe.CollectBus.Host
var producerConfig = new ProducerConfig();
context.Services
- .ConfigureKafkaTestOptions(options =>
- {
+// .ConfigureKafkaTestOptions(options =>
+// {
-#if DEBUG
- options.CleanTopicsOnStart = true;// 测试时,每次启动都删除topic,生产环境不需要
-#endif
- options.CreateTopicsIfNotExists = true;
- options.TopicNames = ProtocolConstExtensions.GetAllTopicNames();
- })
+//#if DEBUG
+// options.CleanTopicsOnStart = true;// 测试时,每次启动都删除topic,生产环境不需要
+//#endif
+// options.CreateTopicsIfNotExists = true;
+// options.TopicNames = ProtocolConstExtensions.GetAllTopicNames();
+// })
.AddMassTransit(x =>
{
x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json
index 69bdcc1..b506272 100644
--- a/src/JiShe.CollectBus.Host/appsettings.json
+++ b/src/JiShe.CollectBus.Host/appsettings.json
@@ -1,4 +1,4 @@
- {
+{
"Serilog": {
"Using": [
"Serilog.Sinks.Console",
@@ -20,7 +20,7 @@
{
"Name": "Console"
},
- {
+ {
"Name": "File",
"Args": {
"path": "logs/logs-.txt",
@@ -33,17 +33,17 @@
"SelfUrl": "http://localhost:44315",
"CorsOrigins": "http://localhost:4200,http://localhost:3100"
},
- "ConnectionStrings": {
- "Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
- "Kafka": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092",
- "PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
- "EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
- },
- "Redis": {
- "Configuration": "118.190.144.92:6379,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
- "DefaultDB": "14",
- "HangfireDB": "15"
- },
+ "ConnectionStrings": {
+ "Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
+ "Kafka": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092",
+ "PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
+ "EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
+ },
+ "Redis": {
+ "Configuration": "118.190.144.92:6379,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
+ "DefaultDB": "14",
+ "HangfireDB": "15"
+ },
"Jwt": {
"Audience": "JiShe.CollectBus",
"SecurityKey": "dzehzRz9a8asdfasfdadfasdfasdfafsdadfasbasdf=",
@@ -80,5 +80,11 @@
"Password": "123456",
"Port": 5672
}
+ },
+ "IoTDBOptions": {
+ "UserName": "root",
+ "Password": "root",
+ "ClusterList": [ "192.168.56.102:6667"],
+ "PoolSize": 10
}
}
\ No newline at end of file
diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTEntity.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTEntity.cs
index 2b090de..45964fa 100644
--- a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTEntity.cs
+++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTEntity.cs
@@ -30,8 +30,8 @@
public string DeviceId { get; set; }
///
- /// 时间戳
+ /// 当前时间戳,单位秒
///
- public long Timestamps { get; set; }
+ public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
}
}
From 5772ce906d9dc0afe838892ea4cd8388fdaeaa97 Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Thu, 3 Apr 2025 15:38:31 +0800
Subject: [PATCH 09/18] =?UTF-8?q?=E5=AE=8C=E5=96=84IOTDB=20Provider?=
=?UTF-8?q?=E5=B0=81=E8=A3=85?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../Samples/SampleAppService.cs | 5 +-
.../AttributeInfo/NumericalOrderAttribute.cs | 28 +
.../Extensions/EnumExtensions.cs | 68 ++
.../Helpers/CommonHelper.cs | 764 ++++++++++++++++++
.../Helpers/JsonHelper.cs | 28 +
.../Helpers/SelectResult.cs | 35 +
.../Interface/IIoTDBProvider.cs | 15 +-
.../JiShe.CollectBus.IoTDBProvider.csproj | 3 +
.../Options/QueryOptions.cs | 5 +
.../Provider/DeviceMetadata.cs | 8 +-
.../Provider/IoTDBProvider.cs | 396 ++++++---
11 files changed, 1252 insertions(+), 103 deletions(-)
create mode 100644 src/JiShe.CollectBus.Common/AttributeInfo/NumericalOrderAttribute.cs
create mode 100644 src/JiShe.CollectBus.Common/Extensions/EnumExtensions.cs
create mode 100644 src/JiShe.CollectBus.Common/Helpers/CommonHelper.cs
create mode 100644 src/JiShe.CollectBus.Common/Helpers/SelectResult.cs
diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs
index f76ce54..9c2a86f 100644
--- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs
+++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs
@@ -25,15 +25,14 @@ public class SampleAppService : CollectBusAppService, ISampleAppService
ElectricityMeter meter = new ElectricityMeter()
{
SystemName = "Energy",
- DeviceId = "402440506"
- ,
+ DeviceId = "402440506",
DeviceType = "Ammeter",
Current = 10,
MeterModel = "DDZY-1980",
ProjectCode = "10059",
Voltage = 10
};
- await _iotDBProvider.InsertAsync(meter);
+ await _iotDBProvider.InsertAsync(meter,2);
}
public Task GetAsync()
diff --git a/src/JiShe.CollectBus.Common/AttributeInfo/NumericalOrderAttribute.cs b/src/JiShe.CollectBus.Common/AttributeInfo/NumericalOrderAttribute.cs
new file mode 100644
index 0000000..a062f16
--- /dev/null
+++ b/src/JiShe.CollectBus.Common/AttributeInfo/NumericalOrderAttribute.cs
@@ -0,0 +1,28 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Common.AttributeInfo
+{
+ ///
+ /// 排序序号
+ ///
+ public class NumericalOrderAttribute : Attribute
+ {
+ ///
+ /// 序号
+ ///
+ public int Index { get; set; }
+
+ ///
+ /// 排序序号
+ ///
+ ///
+ public NumericalOrderAttribute(int index)
+ {
+ Index = index;
+ }
+ }
+}
diff --git a/src/JiShe.CollectBus.Common/Extensions/EnumExtensions.cs b/src/JiShe.CollectBus.Common/Extensions/EnumExtensions.cs
new file mode 100644
index 0000000..222984e
--- /dev/null
+++ b/src/JiShe.CollectBus.Common/Extensions/EnumExtensions.cs
@@ -0,0 +1,68 @@
+using System;
+
+namespace JiShe.CollectBus.Common.Extensions
+{
+ public static class EnumExtensions
+ {
+ ///
+ /// 将枚举转换为字典
+ ///
+ ///
+ ///
+ public static Dictionary ToDictionary() where TEnum : Enum
+ {
+ return Enum.GetValues(typeof(TEnum))
+ .Cast()
+ .ToDictionary(
+ e => e.ToString(),
+ e => Convert.ToInt32(e)
+ );
+ }
+
+ ///
+ /// 将枚举转换为字典
+ ///
+ ///
+ ///
+ public static Dictionary ToEnumDictionary() where TEnum : Enum
+ {
+ return Enum.GetValues(typeof(TEnum))
+ .Cast()
+ .ToDictionary(
+ e => e.ToString(),
+ e => e
+ );
+ }
+
+ ///
+ /// 将枚举转换为字典
+ ///
+ ///
+ ///
+ public static Dictionary ToValueNameDictionary() where TEnum : Enum
+ {
+ return Enum.GetValues(typeof(TEnum))
+ .Cast()
+ .ToDictionary(
+ e => Convert.ToInt32(e),
+ e => e.ToString()
+ );
+ }
+
+ ///
+ /// 将枚举转换为字典
+ ///
+ ///
+ ///
+ public static Dictionary ToEnumNameDictionary() where TEnum : Enum
+ {
+ return Enum.GetValues(typeof(TEnum))
+ .Cast()
+ .ToDictionary(
+ e => e,
+ e => e.ToString()
+ );
+ }
+
+ }
+}
diff --git a/src/JiShe.CollectBus.Common/Helpers/CommonHelper.cs b/src/JiShe.CollectBus.Common/Helpers/CommonHelper.cs
new file mode 100644
index 0000000..fa38d1d
--- /dev/null
+++ b/src/JiShe.CollectBus.Common/Helpers/CommonHelper.cs
@@ -0,0 +1,764 @@
+using System;
+using System.Collections.Generic;
+using System.ComponentModel.DataAnnotations;
+using System.ComponentModel;
+using System.Linq;
+using System.Reflection;
+using System.Runtime.InteropServices;
+using System.Security.Cryptography;
+using System.Text;
+using System.Threading.Tasks;
+using JiShe.CollectBus.Common.AttributeInfo;
+
+namespace JiShe.CollectBus.Common.Helpers
+{
+ public static class CommonHelper
+ {
+ ///
+ /// 获得无符号GUID
+ ///
+ ///
+ public static string GetGUID()
+ {
+ return Guid.NewGuid().ToString("N");
+ }
+
+ ///
+ /// 获取时间戳
+ ///
+ /// 是否返回秒,false返回毫秒
+ ///
+ public static long GetTimeStampTen(bool isSeconds)
+ {
+ if (isSeconds)
+ {
+ return DateTimeOffset.UtcNow.ToUnixTimeSeconds();
+ }
+ else
+ {
+ return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
+ }
+ }
+
+ ///
+ /// 获取指定长度的随机数
+ ///
+ ///
+ ///
+ public static string GetRandomNumber(int length = 8)
+ {
+ if (length <= 8)
+ {
+ length = 8;
+ }
+
+ if (length > 31)
+ {
+ length = 32;
+ }
+
+ var randomArray = RandomNumberGenerator.GetBytes(length);
+ StringBuilder stringBuilder = new StringBuilder();
+ foreach (var item in randomArray)
+ {
+ stringBuilder.Append(item);
+ }
+
+ return stringBuilder.ToString().Substring(0, length);
+ }
+
+ ///
+ /// C#反射遍历对象属性获取键值对
+ ///
+ /// 对象类型
+ /// 对象
+ public static Dictionary GetClassProperties(T model)
+ {
+ Type t = model.GetType();
+ List propertyList = new List();
+
+ PropertyInfo[] tempPropertyList = t.GetProperties();
+ if (tempPropertyList != null && tempPropertyList.Length > 0)
+ {
+ propertyList.AddRange(tempPropertyList);
+ }
+
+ var parentPropertyInfo = t.BaseType?.GetProperties();
+ if (parentPropertyInfo != null && parentPropertyInfo.Length > 0)
+ {
+ foreach (var item in parentPropertyInfo)
+ {
+ if (!propertyList.Any(d => d.Name == item.Name)) //如果子类已经包含父类的属性或者字段,跳过不处理
+ {
+ propertyList.Add(item);
+ }
+ }
+ }
+
+ Dictionary resultData = new Dictionary();
+
+ foreach (PropertyInfo item in propertyList)
+ {
+ resultData.Add(item.Name, item.GetValue(model, null));
+ }
+
+ return resultData;
+ }
+
+ ///
+ /// C#反射遍历对象属性,将键值对赋值给属性
+ ///
+ public static object SetClassProperties(string typeModel, Dictionary keyValues)
+ {
+ if (keyValues.Count <= 0)
+ {
+ return null;
+ }
+
+ Type tType = Type.GetType(typeModel);
+
+
+ PropertyInfo[] PropertyList = tType.GetProperties();
+
+ object objModel = tType.Assembly.CreateInstance(tType.FullName);
+ foreach (PropertyInfo item in PropertyList)
+ {
+ if (keyValues.ContainsKey(item.Name))
+ {
+ object objectValue = keyValues[item.Name];
+ item.SetValue(objModel, objectValue);
+ }
+ }
+
+ return objModel;
+ }
+
+ ///
+ /// 取得某月的第一天
+ ///
+ /// 要取得月份第一天的时间
+ ///
+ public static DateTime FirstDayOfMonth(this DateTime datetime)
+ {
+ return datetime.AddDays(1 - datetime.Day);
+ }
+
+ ///
+ /// 取得某月的最后一天
+ ///
+ /// 要取得月份最后一天的时间
+ ///
+ public static DateTime LastDayOfMonth(this DateTime datetime)
+ {
+ return datetime.AddDays(1 - datetime.Day).AddMonths(1).AddDays(-1);
+ }
+
+ ///
+ /// 取得某月第一天0点以及最后一天的23:59:59时间范围
+ ///
+ ///
+ ///
+ public static Tuple GetMonthDateRange(this DateTime datetime)
+ {
+ var lastDayOfMonthDate = LastDayOfMonth(datetime);
+ return new Tuple(datetime.FirstDayOfMonth(), new DateTime(lastDayOfMonthDate.Year, lastDayOfMonthDate.Month, lastDayOfMonthDate.Day, 23, 59, 59));
+ }
+
+ ///
+ /// 取得某一天0点到当月最后一天的23:59:59时间范围
+ ///
+ ///
+ ///
+ public static Tuple GetCurrentDateToLastDayRange(this DateTime datetime)
+ {
+ var lastDayOfMonthDate = LastDayOfMonth(datetime);
+ return new Tuple(datetime.Date, new DateTime(lastDayOfMonthDate.Year, lastDayOfMonthDate.Month, lastDayOfMonthDate.Day, 23, 59, 59));
+ }
+
+ ///
+ /// 取得某一天0点到23:59:59时间范围
+ ///
+ ///
+ ///
+ public static Tuple GetCurrentDateRange(this DateTime datetime)
+ {
+ return new Tuple(datetime.Date, new DateTime(datetime.Year, datetime.Month, datetime.Day, 23, 59, 59));
+ }
+
+ ///
+ /// 获取指定枚举的所有 Attribute 说明以及value组成的键值对
+ ///
+ /// 对象类
+ /// false获取字段、true获取属性
+ ///
+ public static List GetEnumAttributeList(Type type, bool getPropertie = false)
+ {
+ if (type == null)
+ {
+ return null;
+ }
+
+ List selectResults = new List();
+ List memberInfos = new List();
+
+ if (getPropertie == false)
+ {
+ FieldInfo[] fieldArray = type.GetFields();
+ if (null == fieldArray || fieldArray.Length <= 0)
+ {
+ return null;
+ }
+
+ memberInfos.AddRange(fieldArray);
+ //获取父类的字段
+ var parentFieldInfo = type.BaseType?.GetFields();
+ if (parentFieldInfo != null && parentFieldInfo.Length > 0)
+ {
+ foreach (var item in parentFieldInfo)
+ {
+ if (!memberInfos.Any(d => d.Name == item.Name)) //如果子类已经包含父类的属性或者字段,跳过不处理
+ {
+ memberInfos.Add(item);
+ }
+ }
+ }
+ }
+ else
+ {
+ PropertyInfo[] properties = type.GetProperties();
+ if (null == properties || properties.Length <= 0)
+ {
+ return null;
+ }
+
+ memberInfos.AddRange(properties);
+ //获取父类的属性
+ var parentPropertyInfo = type.BaseType?.GetProperties();
+ if (parentPropertyInfo != null && parentPropertyInfo.Length > 0)
+ {
+ foreach (var item in parentPropertyInfo)
+ {
+ if (!memberInfos.Any(d => d.Name == item.Name)) //如果子类已经包含父类的属性或者字段,跳过不处理
+ {
+ memberInfos.Add(item);
+ }
+ }
+ }
+ }
+
+ foreach (var item in memberInfos)
+ {
+ DescriptionAttribute[] EnumAttributes =
+ (DescriptionAttribute[])item.GetCustomAttributes(typeof(DescriptionAttribute), false);
+
+ dynamic infoObject = null;
+ if (getPropertie == false)
+ {
+ infoObject = (FieldInfo)item;
+ }
+ else
+ {
+ infoObject = (PropertyInfo)item;
+ }
+
+ if (EnumAttributes.Length > 0)
+ {
+ SelectResult selectResult = new SelectResult()
+ {
+ Key = Convert.ToInt32(infoObject.GetValue(null)).ToString(),
+ Value = EnumAttributes[0].Description,
+ };
+
+ selectResults.Add(selectResult);
+ }
+
+ DisplayAttribute[] DisplayAttributes =
+ (DisplayAttribute[])item.GetCustomAttributes(typeof(DisplayAttribute), false);
+ if (DisplayAttributes.Length > 0)
+ {
+ SelectResult selectResult =
+ selectResults.FirstOrDefault(e => e.Key == Convert.ToInt32(infoObject.GetValue(null)).ToString());
+ if (selectResult != null)
+ {
+ selectResult.SecondValue = DisplayAttributes[0].Name;
+ }
+ }
+ }
+
+ return selectResults;
+ }
+
+ ///
+ /// 获取指定枚举的所有 Attribute 说明以及value组成的键值对
+ ///
+ /// 对象类
+ /// 第三个标签类型
+ /// 第三个标签类型取值名称
+ /// false获取字段、true获取属性
+ ///
+ public static List GetEnumAttributeListWithThirdValue(Type type, Type thirdAttributeType, string thirdAttributePropertieName, bool getPropertie = false)
+ {
+ if (type == null)
+ {
+ return null;
+ }
+
+ List selectResults = new List();
+ List memberInfos = new List();
+
+ if (getPropertie == false)
+ {
+ FieldInfo[] EnumInfo = type.GetFields();
+ if (null == EnumInfo || EnumInfo.Length <= 0)
+ {
+ return null;
+ }
+ memberInfos.AddRange(EnumInfo);
+ var parentFieldInfo = type.BaseType?.GetFields();
+ if (parentFieldInfo != null && parentFieldInfo.Length > 0)
+ {
+ memberInfos.AddRange(parentFieldInfo);
+ }
+ }
+ else
+ {
+ PropertyInfo[] EnumInfo = type.GetProperties();
+ if (null == EnumInfo || EnumInfo.Length <= 0)
+ {
+ return null;
+ }
+ memberInfos.AddRange(EnumInfo);
+ var parentPropertyInfo = type.BaseType?.GetProperties();
+ if (parentPropertyInfo != null && parentPropertyInfo.Length > 0)
+ {
+ memberInfos.AddRange(parentPropertyInfo);
+ }
+ }
+
+ foreach (var item in memberInfos)
+ {
+ var thirdAttributes = item.
+ GetCustomAttributes(thirdAttributeType, false);
+ if (thirdAttributes == null || thirdAttributes.Length <= 0)
+ {
+ continue;
+ }
+
+ DescriptionAttribute[] descriptionAttributes = (DescriptionAttribute[])item.
+ GetCustomAttributes(typeof(DescriptionAttribute), false);
+
+ dynamic infoObject = null;
+ if (getPropertie == false)
+ {
+ infoObject = (FieldInfo)item;
+ }
+ else
+ {
+ infoObject = (PropertyInfo)item;
+ }
+
+ if (descriptionAttributes.Length > 0)
+ {
+ SelectResult selectResult = new SelectResult()
+ {
+ Key = infoObject.Name,
+ Value = descriptionAttributes[0].Description,
+ };
+
+ selectResults.Add(selectResult);
+ }
+
+ DisplayAttribute[] displayAttributes = (DisplayAttribute[])item.
+ GetCustomAttributes(typeof(DisplayAttribute), false);
+ if (displayAttributes.Length > 0)
+ {
+ SelectResult selectResult = selectResults.FirstOrDefault(e => e.Key == infoObject.Name);
+ if (selectResult != null)
+ {
+ selectResult.SecondValue = displayAttributes[0].Name;
+ }
+ }
+
+ if (thirdAttributes.Length > 0 && !string.IsNullOrWhiteSpace(thirdAttributePropertieName))
+ {
+ foreach (var attr in thirdAttributes)
+ {
+ // 使用反射获取特性的属性值
+ var properties = thirdAttributeType.GetProperties();
+ foreach (var prop in properties)
+ {
+ // 假设你要获取特性的某个属性值,例如 TypeName
+ if (prop.Name == thirdAttributePropertieName)
+ {
+ object value = prop.GetValue(attr);
+ SelectResult selectResult = selectResults.FirstOrDefault(e => e.Key == infoObject.Name);
+ if (selectResult != null)
+ {
+ selectResult.ThirdValue = value?.ToString(); // 将属性值赋给 ThirdValue
+ }
+ break; // 如果找到了需要的属性,可以跳出循环
+ }
+ }
+ }
+ }
+ }
+
+ return selectResults;
+ }
+
+ ///
+ /// 获取指定类、指定常量值的Description说明
+ /// 包含直接继承的父级字段
+ ///
+ /// 对象类
+ ///
+ ///
+ public static List> GetTypeDescriptionListToTuple(Type t, bool getPropertie = false)
+ {
+ if (t == null)
+ {
+ return null;
+ }
+
+ List memberInfos = new List();
+
+ if (getPropertie == false)
+ {
+ FieldInfo[] fieldInfo = t.GetFields();
+ if (null == fieldInfo || fieldInfo.Length <= 0)
+ {
+ return null;
+ }
+
+ memberInfos.AddRange(fieldInfo);
+ var parentFieldInfo = t.BaseType?.GetFields();
+ if (parentFieldInfo != null && parentFieldInfo.Length > 0)
+ {
+ foreach (var item in parentFieldInfo)
+ {
+ if (!memberInfos.Any(d => d.Name == item.Name)) //如果子类已经包含父类的属性或者字段,跳过不处理
+ {
+ memberInfos.Add(item);
+ }
+ }
+ }
+ }
+ else
+ {
+ PropertyInfo[] fieldInfo = t.GetProperties();
+ if (null == fieldInfo || fieldInfo.Length <= 0)
+ {
+ return null;
+ }
+
+ memberInfos.AddRange(fieldInfo);
+ var parentPropertyInfo = t.BaseType?.GetProperties();
+ if (parentPropertyInfo != null && parentPropertyInfo.Length > 0)
+ {
+ foreach (var item in parentPropertyInfo)
+ {
+ if (!memberInfos.Any(d => d.Name == item.Name)) //如果子类已经包含父类的属性或者字段,跳过不处理
+ {
+ memberInfos.Add(item);
+ }
+ }
+ }
+ }
+
+ List> tuples = new List>();
+
+ foreach (var item in memberInfos)
+ {
+ DescriptionAttribute[] descriptionAttribute =
+ (DescriptionAttribute[])item.GetCustomAttributes(typeof(DescriptionAttribute), false);
+
+ NumericalOrderAttribute[] indexAttributes =
+ (NumericalOrderAttribute[])item.GetCustomAttributes(typeof(NumericalOrderAttribute), false);
+
+
+ if (descriptionAttribute.Length > 0 && indexAttributes.Length > 0)
+ {
+ Tuple tuple = new Tuple(item.Name,
+ descriptionAttribute[0].Description, indexAttributes[0].Index);
+ tuples.Add(tuple);
+ }
+ }
+
+ return tuples;
+ }
+
+ ///
+ /// 获取指定类、指定常量值的常量说明
+ ///
+ /// 常量字段名称
+ ///属性还是字段
+ ///
+ public static string GetTypeDescriptionName(string fieldName, bool getPropertie = false)
+ {
+ if (string.IsNullOrEmpty(fieldName))
+ {
+ return "";
+ }
+
+ MemberInfo memberInfo = null;
+ if (getPropertie == false)
+ {
+ memberInfo = typeof(T).GetField(fieldName);
+ }
+ else
+ {
+ memberInfo = typeof(T).GetProperty(fieldName);
+ }
+
+ if (null == memberInfo)
+ {
+ return "";
+ }
+
+ DescriptionAttribute[] EnumAttributes =
+ (DescriptionAttribute[])memberInfo.GetCustomAttributes(typeof(DescriptionAttribute), false);
+ if (EnumAttributes.Length <= 0)
+ {
+ return "";
+ }
+
+ return EnumAttributes[0].Description;
+ }
+
+ ///
+ /// 获取指定命名空间下指定常量值的常量说明
+ ///
+ /// 常量字段名称
+ /// 命名空间,主要用来找到对应程序集
+ ///属性还是字段
+ ///
+ public static string GetTypeDescriptionName(string fieldName, string assemblyName, bool getPropertie = false)
+ {
+ if (string.IsNullOrWhiteSpace(fieldName) || string.IsNullOrWhiteSpace(assemblyName))
+ {
+ return null;
+ }
+
+ string desc = "";
+ foreach (var item in GetEnumList(assemblyName))
+ {
+ desc = GetTypeDescriptionName(item, fieldName, getPropertie);
+ if (!string.IsNullOrEmpty(desc))
+ {
+ break;
+ }
+ }
+
+ return desc;
+ }
+
+ ///
+ /// 获取指定类、指定常量值的常量说明
+ ///
+ /// 对象类
+ /// 常量字段名称
+ ///属性还是字段
+ ///
+ public static string GetTypeDescriptionName(this Type t, string fieldName, bool getPropertie = false)
+ {
+ if (string.IsNullOrWhiteSpace(fieldName))
+ {
+ return "";
+ }
+
+ MemberInfo memberInfo = null;
+ if (getPropertie == false)
+ {
+ memberInfo = t.GetField(fieldName);
+ }
+ else
+ {
+ memberInfo = t.GetProperty(fieldName);
+ }
+
+ if (null != memberInfo)
+ {
+ DescriptionAttribute[] EnumAttributes =
+ (DescriptionAttribute[])memberInfo.GetCustomAttributes(typeof(DescriptionAttribute), false);
+ if (EnumAttributes.Length > 0)
+ {
+ return EnumAttributes[0].Description;
+ }
+ }
+
+ return "";
+ }
+
+ ///
+ /// 扩展方法,获得枚举值集合
+ ///
+ ///枚举的DisplayName
+ public static List GetEnumList() where T : Enum
+ {
+ List enumList = new List();
+ foreach (T value in Enum.GetValues(typeof(T)))
+ {
+ enumList.Add(value);
+ }
+
+ return enumList;
+ }
+
+ private static List GetEnumList(string assemblyName)
+ {
+ if (!String.IsNullOrEmpty(assemblyName))
+ {
+ Assembly assembly = Assembly.Load(assemblyName);
+ List ts = assembly.GetTypes().Where(x => x.GetTypeInfo().IsClass).ToList();
+ return ts;
+ }
+
+ return new List();
+ }
+
+ ///
+ /// 扩展方法,获得枚举的Display值
+ ///
+ ///枚举值
+ ///当枚举值没有定义DisplayNameAttribute,是否使用枚举名代替,默认是使用
+ ///属性还是字段
+ ///枚举的DisplayName
+ public static string GetDisplayName(this Enum value, Boolean nameInstead = true, bool getPropertie = false)
+ {
+ Type type = value.GetType();
+ string name = Enum.GetName(type, value);
+ if (name == null)
+ {
+ return null;
+ }
+
+ DisplayAttribute attribute = null;
+
+ if (getPropertie == false)
+ {
+ attribute = Attribute.GetCustomAttribute(type.GetField(name), typeof(DisplayAttribute)) as DisplayAttribute;
+ }
+ else
+ {
+ attribute =
+ Attribute.GetCustomAttribute(type.GetProperty(name), typeof(DisplayAttribute)) as DisplayAttribute;
+ }
+
+
+ if (attribute == null && nameInstead == true)
+ {
+ return name;
+ }
+
+ return attribute == null ? null : attribute.Name;
+ }
+
+ ///
+ /// 获取枚举的描述信息
+ ///
+ ///
+ ///
+ public static string GetEnumDescription(this Enum value)
+ {
+ var name = value.ToString();
+ var field = value.GetType().GetField(name);
+ if (field == null) return name;
+
+ var att = Attribute.GetCustomAttribute(field, typeof(DescriptionAttribute), false);
+
+ return att == null ? field.Name : ((DescriptionAttribute)att).Description;
+ }
+
+
+
+ ///
+ /// 将传入的字符串中间部分字符替换成特殊字符
+ ///
+ /// 需要替换的字符串
+ /// 前保留长度
+ /// 尾保留长度
+ /// 特殊字符
+ /// 被特殊字符替换的字符串
+ public static string ReplaceWithSpecialChar(this string value, int startLen = 1, int endLen = 1,
+ char specialChar = '*')
+ {
+ if (string.IsNullOrEmpty(value))
+ {
+ return value;
+ }
+
+ try
+ {
+
+ if (value.Length <= startLen + endLen)
+ {
+ var temStartVal = value.Substring(0, startLen);
+ return $"{temStartVal}{"".PadLeft(endLen, specialChar)}";
+ }
+
+ if (value.Length == 10 && endLen == 1)
+ {
+ endLen = 3;
+ }
+
+ var startVal = value.Substring(0, startLen);
+ var endVal = value.Substring(value.Length - endLen);
+ if (value.Length == 2)
+ {
+ endVal = "";
+ }
+
+ value = $"{startVal}{endVal.PadLeft(value.Length - startLen, specialChar)}";
+ }
+ catch (Exception)
+ {
+ throw;
+ }
+
+ return value;
+ }
+
+ ///
+ /// Linux下字体名称转换
+ ///
+ ///
+ ///
+ public static string GetLinuxFontFamily(this string fontValue)
+ {
+ if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
+ {
+ if (fontValue == "楷体")
+ {
+ fontValue = "KaiTi";
+ }
+ else if (fontValue == "隶书")
+ {
+ fontValue = "LiSu";
+ }
+ else if (fontValue == "宋体")
+ {
+ fontValue = "SimSun";
+ }
+ else if (fontValue == "微软雅黑")
+ {
+ fontValue = "Microsoft YaHei";
+ }
+ else if (fontValue == "新宋体")
+ {
+ fontValue = "NSimSun";
+ }
+ else if (fontValue == "仿宋")
+ {
+ fontValue = "FangSong";
+ }
+ else if (fontValue == "黑体")
+ {
+ fontValue = "SimHei";
+ }
+
+ }
+
+ return fontValue;
+ }
+ }
+}
diff --git a/src/JiShe.CollectBus.Common/Helpers/JsonHelper.cs b/src/JiShe.CollectBus.Common/Helpers/JsonHelper.cs
index 029ae1f..bdd46f6 100644
--- a/src/JiShe.CollectBus.Common/Helpers/JsonHelper.cs
+++ b/src/JiShe.CollectBus.Common/Helpers/JsonHelper.cs
@@ -123,4 +123,32 @@ namespace JiShe.CollectBus.Common.Helpers
writer.WriteStringValue(value.ToString(_dateFormatString));
}
}
+
+ ///
+ /// Unix格式时间格式化
+ ///
+ public class UnixTimeConverter : JsonConverter
+ {
+ public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
+ {
+ if (reader.TokenType == JsonTokenType.String)
+ {
+ if (long.TryParse(reader.GetString(), out long timestamp))
+ return DateTimeOffset.FromUnixTimeSeconds(timestamp).DateTime;
+ }
+
+ if (reader.TokenType == JsonTokenType.Number)
+ {
+ long timestamp = reader.GetInt64();
+ return DateTimeOffset.FromUnixTimeSeconds(timestamp).DateTime;
+ }
+ return reader.GetDateTime();
+ }
+
+ public override void Write(Utf8JsonWriter writer, DateTime value, JsonSerializerOptions options)
+ {
+ long timestamp = new DateTimeOffset(value).ToUnixTimeSeconds();
+ writer.WriteStringValue(timestamp.ToString());
+ }
+ }
}
diff --git a/src/JiShe.CollectBus.Common/Helpers/SelectResult.cs b/src/JiShe.CollectBus.Common/Helpers/SelectResult.cs
new file mode 100644
index 0000000..4ccaf99
--- /dev/null
+++ b/src/JiShe.CollectBus.Common/Helpers/SelectResult.cs
@@ -0,0 +1,35 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Common.Helpers
+{
+ ///
+ /// 下拉框选项元素
+ ///
+ public class SelectResult
+ {
+ ///
+ /// 下拉框 键
+ ///
+ public string Key { get; set; }
+
+ ///
+ /// 下拉框 值
+ ///
+ public string Value { get; set; }
+
+ ///
+ /// 下拉框 值2
+ ///
+ public string SecondValue { get; set; }
+
+ ///
+ /// 下拉框 值3
+ ///
+ public object ThirdValue { get; set; }
+
+ }
+}
diff --git a/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs b/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs
index 026a95a..71cc741 100644
--- a/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs
+++ b/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs
@@ -16,16 +16,27 @@ namespace JiShe.CollectBus.IoTDBProvider
///
///
///
+ /// 构建表模型方式,1 根据实体《T》直接显示指定Tag,2根据实体《T》的名称指定表名
///
- Task InsertAsync(T entity) where T : IoTEntity;
+ Task InsertAsync(T entity, int buildTabletMode) where T : IoTEntity;
///
/// 批量插入数据
///
///
///
+ /// 构建表模型方式,1 根据实体《T》直接显示指定Tag,2根据实体《T》的名称指定表名
///
- Task BatchInsertAsync(IEnumerable entities) where T : IoTEntity;
+ Task BatchInsertAsync(IEnumerable entities, int buildTabletMode) where T : IoTEntity;
+
+
+ ///
+ /// 删除数据
+ ///
+ ///
+ ///
+ ///
+ Task
+
+
+
diff --git a/src/JiShe.CollectBus.IoTDBProvider/Options/QueryOptions.cs b/src/JiShe.CollectBus.IoTDBProvider/Options/QueryOptions.cs
index 019bd28..d04cd48 100644
--- a/src/JiShe.CollectBus.IoTDBProvider/Options/QueryOptions.cs
+++ b/src/JiShe.CollectBus.IoTDBProvider/Options/QueryOptions.cs
@@ -11,6 +11,11 @@ namespace JiShe.CollectBus.IoTDBProvider
///
public class QueryOptions
{
+ ///
+ /// 表名或标签名
+ ///
+ public required string TableNameOrTagName { get; set; }
+
///
/// 分页
///
diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/DeviceMetadata.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/DeviceMetadata.cs
index 8e84776..0d5b408 100644
--- a/src/JiShe.CollectBus.IoTDBProvider/Provider/DeviceMetadata.cs
+++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/DeviceMetadata.cs
@@ -13,17 +13,17 @@ namespace JiShe.CollectBus.IoTDBProvider
public class DeviceMetadata
{
///
- /// 测量值集合,用于构建Table的测量值,也就是field参数
+ /// 测量值集合,用于构建Table的测量值,也就是columnNames参数
///
- public List Measurements { get; } = new();
+ public List ColumnNames { get; } = new();
///
- /// 列类型集合,用于构建Table的列类型,也就是columnCategory参数
+ /// 列类型集合,用于构建Table的列类型,也就是columnCategories参数
///
public List ColumnCategories { get; } = new();
///
- /// 值类型集合,用于构建Table的值类型,也就是dataType参数
+ /// 值类型集合,用于构建Table的值类型,也就是dataTypes参数
///
public ListDataTypes { get; } = new();
}
diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs
index 449884a..39e07c0 100644
--- a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs
+++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs
@@ -1,5 +1,8 @@
using Apache.IoTDB;
using Apache.IoTDB.DataStructure;
+using JiShe.CollectBus.Common.Extensions;
+using JiShe.CollectBus.Common.Helpers;
+using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
@@ -19,8 +22,9 @@ namespace JiShe.CollectBus.IoTDBProvider
private readonly IoTDBOptions _options;
private readonly TableSessionPool _sessionPool;
private static readonly ConcurrentDictionary _metadataCache = new();
+ private readonly ILogger _logger;
- public IoTDBProvider(IOptions options)
+ public IoTDBProvider(IOptions options, ILogger logger)
{
_options = options.Value;
@@ -32,58 +36,20 @@ namespace JiShe.CollectBus.IoTDBProvider
.Build();
_sessionPool.Open(false).Wait();
+ _logger = logger;
}
- ///
- /// 获取设备元数据
- ///
- ///
- ///
- private DeviceMetadata GetMetadata() where T : IoTEntity
- {
- return _metadataCache.GetOrAdd(typeof(T), type =>
- {
- var metadata = new DeviceMetadata();
- foreach (var prop in type.GetProperties())
- {
- //标签列
- var attrTAG = prop.GetCustomAttribute();
- if (attrTAG != null)
- {
- metadata.ColumnCategories.Add(ColumnCategory.TAG);
- }
-
- //属性列
- var attrATTRIBUTE = prop.GetCustomAttribute();
- if (attrATTRIBUTE != null)
- {
- metadata.ColumnCategories.Add(ColumnCategory.ATTRIBUTE);
- }
-
- //数据列
- var attrFIELD = prop.GetCustomAttribute();
- if (attrFIELD != null)
- {
- metadata.ColumnCategories.Add(ColumnCategory.FIELD);
- metadata.Measurements.Add(prop.Name);
- metadata.DataTypes.Add(GetDataTypeFromStr(prop.PropertyType.Name));
- }
- }
- return metadata;
- });
- }
-
///
/// 插入数据
///
///
///
///
- public async Task InsertAsync(T entity) where T : IoTEntity
+ public async Task InsertAsync(T entity, int buildTabletMode) where T : IoTEntity
{
var metadata = GetMetadata();
- var tablet = BuildTablet(new[] { entity }, metadata);
+ var tablet = BuildTablet(new[] { entity }, metadata, buildTabletMode);
await _sessionPool.InsertAsync(tablet);
}
@@ -93,58 +59,46 @@ namespace JiShe.CollectBus.IoTDBProvider
///
///
///
- public async Task BatchInsertAsync(IEnumerable entities) where T : IoTEntity
+ public async Task BatchInsertAsync(IEnumerable entities, int buildTabletMode) where T : IoTEntity
{
- var metadata = GetMetadata();
+ var metadata = GetMetadata();
var batchSize = 1000;
var batches = entities.Chunk(batchSize);
foreach (var batch in batches)
{
- var tablet = BuildTablet(batch, metadata);
+ var tablet = BuildTablet(batch, metadata, buildTabletMode);
await _sessionPool.InsertAsync(tablet);
}
}
+
///
- /// 构建表模型
+ /// 删除数据
///
///
- ///
- ///
+ ///
///
- private Tablet BuildTablet(IEnumerable entities, DeviceMetadata metadata) where T : IoTEntity
+ public async Task