From 73803cab65bbeb26b01c8590cf17cf2ee5e3aea3 Mon Sep 17 00:00:00 2001
From: cli <377476583@qq.com>
Date: Mon, 24 Feb 2025 13:47:12 +0800
Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
readme.md | 132 ++++++++++++++++++
.../EnergySystem/EnergySystemAppService.cs | 40 +++---
.../CollectBusHostModule.cs | 1 +
.../CustomApplicationBuilderExtensions.cs | 13 ++
src/JiShe.CollectBus.Host/Startup.cs | 19 ++-
src/JiShe.CollectBus.Host/appsettings.json | 10 +-
.../Abstracts/BaseProtocolPlugin.cs | 20 +--
.../StandardProtocolPlugin.cs | 20 +--
8 files changed, 200 insertions(+), 55 deletions(-)
create mode 100644 readme.md
create mode 100644 src/JiShe.CollectBus.Host/Extensions/CustomApplicationBuilderExtensions.cs
diff --git a/readme.md b/readme.md
new file mode 100644
index 0000000..28a0678
--- /dev/null
+++ b/readme.md
@@ -0,0 +1,132 @@
+以下是一份采集端V5架构说明文档的参考模板,可根据实际需求调整内容:
+
+---
+
+# 采集端V5架构说明文档
+
+## 1. 系统概述
+### 1.1 目标定位
+面向海量异构数据源的实时采集与标准化处理,支持:
+- 高可用分布式部署
+- 十万级TPS吞吐量
+- 多协议/多格式模块化兼容
+- 动态扩展与负载均衡
+- 智能化异常熔断机制
+
+### 1.2 版本演进
+- V4 → V5核心升级:
+ - 微服务化架构改造
+ - 统一配置管理中心
+ - 支持Kafka/RabbitMQ双引擎
+ - 新增边缘计算能力
+ - 资源利用率提升40%
+
+## 2. 架构设计
+### 2.1 整体架构图
+(此处建议插入架构拓扑图)
+
+### 2.2 分层设计
+| 层级 | 组件 | 功能描述 |
+|-------------|---------------------------|------------------------------|
+| 数据接入层 | Protocol Adapter集群 | 多协议适配(HTTP/MQTT/自定义)|
+| 缓冲层 | 分布式消息队列 | 流量削峰与数据暂存 |
+| 处理层 | Stream Processor集群 | 数据清洗、格式转换、规则引擎 |
+| 存储层 | Metadata DB + Object存储 | 元数据管理+原始数据归档 |
+| 控制层 | 配置中心+监控中心 | 动态配置/健康检查/熔断控制 |
+
+## 3. 核心模块说明
+### 3.1 多协议适配器
+- 支持协议:HTTP/2、MQTT 3.1.1/5.0、CoAP、自定义TCP
+- 特性:
+ ```yaml
+ max_connections: 10000/节点
+ qps_threshold: 自动弹性扩容
+ ssl: 国密SM2/SM4支持
+ ```
+
+### 3.2 流量控制模块
+- 三级流量控制策略:
+ 1. 令牌桶全局限流
+ 2. 基于数据源的QoS分级
+ 3. 动态优先级队列
+
+### 3.3 数据清洗引擎
+- 功能组件:
+ - 数据校验(JSON Schema/XSD)
+ - 敏感字段脱敏
+ - 数据补全(IP→地理位置)
+ - 异常数据隔离重试
+
+## 4. 接口规范
+### 4.1 数据接收接口
+```rest
+POST /ingest/{datasource_id}
+Headers:
+ X-Auth-Token: [动态令牌]
+Body:
+{
+ "payload": "base64编码数据",
+ "metadata": {"timestamp": 1620000000, "geo": "31.23,121.47"}
+}
+```
+
+### 4.2 管理接口
+- 动态配置更新:
+ ```bash
+ curl -X PATCH /config/rate_limit \
+ -d '{"threshold": 5000, "strategy": "token_bucket"}'
+ ```
+
+## 5. 性能指标
+| 指标 | 单节点能力 | 集群扩展能力 |
+|----------------|------------|--------------|
+| 最大连接数 | 10,000 | 线性扩展 |
+| 数据处理延迟 | <50ms(p99) | - |
+| 吞吐量 | 20,000 TPS | 百万级TPS |
+| CPU利用率 | ≤70%@峰值 | 自动负载均衡 |
+
+## 6. 高可用设计
+- 故障恢复机制:
+ - 心跳检测(2s间隔)
+ - 自动故障转移(<30s)
+ - 数据双写备份
+- 熔断策略:
+ - 基于错误率(>30%触发)
+ - 基于系统负载(CPU>80%触发)
+
+## 7. 部署架构
+- 混合部署方案:
+ ```mermaid
+ graph TD
+ A[边缘节点] --> B[区域中心]
+ B --> C[中央数据中心]
+ C --> D[(大数据平台)]
+ ```
+- 资源要求:
+ - 最小部署单元:2C4G
+ - 推荐生产配置:8C16G/节点
+
+## 8. 安全设计
+- 数据安全:
+ - 传输层:TLS 1.3
+ - 存储加密:AES-256
+ - 完整性校验:SHA-256
+- 访问控制:
+ - RBAC权限模型
+ - 双因素认证
+ - 审计日志留存180天
+
+## 9. 版本演进规划
+- V5.1:AI驱动的异常检测
+- V5.2:区块链存证支持
+- V5.3:Serverless架构适配
+
+---
+
+**备注**:建议配合以下补充材料使用:
+1. 系统部署拓扑图
+2. 数据流程图
+3. API详细规范文档
+4. 性能压测报告
+
+可根据实际业务需求补充具体技术参数和实现细节。
\ No newline at end of file
diff --git a/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs b/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs
index 509f113..5465ab3 100644
--- a/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs
+++ b/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs
@@ -42,7 +42,7 @@ namespace JiShe.CollectBus.EnergySystem
///
///
///
- [Route("Handmould/TranspondSend_10_98")]
+ [HttpPost("Handmould/TranspondSend_10_98")]
public async Task> ValveControl(ValveControlInput input)
{
var result = new BaseResultDto();
@@ -89,7 +89,7 @@ namespace JiShe.CollectBus.EnergySystem
///
///
///
- [Route("Handmould/TranspondSend_10_94")]
+ [HttpPost("Handmould/TranspondSend_10_94")]
public async Task> ReadTime(ReadTimeInput input)
{
var result = new BaseResultDto
@@ -124,7 +124,7 @@ namespace JiShe.CollectBus.EnergySystem
///
///
///
- [Route("Handmould/IssueArchives")]
+ [HttpPost("Handmould/IssueArchives")]
public async Task> AmmeterArchivesDown(AmmeterArchivesDownInput input)
{
var result = new BaseResultDto();
@@ -167,7 +167,7 @@ namespace JiShe.CollectBus.EnergySystem
///
///
///
- [Route("Handmould/ReadMeterAddress_10_105")]
+ [HttpPost("Handmould/ReadMeterAddress_10_105")]
public async Task AmmeterArchivesMatch(AmmeterArchivesMatchInput input)
{
var result = new BaseResultDto();
@@ -197,7 +197,7 @@ namespace JiShe.CollectBus.EnergySystem
///
///
///
- [Route("Handmould/EquitDubg")]
+ [HttpPost("Handmould/EquitDubg/CommunicationParametersSet")]
public async Task CommunicationParametersSet(CommunicationParametersSetInput input)
{
var result = new BaseResultDto();
@@ -243,7 +243,7 @@ namespace JiShe.CollectBus.EnergySystem
///
///
///
- [Route("Handmould/EquitDubg")]
+ [HttpPost("Handmould/EquitDubg/CallTimeTesting")]
public async Task CallTimeTesting(CallTimeTestingInput input)
{
var result = new BaseResultDto();
@@ -270,7 +270,7 @@ namespace JiShe.CollectBus.EnergySystem
///
///
///
- [Route("Handmould/EquitDubg")]
+ [HttpPost("Handmould/EquitDubg/TimeAdjust")]
public async Task TimeAdjust(TimeAdjustInput input)
{
var result = new BaseResultDto();
@@ -297,7 +297,7 @@ namespace JiShe.CollectBus.EnergySystem
///
///
///
- [Route("Handmould/EquitDubg")]
+ [HttpPost("Handmould/EquitDubg/TerminalRestart")]
public async Task TerminalRestart(TerminalRestartInput input)
{
var result = new BaseResultDto();
@@ -324,7 +324,7 @@ namespace JiShe.CollectBus.EnergySystem
///
///
///
- [Route("Handmould/EquitDubg")]
+ [HttpPost("Handmould/EquitDubg/ReadMeterNum")]
public async Task> ReadMeterNum(ReadMeterNumInput input)
{
var result = new BaseResultDto();
@@ -351,7 +351,7 @@ namespace JiShe.CollectBus.EnergySystem
///
///
///
- [Route("Handmould/IssueCmd")]
+ [HttpPost("Handmould/IssueCmd")]
public async Task> Reading(ReadingInput input)
{
var result = new BaseResultDto();
@@ -402,7 +402,7 @@ namespace JiShe.CollectBus.EnergySystem
///
///
///
- [Route("Handmould/IssueTranspondTimeSet_10_103")]
+ [HttpPost("Handmould/IssueTranspondTimeSet_10_103")]
public async Task TimeSet(TimeSetInput input)
{
var result = new BaseResultDto();
@@ -470,7 +470,7 @@ namespace JiShe.CollectBus.EnergySystem
///
///
///
- [Route("Handmould/SetAutoItemCode")]
+ [HttpPost("Handmould/SetAutoItemCode")]
public async Task AutoReportCollectionItemsSet(AutoReportCollectionItemsSetInput input)
{
var result = new BaseResultDto();
@@ -499,7 +499,7 @@ namespace JiShe.CollectBus.EnergySystem
///
///
///
- [Route("Handmould/SetAutoUpSwitch")]
+ [HttpPost("Handmould/SetAutoUpSwitch")]
public async Task AutoReportSet(AutoReportSetInput input)
{
var result = new BaseResultDto();
@@ -527,7 +527,7 @@ namespace JiShe.CollectBus.EnergySystem
///
///
///
- [Route("Handmould/QueryAutoUpSwitch")]
+ [HttpPost("Handmould/QueryAutoUpSwitch")]
public async Task QueryAutoReportOpenStatus(QueryAutoReportOpenStatusInput input)
{
var result = new BaseResultDto();
@@ -552,7 +552,7 @@ namespace JiShe.CollectBus.EnergySystem
///
///
///
- [Route("Handmould/BatchIssueVersion_09_1")]
+ [HttpPost("Handmould/BatchIssueVersion_09_1")]
public async Task> BatchReadVersion(BatchReadVersionInput input)
{
var result = new BaseResultDto();
@@ -580,7 +580,7 @@ namespace JiShe.CollectBus.EnergySystem
///
///
///
- [Route("Record/QueryRecordLog")]
+ [HttpPost("Record/QueryRecordLog")]
public Task> QueryRecordLog(QueryRecordLogInput input)
{
throw new NotImplementedException();
@@ -592,7 +592,7 @@ namespace JiShe.CollectBus.EnergySystem
///
///
///
- [Route("Handmould/MeterTiming_10_104")]
+ [HttpPost("Handmould/MeterTiming_10_104")]
public async Task AdjustMeterTiming(AdjustMeterTimingInput input)
{
var result = new BaseResultDto();
@@ -636,7 +636,7 @@ namespace JiShe.CollectBus.EnergySystem
///
///
///
- [Route("AfterSaleApi/AddConrOnlineRecord")]
+ [HttpPost("AfterSaleApi/AddConrOnlineRecord")]
public async Task AddConrOnlineRecord(List input)
{
var conrOnlineRecords = input.Select(it => new ConrOnlineRecord(it.AreaCode, it.Address, it.State, it.LastTime)).ToList();
@@ -653,7 +653,7 @@ namespace JiShe.CollectBus.EnergySystem
///
///
///
- [Route("AfterSaleApi/RssiToWebApi")]
+ [HttpPost("AfterSaleApi/RssiToWebApi")]
public async Task AddSignalStrength(List input)
{
var csqRecords = input.Select(it =>
@@ -672,7 +672,7 @@ namespace JiShe.CollectBus.EnergySystem
///
///
///
- [Route("AfterSaleApi/AddFocusLog")]
+ [HttpPost("AfterSaleApi/AddFocusLog")]
public async Task AddFocusLog(List input)
{
var focusLogs = input.Select(it =>
diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs
index 4af190f..933641b 100644
--- a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs
+++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs
@@ -1,4 +1,5 @@
using Hangfire;
+using JiShe.CollectBus.Host.Extensions;
using JiShe.CollectBus.Host.HealthChecks;
using JiShe.CollectBus.Host.Swaggers;
using JiShe.CollectBus.MongoDB;
diff --git a/src/JiShe.CollectBus.Host/Extensions/CustomApplicationBuilderExtensions.cs b/src/JiShe.CollectBus.Host/Extensions/CustomApplicationBuilderExtensions.cs
new file mode 100644
index 0000000..6019cd2
--- /dev/null
+++ b/src/JiShe.CollectBus.Host/Extensions/CustomApplicationBuilderExtensions.cs
@@ -0,0 +1,13 @@
+using JiShe.CollectBus.Protocol.Contracts.Interfaces;
+using Volo.Abp.Caching;
+
+namespace JiShe.CollectBus.Host.Extensions
+{
+ public static class CustomApplicationBuilderExtensions
+ {
+ public static IApplicationBuilder UseProtocolPlugin(this IApplicationBuilder app)
+ {
+ return app;
+ }
+ }
+}
diff --git a/src/JiShe.CollectBus.Host/Startup.cs b/src/JiShe.CollectBus.Host/Startup.cs
index 223e6d9..d5a0e4e 100644
--- a/src/JiShe.CollectBus.Host/Startup.cs
+++ b/src/JiShe.CollectBus.Host/Startup.cs
@@ -2,16 +2,26 @@
namespace JiShe.CollectBus.Host
{
+ ///
+ /// Startup
+ ///
public class Startup
{
private readonly IConfiguration _configuration;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The configuration.
public Startup(IConfiguration configuration)
{
_configuration = configuration;
}
-
+ ///
+ /// Configures the services.
+ ///
+ /// The services.
public void ConfigureServices(IServiceCollection services)
{
services.AddApplication(options =>
@@ -19,10 +29,13 @@ namespace JiShe.CollectBus.Host
// 加载插件,固定模式,可热插拔
options.PlugInSources.AddFolder(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Plugins"));
});
-
- //services.AddApplication();
}
+ ///
+ /// Configures the specified application.
+ ///
+ /// The application.
+ /// The lifetime.
public void Configure(IApplicationBuilder app, IHostApplicationLifetime lifetime)
{
app.InitializeApplication();
diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json
index d39bfa8..4a166da 100644
--- a/src/JiShe.CollectBus.Host/appsettings.json
+++ b/src/JiShe.CollectBus.Host/appsettings.json
@@ -34,15 +34,15 @@
"CorsOrigins": "http://localhost:4200,http://localhost:3100"
},
"ConnectionStrings": {
- "Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:3712/JiSheCollectBus?authSource=admin",
- "Kafka": "118.190.144.92:29092,118.190.144.92:39092,118.190.144.92:49092",
+ "Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin",
+ "Kafka": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092",
"PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
},
"Redis": {
- "Configuration": "118.190.144.92:6381,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
- "DefaultDB": "4",
- "HangfireDB": "5"
+ "Configuration": "118.190.144.92:6379,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
+ "DefaultDB": "14",
+ "HangfireDB": "15"
},
"Jwt": {
"Audience": "JiShe.CollectBus",
diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
index 1aa68c4..67c9da4 100644
--- a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
+++ b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
@@ -20,6 +20,8 @@ using Newtonsoft.Json.Linq;
using System.Diagnostics.Metrics;
using Newtonsoft.Json;
using TouchSocket.Core;
+using Volo.Abp.Caching;
+using Microsoft.Extensions.DependencyInjection;
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
{
@@ -29,7 +31,6 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
private readonly ILogger _logger;
private readonly IRepository _protocolInfoRepository;
-
//头部字节长度
public const int hearderLen = 6;
@@ -38,18 +39,16 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
public const string errorData = "EE";
///
- /// BaseProtocolPlugin
+ /// Initializes a new instance of the class.
///
- ///
- ///
- ///
- protected BaseProtocolPlugin(ILogger logger, IRepository protocolInfoRepository, ICapPublisher capBus)
+ /// The service provider.
+ protected BaseProtocolPlugin(IServiceProvider serviceProvider)
{
- _logger = logger;
- _protocolInfoRepository = protocolInfoRepository;
- _capBus = capBus;
- }
+ _logger = serviceProvider.GetRequiredService>();
+ _protocolInfoRepository = serviceProvider.GetRequiredService>();
+ _capBus = serviceProvider.GetRequiredService();
+ }
public abstract ProtocolInfo Info { get; }
@@ -64,6 +63,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
await _protocolInfoRepository.DeleteDirectAsync(a => a.Name == Info.Name);
await _protocolInfoRepository.InsertAsync(Info);
+ //await _protocolInfoCache.Get()
}
public abstract Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null);
diff --git a/src/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs
index 9010924..8be2845 100644
--- a/src/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs
+++ b/src/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs
@@ -1,34 +1,20 @@
-using DotNetCore.CAP;
-using JiShe.CollectBus.Common.Enums;
+using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.MessageReceiveds;
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
using JiShe.CollectBus.Protocols;
-using Microsoft.Extensions.Logging;
-using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Protocol
{
public class StandardProtocolPlugin: BaseProtocolPlugin
{
- private readonly ICapPublisher _capBus;
- private readonly ILogger _logger;
- private readonly IRepository _protocolInfoRepository;
-
///
/// Initializes a new instance of the class.
///
- /// The cap bus.
- /// The logger.
- /// The protocol information repository.
- public StandardProtocolPlugin(ICapPublisher capBus,
- ILogger logger,
- IRepository protocolInfoRepository) : base(logger, protocolInfoRepository, capBus)
+ /// The service provider.
+ public StandardProtocolPlugin(IServiceProvider serviceProvider) : base(serviceProvider)
{
- _capBus = capBus;
- _logger = logger;
- _protocolInfoRepository = protocolInfoRepository;
}
public sealed override ProtocolInfo Info => new(nameof(StandardProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980");