diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..28a0678 --- /dev/null +++ b/readme.md @@ -0,0 +1,132 @@ +以下是一份采集端V5架构说明文档的参考模板,可根据实际需求调整内容: + +--- + +# 采集端V5架构说明文档 + +## 1. 系统概述 +### 1.1 目标定位 +面向海量异构数据源的实时采集与标准化处理,支持: +- 高可用分布式部署 +- 十万级TPS吞吐量 +- 多协议/多格式模块化兼容 +- 动态扩展与负载均衡 +- 智能化异常熔断机制 + +### 1.2 版本演进 +- V4 → V5核心升级: + - 微服务化架构改造 + - 统一配置管理中心 + - 支持Kafka/RabbitMQ双引擎 + - 新增边缘计算能力 + - 资源利用率提升40% + +## 2. 架构设计 +### 2.1 整体架构图 +(此处建议插入架构拓扑图) + +### 2.2 分层设计 +| 层级 | 组件 | 功能描述 | +|-------------|---------------------------|------------------------------| +| 数据接入层 | Protocol Adapter集群 | 多协议适配(HTTP/MQTT/自定义)| +| 缓冲层 | 分布式消息队列 | 流量削峰与数据暂存 | +| 处理层 | Stream Processor集群 | 数据清洗、格式转换、规则引擎 | +| 存储层 | Metadata DB + Object存储 | 元数据管理+原始数据归档 | +| 控制层 | 配置中心+监控中心 | 动态配置/健康检查/熔断控制 | + +## 3. 核心模块说明 +### 3.1 多协议适配器 +- 支持协议:HTTP/2、MQTT 3.1.1/5.0、CoAP、自定义TCP +- 特性: + ```yaml + max_connections: 10000/节点 + qps_threshold: 自动弹性扩容 + ssl: 国密SM2/SM4支持 + ``` + +### 3.2 流量控制模块 +- 三级流量控制策略: + 1. 令牌桶全局限流 + 2. 基于数据源的QoS分级 + 3. 动态优先级队列 + +### 3.3 数据清洗引擎 +- 功能组件: + - 数据校验(JSON Schema/XSD) + - 敏感字段脱敏 + - 数据补全(IP→地理位置) + - 异常数据隔离重试 + +## 4. 接口规范 +### 4.1 数据接收接口 +```rest +POST /ingest/{datasource_id} +Headers: + X-Auth-Token: [动态令牌] +Body: +{ + "payload": "base64编码数据", + "metadata": {"timestamp": 1620000000, "geo": "31.23,121.47"} +} +``` + +### 4.2 管理接口 +- 动态配置更新: + ```bash + curl -X PATCH /config/rate_limit \ + -d '{"threshold": 5000, "strategy": "token_bucket"}' + ``` + +## 5. 性能指标 +| 指标 | 单节点能力 | 集群扩展能力 | +|----------------|------------|--------------| +| 最大连接数 | 10,000 | 线性扩展 | +| 数据处理延迟 | <50ms(p99) | - | +| 吞吐量 | 20,000 TPS | 百万级TPS | +| CPU利用率 | ≤70%@峰值 | 自动负载均衡 | + +## 6. 高可用设计 +- 故障恢复机制: + - 心跳检测(2s间隔) + - 自动故障转移(<30s) + - 数据双写备份 +- 熔断策略: + - 基于错误率(>30%触发) + - 基于系统负载(CPU>80%触发) + +## 7. 部署架构 +- 混合部署方案: + ```mermaid + graph TD + A[边缘节点] --> B[区域中心] + B --> C[中央数据中心] + C --> D[(大数据平台)] + ``` +- 资源要求: + - 最小部署单元:2C4G + - 推荐生产配置:8C16G/节点 + +## 8. 安全设计 +- 数据安全: + - 传输层:TLS 1.3 + - 存储加密:AES-256 + - 完整性校验:SHA-256 +- 访问控制: + - RBAC权限模型 + - 双因素认证 + - 审计日志留存180天 + +## 9. 版本演进规划 +- V5.1:AI驱动的异常检测 +- V5.2:区块链存证支持 +- V5.3:Serverless架构适配 + +--- + +**备注**:建议配合以下补充材料使用: +1. 系统部署拓扑图 +2. 数据流程图 +3. API详细规范文档 +4. 性能压测报告 + +可根据实际业务需求补充具体技术参数和实现细节。 \ No newline at end of file diff --git a/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs b/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs index 509f113..5465ab3 100644 --- a/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs +++ b/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs @@ -42,7 +42,7 @@ namespace JiShe.CollectBus.EnergySystem /// /// /// - [Route("Handmould/TranspondSend_10_98")] + [HttpPost("Handmould/TranspondSend_10_98")] public async Task> ValveControl(ValveControlInput input) { var result = new BaseResultDto(); @@ -89,7 +89,7 @@ namespace JiShe.CollectBus.EnergySystem /// /// /// - [Route("Handmould/TranspondSend_10_94")] + [HttpPost("Handmould/TranspondSend_10_94")] public async Task> ReadTime(ReadTimeInput input) { var result = new BaseResultDto @@ -124,7 +124,7 @@ namespace JiShe.CollectBus.EnergySystem /// /// /// - [Route("Handmould/IssueArchives")] + [HttpPost("Handmould/IssueArchives")] public async Task> AmmeterArchivesDown(AmmeterArchivesDownInput input) { var result = new BaseResultDto(); @@ -167,7 +167,7 @@ namespace JiShe.CollectBus.EnergySystem /// /// /// - [Route("Handmould/ReadMeterAddress_10_105")] + [HttpPost("Handmould/ReadMeterAddress_10_105")] public async Task AmmeterArchivesMatch(AmmeterArchivesMatchInput input) { var result = new BaseResultDto(); @@ -197,7 +197,7 @@ namespace JiShe.CollectBus.EnergySystem /// /// /// - [Route("Handmould/EquitDubg")] + [HttpPost("Handmould/EquitDubg/CommunicationParametersSet")] public async Task CommunicationParametersSet(CommunicationParametersSetInput input) { var result = new BaseResultDto(); @@ -243,7 +243,7 @@ namespace JiShe.CollectBus.EnergySystem /// /// /// - [Route("Handmould/EquitDubg")] + [HttpPost("Handmould/EquitDubg/CallTimeTesting")] public async Task CallTimeTesting(CallTimeTestingInput input) { var result = new BaseResultDto(); @@ -270,7 +270,7 @@ namespace JiShe.CollectBus.EnergySystem /// /// /// - [Route("Handmould/EquitDubg")] + [HttpPost("Handmould/EquitDubg/TimeAdjust")] public async Task TimeAdjust(TimeAdjustInput input) { var result = new BaseResultDto(); @@ -297,7 +297,7 @@ namespace JiShe.CollectBus.EnergySystem /// /// /// - [Route("Handmould/EquitDubg")] + [HttpPost("Handmould/EquitDubg/TerminalRestart")] public async Task TerminalRestart(TerminalRestartInput input) { var result = new BaseResultDto(); @@ -324,7 +324,7 @@ namespace JiShe.CollectBus.EnergySystem /// /// /// - [Route("Handmould/EquitDubg")] + [HttpPost("Handmould/EquitDubg/ReadMeterNum")] public async Task> ReadMeterNum(ReadMeterNumInput input) { var result = new BaseResultDto(); @@ -351,7 +351,7 @@ namespace JiShe.CollectBus.EnergySystem /// /// /// - [Route("Handmould/IssueCmd")] + [HttpPost("Handmould/IssueCmd")] public async Task> Reading(ReadingInput input) { var result = new BaseResultDto(); @@ -402,7 +402,7 @@ namespace JiShe.CollectBus.EnergySystem /// /// /// - [Route("Handmould/IssueTranspondTimeSet_10_103")] + [HttpPost("Handmould/IssueTranspondTimeSet_10_103")] public async Task TimeSet(TimeSetInput input) { var result = new BaseResultDto(); @@ -470,7 +470,7 @@ namespace JiShe.CollectBus.EnergySystem /// /// /// - [Route("Handmould/SetAutoItemCode")] + [HttpPost("Handmould/SetAutoItemCode")] public async Task AutoReportCollectionItemsSet(AutoReportCollectionItemsSetInput input) { var result = new BaseResultDto(); @@ -499,7 +499,7 @@ namespace JiShe.CollectBus.EnergySystem /// /// /// - [Route("Handmould/SetAutoUpSwitch")] + [HttpPost("Handmould/SetAutoUpSwitch")] public async Task AutoReportSet(AutoReportSetInput input) { var result = new BaseResultDto(); @@ -527,7 +527,7 @@ namespace JiShe.CollectBus.EnergySystem /// /// /// - [Route("Handmould/QueryAutoUpSwitch")] + [HttpPost("Handmould/QueryAutoUpSwitch")] public async Task QueryAutoReportOpenStatus(QueryAutoReportOpenStatusInput input) { var result = new BaseResultDto(); @@ -552,7 +552,7 @@ namespace JiShe.CollectBus.EnergySystem /// /// /// - [Route("Handmould/BatchIssueVersion_09_1")] + [HttpPost("Handmould/BatchIssueVersion_09_1")] public async Task> BatchReadVersion(BatchReadVersionInput input) { var result = new BaseResultDto(); @@ -580,7 +580,7 @@ namespace JiShe.CollectBus.EnergySystem /// /// /// - [Route("Record/QueryRecordLog")] + [HttpPost("Record/QueryRecordLog")] public Task> QueryRecordLog(QueryRecordLogInput input) { throw new NotImplementedException(); @@ -592,7 +592,7 @@ namespace JiShe.CollectBus.EnergySystem /// /// /// - [Route("Handmould/MeterTiming_10_104")] + [HttpPost("Handmould/MeterTiming_10_104")] public async Task AdjustMeterTiming(AdjustMeterTimingInput input) { var result = new BaseResultDto(); @@ -636,7 +636,7 @@ namespace JiShe.CollectBus.EnergySystem /// /// /// - [Route("AfterSaleApi/AddConrOnlineRecord")] + [HttpPost("AfterSaleApi/AddConrOnlineRecord")] public async Task AddConrOnlineRecord(List input) { var conrOnlineRecords = input.Select(it => new ConrOnlineRecord(it.AreaCode, it.Address, it.State, it.LastTime)).ToList(); @@ -653,7 +653,7 @@ namespace JiShe.CollectBus.EnergySystem /// /// /// - [Route("AfterSaleApi/RssiToWebApi")] + [HttpPost("AfterSaleApi/RssiToWebApi")] public async Task AddSignalStrength(List input) { var csqRecords = input.Select(it => @@ -672,7 +672,7 @@ namespace JiShe.CollectBus.EnergySystem /// /// /// - [Route("AfterSaleApi/AddFocusLog")] + [HttpPost("AfterSaleApi/AddFocusLog")] public async Task AddFocusLog(List input) { var focusLogs = input.Select(it => diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs index 4af190f..933641b 100644 --- a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs +++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs @@ -1,4 +1,5 @@ using Hangfire; +using JiShe.CollectBus.Host.Extensions; using JiShe.CollectBus.Host.HealthChecks; using JiShe.CollectBus.Host.Swaggers; using JiShe.CollectBus.MongoDB; diff --git a/src/JiShe.CollectBus.Host/Extensions/CustomApplicationBuilderExtensions.cs b/src/JiShe.CollectBus.Host/Extensions/CustomApplicationBuilderExtensions.cs new file mode 100644 index 0000000..6019cd2 --- /dev/null +++ b/src/JiShe.CollectBus.Host/Extensions/CustomApplicationBuilderExtensions.cs @@ -0,0 +1,13 @@ +using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using Volo.Abp.Caching; + +namespace JiShe.CollectBus.Host.Extensions +{ + public static class CustomApplicationBuilderExtensions + { + public static IApplicationBuilder UseProtocolPlugin(this IApplicationBuilder app) + { + return app; + } + } +} diff --git a/src/JiShe.CollectBus.Host/Startup.cs b/src/JiShe.CollectBus.Host/Startup.cs index 223e6d9..d5a0e4e 100644 --- a/src/JiShe.CollectBus.Host/Startup.cs +++ b/src/JiShe.CollectBus.Host/Startup.cs @@ -2,16 +2,26 @@ namespace JiShe.CollectBus.Host { + /// + /// Startup + /// public class Startup { private readonly IConfiguration _configuration; + /// + /// Initializes a new instance of the class. + /// + /// The configuration. public Startup(IConfiguration configuration) { _configuration = configuration; } - + /// + /// Configures the services. + /// + /// The services. public void ConfigureServices(IServiceCollection services) { services.AddApplication(options => @@ -19,10 +29,13 @@ namespace JiShe.CollectBus.Host // 加载插件,固定模式,可热插拔 options.PlugInSources.AddFolder(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Plugins")); }); - - //services.AddApplication(); } + /// + /// Configures the specified application. + /// + /// The application. + /// The lifetime. public void Configure(IApplicationBuilder app, IHostApplicationLifetime lifetime) { app.InitializeApplication(); diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index d39bfa8..4a166da 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -34,15 +34,15 @@ "CorsOrigins": "http://localhost:4200,http://localhost:3100" }, "ConnectionStrings": { - "Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:3712/JiSheCollectBus?authSource=admin", - "Kafka": "118.190.144.92:29092,118.190.144.92:39092,118.190.144.92:49092", + "Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin", + "Kafka": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092", "PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False", "EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False" }, "Redis": { - "Configuration": "118.190.144.92:6381,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", - "DefaultDB": "4", - "HangfireDB": "5" + "Configuration": "118.190.144.92:6379,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", + "DefaultDB": "14", + "HangfireDB": "15" }, "Jwt": { "Audience": "JiShe.CollectBus", diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs index 1aa68c4..67c9da4 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs @@ -20,6 +20,8 @@ using Newtonsoft.Json.Linq; using System.Diagnostics.Metrics; using Newtonsoft.Json; using TouchSocket.Core; +using Volo.Abp.Caching; +using Microsoft.Extensions.DependencyInjection; namespace JiShe.CollectBus.Protocol.Contracts.Abstracts { @@ -29,7 +31,6 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts private readonly ILogger _logger; private readonly IRepository _protocolInfoRepository; - //头部字节长度 public const int hearderLen = 6; @@ -38,18 +39,16 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts public const string errorData = "EE"; /// - /// BaseProtocolPlugin + /// Initializes a new instance of the class. /// - /// - /// - /// - protected BaseProtocolPlugin(ILogger logger, IRepository protocolInfoRepository, ICapPublisher capBus) + /// The service provider. + protected BaseProtocolPlugin(IServiceProvider serviceProvider) { - _logger = logger; - _protocolInfoRepository = protocolInfoRepository; - _capBus = capBus; - } + _logger = serviceProvider.GetRequiredService>(); + _protocolInfoRepository = serviceProvider.GetRequiredService>(); + _capBus = serviceProvider.GetRequiredService(); + } public abstract ProtocolInfo Info { get; } @@ -64,6 +63,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts await _protocolInfoRepository.DeleteDirectAsync(a => a.Name == Info.Name); await _protocolInfoRepository.InsertAsync(Info); + //await _protocolInfoCache.Get() } public abstract Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null); diff --git a/src/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs index 9010924..8be2845 100644 --- a/src/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs +++ b/src/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs @@ -1,34 +1,20 @@ -using DotNetCore.CAP; -using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.MessageReceiveds; using JiShe.CollectBus.Protocol.Contracts.Abstracts; using JiShe.CollectBus.Protocols; -using Microsoft.Extensions.Logging; -using Volo.Abp.Domain.Repositories; namespace JiShe.CollectBus.Protocol { public class StandardProtocolPlugin: BaseProtocolPlugin { - private readonly ICapPublisher _capBus; - private readonly ILogger _logger; - private readonly IRepository _protocolInfoRepository; - /// /// Initializes a new instance of the class. /// - /// The cap bus. - /// The logger. - /// The protocol information repository. - public StandardProtocolPlugin(ICapPublisher capBus, - ILogger logger, - IRepository protocolInfoRepository) : base(logger, protocolInfoRepository, capBus) + /// The service provider. + public StandardProtocolPlugin(IServiceProvider serviceProvider) : base(serviceProvider) { - _capBus = capBus; - _logger = logger; - _protocolInfoRepository = protocolInfoRepository; } public sealed override ProtocolInfo Info => new(nameof(StandardProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980");