This commit is contained in:
cli 2025-02-24 13:47:12 +08:00
parent 39e1f25889
commit 73803cab65
8 changed files with 200 additions and 55 deletions

132
readme.md Normal file
View File

@ -0,0 +1,132 @@
以下是一份采集端V5架构说明文档的参考模板可根据实际需求调整内容
---
# 采集端V5架构说明文档
## 1. 系统概述
### 1.1 目标定位
面向海量异构数据源的实时采集与标准化处理,支持:
- 高可用分布式部署
- 十万级TPS吞吐量
- 多协议/多格式模块化兼容
- 动态扩展与负载均衡
- 智能化异常熔断机制
### 1.2 版本演进
- V4 → V5核心升级
- 微服务化架构改造
- 统一配置管理中心
- 支持Kafka/RabbitMQ双引擎
- 新增边缘计算能力
- 资源利用率提升40%
## 2. 架构设计
### 2.1 整体架构图
(此处建议插入架构拓扑图)
### 2.2 分层设计
| 层级 | 组件 | 功能描述 |
|-------------|---------------------------|------------------------------|
| 数据接入层 | Protocol Adapter集群 | 多协议适配HTTP/MQTT/自定义)|
| 缓冲层 | 分布式消息队列 | 流量削峰与数据暂存 |
| 处理层 | Stream Processor集群 | 数据清洗、格式转换、规则引擎 |
| 存储层 | Metadata DB + Object存储 | 元数据管理+原始数据归档 |
| 控制层 | 配置中心+监控中心 | 动态配置/健康检查/熔断控制 |
## 3. 核心模块说明
### 3.1 多协议适配器
- 支持协议HTTP/2、MQTT 3.1.1/5.0、CoAP、自定义TCP
- 特性:
```yaml
max_connections: 10000/节点
qps_threshold: 自动弹性扩容
ssl: 国密SM2/SM4支持
```
### 3.2 流量控制模块
- 三级流量控制策略:
1. 令牌桶全局限流
2. 基于数据源的QoS分级
3. 动态优先级队列
### 3.3 数据清洗引擎
- 功能组件:
- 数据校验JSON Schema/XSD
- 敏感字段脱敏
- 数据补全IP→地理位置
- 异常数据隔离重试
## 4. 接口规范
### 4.1 数据接收接口
```rest
POST /ingest/{datasource_id}
Headers:
X-Auth-Token: [动态令牌]
Body:
{
"payload": "base64编码数据",
"metadata": {"timestamp": 1620000000, "geo": "31.23,121.47"}
}
```
### 4.2 管理接口
- 动态配置更新:
```bash
curl -X PATCH /config/rate_limit \
-d '{"threshold": 5000, "strategy": "token_bucket"}'
```
## 5. 性能指标
| 指标 | 单节点能力 | 集群扩展能力 |
|----------------|------------|--------------|
| 最大连接数 | 10,000 | 线性扩展 |
| 数据处理延迟 | <50ms(p99) | - |
| 吞吐量 | 20,000 TPS | 百万级TPS |
| CPU利用率 | ≤70%@峰值 | 自动负载均衡 |
## 6. 高可用设计
- 故障恢复机制:
- 心跳检测2s间隔
- 自动故障转移(<30s
- 数据双写备份
- 熔断策略:
- 基于错误率(>30%触发)
- 基于系统负载CPU>80%触发)
## 7. 部署架构
- 混合部署方案:
```mermaid
graph TD
A[边缘节点] --> B[区域中心]
B --> C[中央数据中心]
C --> D[(大数据平台)]
```
- 资源要求:
- 最小部署单元2C4G
- 推荐生产配置8C16G/节点
## 8. 安全设计
- 数据安全:
- 传输层TLS 1.3
- 存储加密AES-256
- 完整性校验SHA-256
- 访问控制:
- RBAC权限模型
- 双因素认证
- 审计日志留存180天
## 9. 版本演进规划
- V5.1AI驱动的异常检测
- V5.2:区块链存证支持
- V5.3Serverless架构适配
---
**备注**:建议配合以下补充材料使用:
1. 系统部署拓扑图
2. 数据流程图
3. API详细规范文档
4. 性能压测报告
可根据实际业务需求补充具体技术参数和实现细节。

View File

@ -42,7 +42,7 @@ namespace JiShe.CollectBus.EnergySystem
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
[Route("Handmould/TranspondSend_10_98")]
[HttpPost("Handmould/TranspondSend_10_98")]
public async Task<BaseResultDto<ValveControlOutput>> ValveControl(ValveControlInput input)
{
var result = new BaseResultDto<ValveControlOutput>();
@ -89,7 +89,7 @@ namespace JiShe.CollectBus.EnergySystem
/// <param name="input"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
[Route("Handmould/TranspondSend_10_94")]
[HttpPost("Handmould/TranspondSend_10_94")]
public async Task<BaseResultDto<ReadTimeOutput>> ReadTime(ReadTimeInput input)
{
var result = new BaseResultDto<ReadTimeOutput>
@ -124,7 +124,7 @@ namespace JiShe.CollectBus.EnergySystem
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
[Route("Handmould/IssueArchives")]
[HttpPost("Handmould/IssueArchives")]
public async Task<BaseResultDto<AmmeterArchivesDownOutput>> AmmeterArchivesDown(AmmeterArchivesDownInput input)
{
var result = new BaseResultDto<AmmeterArchivesDownOutput>();
@ -167,7 +167,7 @@ namespace JiShe.CollectBus.EnergySystem
/// <param name="input"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
[Route("Handmould/ReadMeterAddress_10_105")]
[HttpPost("Handmould/ReadMeterAddress_10_105")]
public async Task<BaseResultDto> AmmeterArchivesMatch(AmmeterArchivesMatchInput input)
{
var result = new BaseResultDto();
@ -197,7 +197,7 @@ namespace JiShe.CollectBus.EnergySystem
/// <param name="input"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
[Route("Handmould/EquitDubg")]
[HttpPost("Handmould/EquitDubg/CommunicationParametersSet")]
public async Task<BaseResultDto> CommunicationParametersSet(CommunicationParametersSetInput input)
{
var result = new BaseResultDto();
@ -243,7 +243,7 @@ namespace JiShe.CollectBus.EnergySystem
/// <param name="input"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
[Route("Handmould/EquitDubg")]
[HttpPost("Handmould/EquitDubg/CallTimeTesting")]
public async Task<BaseResultDto> CallTimeTesting(CallTimeTestingInput input)
{
var result = new BaseResultDto();
@ -270,7 +270,7 @@ namespace JiShe.CollectBus.EnergySystem
/// <param name="input"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
[Route("Handmould/EquitDubg")]
[HttpPost("Handmould/EquitDubg/TimeAdjust")]
public async Task<BaseResultDto> TimeAdjust(TimeAdjustInput input)
{
var result = new BaseResultDto();
@ -297,7 +297,7 @@ namespace JiShe.CollectBus.EnergySystem
/// <param name="input"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
[Route("Handmould/EquitDubg")]
[HttpPost("Handmould/EquitDubg/TerminalRestart")]
public async Task<BaseResultDto> TerminalRestart(TerminalRestartInput input)
{
var result = new BaseResultDto();
@ -324,7 +324,7 @@ namespace JiShe.CollectBus.EnergySystem
/// <param name="input"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
[Route("Handmould/EquitDubg")]
[HttpPost("Handmould/EquitDubg/ReadMeterNum")]
public async Task<BaseResultDto<ReadMeterNumOutput>> ReadMeterNum(ReadMeterNumInput input)
{
var result = new BaseResultDto<ReadMeterNumOutput>();
@ -351,7 +351,7 @@ namespace JiShe.CollectBus.EnergySystem
/// <param name="input"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
[Route("Handmould/IssueCmd")]
[HttpPost("Handmould/IssueCmd")]
public async Task<BaseResultDto<ReadingOutput>> Reading(ReadingInput input)
{
var result = new BaseResultDto<ReadingOutput>();
@ -402,7 +402,7 @@ namespace JiShe.CollectBus.EnergySystem
/// <param name="input"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
[Route("Handmould/IssueTranspondTimeSet_10_103")]
[HttpPost("Handmould/IssueTranspondTimeSet_10_103")]
public async Task<BaseResultDto> TimeSet(TimeSetInput input)
{
var result = new BaseResultDto();
@ -470,7 +470,7 @@ namespace JiShe.CollectBus.EnergySystem
/// <param name="input"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
[Route("Handmould/SetAutoItemCode")]
[HttpPost("Handmould/SetAutoItemCode")]
public async Task<BaseResultDto> AutoReportCollectionItemsSet(AutoReportCollectionItemsSetInput input)
{
var result = new BaseResultDto();
@ -499,7 +499,7 @@ namespace JiShe.CollectBus.EnergySystem
/// <param name="input"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
[Route("Handmould/SetAutoUpSwitch")]
[HttpPost("Handmould/SetAutoUpSwitch")]
public async Task<BaseResultDto> AutoReportSet(AutoReportSetInput input)
{
var result = new BaseResultDto();
@ -527,7 +527,7 @@ namespace JiShe.CollectBus.EnergySystem
/// <param name="input"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
[Route("Handmould/QueryAutoUpSwitch")]
[HttpPost("Handmould/QueryAutoUpSwitch")]
public async Task<BaseResultDto> QueryAutoReportOpenStatus(QueryAutoReportOpenStatusInput input)
{
var result = new BaseResultDto();
@ -552,7 +552,7 @@ namespace JiShe.CollectBus.EnergySystem
/// <param name="input"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
[Route("Handmould/BatchIssueVersion_09_1")]
[HttpPost("Handmould/BatchIssueVersion_09_1")]
public async Task<BaseResultDto<BatchReadVersionOutput>> BatchReadVersion(BatchReadVersionInput input)
{
var result = new BaseResultDto<BatchReadVersionOutput>();
@ -580,7 +580,7 @@ namespace JiShe.CollectBus.EnergySystem
/// <param name="input"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
[Route("Record/QueryRecordLog")]
[HttpPost("Record/QueryRecordLog")]
public Task<BaseResultDto<QueryRecordLogOutput>> QueryRecordLog(QueryRecordLogInput input)
{
throw new NotImplementedException();
@ -592,7 +592,7 @@ namespace JiShe.CollectBus.EnergySystem
/// <param name="input"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
[Route("Handmould/MeterTiming_10_104")]
[HttpPost("Handmould/MeterTiming_10_104")]
public async Task<BaseResultDto> AdjustMeterTiming(AdjustMeterTimingInput input)
{
var result = new BaseResultDto();
@ -636,7 +636,7 @@ namespace JiShe.CollectBus.EnergySystem
/// <param name="input"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
[Route("AfterSaleApi/AddConrOnlineRecord")]
[HttpPost("AfterSaleApi/AddConrOnlineRecord")]
public async Task<BaseResultDto> AddConrOnlineRecord(List<AddConrOnlineRecordInput> input)
{
var conrOnlineRecords = input.Select(it => new ConrOnlineRecord(it.AreaCode, it.Address, it.State, it.LastTime)).ToList();
@ -653,7 +653,7 @@ namespace JiShe.CollectBus.EnergySystem
/// <param name="input"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
[Route("AfterSaleApi/RssiToWebApi")]
[HttpPost("AfterSaleApi/RssiToWebApi")]
public async Task<BaseResultDto> AddSignalStrength(List<AddSignalStrengthInput> input)
{
var csqRecords = input.Select(it =>
@ -672,7 +672,7 @@ namespace JiShe.CollectBus.EnergySystem
/// <param name="input"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
[Route("AfterSaleApi/AddFocusLog")]
[HttpPost("AfterSaleApi/AddFocusLog")]
public async Task<BaseResultDto> AddFocusLog(List<AddFocusLogInput> input)
{
var focusLogs = input.Select(it =>

View File

@ -1,4 +1,5 @@
using Hangfire;
using JiShe.CollectBus.Host.Extensions;
using JiShe.CollectBus.Host.HealthChecks;
using JiShe.CollectBus.Host.Swaggers;
using JiShe.CollectBus.MongoDB;

View File

@ -0,0 +1,13 @@
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using Volo.Abp.Caching;
namespace JiShe.CollectBus.Host.Extensions
{
public static class CustomApplicationBuilderExtensions
{
public static IApplicationBuilder UseProtocolPlugin(this IApplicationBuilder app)
{
return app;
}
}
}

View File

@ -2,16 +2,26 @@
namespace JiShe.CollectBus.Host
{
/// <summary>
/// Startup
/// </summary>
public class Startup
{
private readonly IConfiguration _configuration;
/// <summary>
/// Initializes a new instance of the <see cref="Startup"/> class.
/// </summary>
/// <param name="configuration">The configuration.</param>
public Startup(IConfiguration configuration)
{
_configuration = configuration;
}
/// <summary>
/// Configures the services.
/// </summary>
/// <param name="services">The services.</param>
public void ConfigureServices(IServiceCollection services)
{
services.AddApplication<CollectBusHostModule>(options =>
@ -19,10 +29,13 @@ namespace JiShe.CollectBus.Host
// 加载插件,固定模式,可热插拔
options.PlugInSources.AddFolder(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Plugins"));
});
//services.AddApplication<CollectBusHostModule>();
}
/// <summary>
/// Configures the specified application.
/// </summary>
/// <param name="app">The application.</param>
/// <param name="lifetime">The lifetime.</param>
public void Configure(IApplicationBuilder app, IHostApplicationLifetime lifetime)
{
app.InitializeApplication();

View File

@ -34,15 +34,15 @@
"CorsOrigins": "http://localhost:4200,http://localhost:3100"
},
"ConnectionStrings": {
"Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:3712/JiSheCollectBus?authSource=admin",
"Kafka": "118.190.144.92:29092,118.190.144.92:39092,118.190.144.92:49092",
"Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin",
"Kafka": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092",
"PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
},
"Redis": {
"Configuration": "118.190.144.92:6381,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
"DefaultDB": "4",
"HangfireDB": "5"
"Configuration": "118.190.144.92:6379,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
"DefaultDB": "14",
"HangfireDB": "15"
},
"Jwt": {
"Audience": "JiShe.CollectBus",

View File

@ -20,6 +20,8 @@ using Newtonsoft.Json.Linq;
using System.Diagnostics.Metrics;
using Newtonsoft.Json;
using TouchSocket.Core;
using Volo.Abp.Caching;
using Microsoft.Extensions.DependencyInjection;
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
{
@ -29,7 +31,6 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
private readonly ILogger<BaseProtocolPlugin> _logger;
private readonly IRepository<ProtocolInfo, Guid> _protocolInfoRepository;
//头部字节长度
public const int hearderLen = 6;
@ -38,18 +39,16 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
public const string errorData = "EE";
/// <summary>
/// BaseProtocolPlugin
/// Initializes a new instance of the <see cref="BaseProtocolPlugin"/> class.
/// </summary>
/// <param name="logger"></param>
/// <param name="protocolInfoRepository"></param>
/// <param name="capBus"></param>
protected BaseProtocolPlugin(ILogger<BaseProtocolPlugin> logger, IRepository<ProtocolInfo, Guid> protocolInfoRepository, ICapPublisher capBus)
/// <param name="serviceProvider">The service provider.</param>
protected BaseProtocolPlugin(IServiceProvider serviceProvider)
{
_logger = logger;
_protocolInfoRepository = protocolInfoRepository;
_capBus = capBus;
}
_logger = serviceProvider.GetRequiredService<ILogger<BaseProtocolPlugin>>();
_protocolInfoRepository = serviceProvider.GetRequiredService<IRepository<ProtocolInfo, Guid>>();
_capBus = serviceProvider.GetRequiredService<ICapPublisher>();
}
public abstract ProtocolInfo Info { get; }
@ -64,6 +63,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
await _protocolInfoRepository.DeleteDirectAsync(a => a.Name == Info.Name);
await _protocolInfoRepository.InsertAsync(Info);
//await _protocolInfoCache.Get()
}
public abstract Task AnalyzeAsync(MessageReceived messageReceived, Action<byte[]>? sendAction = null);

View File

@ -1,34 +1,20 @@
using DotNetCore.CAP;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.MessageReceiveds;
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
using JiShe.CollectBus.Protocols;
using Microsoft.Extensions.Logging;
using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Protocol
{
public class StandardProtocolPlugin: BaseProtocolPlugin
{
private readonly ICapPublisher _capBus;
private readonly ILogger<BaseProtocolPlugin> _logger;
private readonly IRepository<ProtocolInfo, Guid> _protocolInfoRepository;
/// <summary>
/// Initializes a new instance of the <see cref="StandardProtocolPlugin"/> class.
/// </summary>
/// <param name="capBus">The cap bus.</param>
/// <param name="logger">The logger.</param>
/// <param name="protocolInfoRepository">The protocol information repository.</param>
public StandardProtocolPlugin(ICapPublisher capBus,
ILogger<BaseProtocolPlugin> logger,
IRepository<ProtocolInfo, Guid> protocolInfoRepository) : base(logger, protocolInfoRepository, capBus)
/// <param name="serviceProvider">The service provider.</param>
public StandardProtocolPlugin(IServiceProvider serviceProvider) : base(serviceProvider)
{
_capBus = capBus;
_logger = logger;
_protocolInfoRepository = protocolInfoRepository;
}
public sealed override ProtocolInfo Info => new(nameof(StandardProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980");