From 83d8785ff48518beb19e9bc7c91b97aa09d89916 Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Wed, 19 Mar 2025 14:31:04 +0800
Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=8A=A5=E6=96=87=E8=A7=A3?=
=?UTF-8?q?=E6=9E=90=EF=BC=8C=E6=90=AD=E5=BB=BA=E6=95=B0=E6=8D=AE=E8=BF=81?=
=?UTF-8?q?=E7=A7=BB=E6=9E=B6=E6=9E=84?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../DataMigration/IDataMigrationService.cs | 20 +++
.../Options/DataMigrationOptions.cs | 39 +++++
.../Consumers/ReceivedConsumer.cs | 3 +-
.../DataMigration/DataMigrationService.cs | 153 ++++++++++++++++++
.../BasicScheduledMeterReadingService.cs | 50 +++---
...nergySystemScheduledMeterReadingService.cs | 3 +-
.../Subscribers/SubscriberAppService.cs | 12 +-
.../MeterFifteenMinuteReadingRecords.cs | 18 ---
.../MeterFiveMinuteReadingRecords.cs | 18 ---
.../MeterOneMinuteReadingRecords.cs | 18 ---
...adingRecords.cs => MeterReadingRecords.cs} | 14 +-
src/JiShe.CollectBus.Host/appsettings.json | 2 +-
.../MongoDB/CollectBusMongoDbContext.cs | 5 +-
.../Abstracts/BaseProtocolPlugin.cs | 2 +-
.../Interfaces/IProtocolPlugin.cs | 2 +-
.../StandardProtocolPlugin.cs | 17 +-
16 files changed, 274 insertions(+), 102 deletions(-)
create mode 100644 src/JiShe.CollectBus.Application.Contracts/DataMigration/IDataMigrationService.cs
create mode 100644 src/JiShe.CollectBus.Application.Contracts/DataMigration/Options/DataMigrationOptions.cs
create mode 100644 src/JiShe.CollectBus.Application/DataMigration/DataMigrationService.cs
delete mode 100644 src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterFifteenMinuteReadingRecords.cs
delete mode 100644 src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterFiveMinuteReadingRecords.cs
delete mode 100644 src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterOneMinuteReadingRecords.cs
rename src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/{BasicMeterReadingRecords.cs => MeterReadingRecords.cs} (89%)
diff --git a/src/JiShe.CollectBus.Application.Contracts/DataMigration/IDataMigrationService.cs b/src/JiShe.CollectBus.Application.Contracts/DataMigration/IDataMigrationService.cs
new file mode 100644
index 0000000..bbfa581
--- /dev/null
+++ b/src/JiShe.CollectBus.Application.Contracts/DataMigration/IDataMigrationService.cs
@@ -0,0 +1,20 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.DataMigration
+{
+ ///
+ /// 数据迁移服务
+ ///
+ public interface IDataMigrationService
+ {
+ ///
+ /// 开始迁移
+ ///
+ ///
+ Task StartMigrationAsync();
+ }
+}
diff --git a/src/JiShe.CollectBus.Application.Contracts/DataMigration/Options/DataMigrationOptions.cs b/src/JiShe.CollectBus.Application.Contracts/DataMigration/Options/DataMigrationOptions.cs
new file mode 100644
index 0000000..7174cad
--- /dev/null
+++ b/src/JiShe.CollectBus.Application.Contracts/DataMigration/Options/DataMigrationOptions.cs
@@ -0,0 +1,39 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.DataMigration.Options
+{
+ ///
+ /// 数据迁移配置
+ ///
+ public class DataMigrationOptions
+ {
+ ///
+ /// MongoDb每批处理量
+ ///
+ public int MongoDbDataBatchSize { get; set; } = 1000;
+
+ ///
+ /// 批量处理通道容量
+ ///
+ public int ChannelCapacity { get; set; } = 100;
+
+ ///
+ /// 数据库 每批处理量
+ ///
+ public int SqlBulkBatchSize { get; set; } = 1000;
+
+ ///
+ /// 数据库 每批处理超时时间
+ ///
+ public int SqlBulkTimeout { get; set; } = 60;
+
+ ///
+ /// 处理器数量
+ ///
+ public int ProcessorsCount { get; set; } = 4;
+ }
+}
diff --git a/src/JiShe.CollectBus.Application/Consumers/ReceivedConsumer.cs b/src/JiShe.CollectBus.Application/Consumers/ReceivedConsumer.cs
index 4e00864..9309c55 100644
--- a/src/JiShe.CollectBus.Application/Consumers/ReceivedConsumer.cs
+++ b/src/JiShe.CollectBus.Application/Consumers/ReceivedConsumer.cs
@@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Threading.Tasks;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
+using JiShe.CollectBus.Protocol.Contracts.Models;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
@@ -48,7 +49,7 @@ namespace JiShe.CollectBus.Consumers
var list = new List();
foreach (var contextItem in context.Message)
{
- await protocolPlugin.AnalyzeAsync(contextItem.Message);
+ await protocolPlugin.AnalyzeAsync(contextItem.Message);
list.Add(contextItem.Message);
}
await _messageReceivedEventRepository.InsertManyAsync(list);
diff --git a/src/JiShe.CollectBus.Application/DataMigration/DataMigrationService.cs b/src/JiShe.CollectBus.Application/DataMigration/DataMigrationService.cs
new file mode 100644
index 0000000..4aa3a7e
--- /dev/null
+++ b/src/JiShe.CollectBus.Application/DataMigration/DataMigrationService.cs
@@ -0,0 +1,153 @@
+using JiShe.CollectBus.DataMigration.Options;
+using JiShe.CollectBus.IotSystems.MeterReadingRecords;
+using LiteDB;
+using Microsoft.Extensions.Options;
+using System;
+using System.Data;
+using System.Linq;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using Volo.Abp.Domain.Repositories;
+
+namespace JiShe.CollectBus.DataMigration
+{
+ ///
+ /// 数据迁移服务
+ ///
+ public class DataMigrationService: CollectBusAppService, IDataMigrationService
+ {
+ private readonly IRepository _meterReadingRecordsRepository;
+ private readonly DataMigrationOptions _options;
+
+
+ public DataMigrationService(IOptions options,
+ IRepository meterReadingRecordsRepository)
+ {
+ _options = options.Value;
+ _meterReadingRecordsRepository = meterReadingRecordsRepository;
+ }
+
+ ///
+ /// 开始迁移
+ ///
+ ///
+ public async Task StartMigrationAsync()
+ {
+ var rawDataChannel = Channel.CreateBounded(new BoundedChannelOptions(_options.ChannelCapacity)
+ {
+ SingleWriter = false,
+ SingleReader = false,
+ FullMode = BoundedChannelFullMode.Wait
+ });
+
+ var cleanDataChannel = Channel.CreateBounded(new BoundedChannelOptions(_options.ChannelCapacity)
+ {
+ SingleWriter = false,
+ SingleReader = false,
+ FullMode = BoundedChannelFullMode.Wait
+ });
+
+ // 启动生产者和消费者
+ var producer = Task.Run(() => ProduceDataAsync(rawDataChannel.Writer));
+
+ var processors = Enumerable.Range(0, _options.ProcessorsCount)
+ .Select(_ => Task.Run(() => ProcessDataAsync(rawDataChannel.Reader, cleanDataChannel.Writer)))
+ .ToArray();
+
+ var consumer = Task.Run(() => ConsumeDataAsync(cleanDataChannel.Reader));
+
+ await Task.WhenAll(new[] { producer }.Union(processors).Union(new[] { consumer }));
+ }
+
+ ///
+ /// 生产者,生产数据,主要是从MongoDB中读取数据
+ ///
+ ///
+ ///
+ private async Task ProduceDataAsync(ChannelWriter writer)
+ {
+ while (true)
+ {
+ var queryable = await _meterReadingRecordsRepository.GetQueryableAsync();
+ var batchRecords = queryable.Where(d => d.MigrationStatus == Common.Enums.RecordsDataMigrationStatusEnum.NotStarted)
+ .Take(_options.MongoDbDataBatchSize)
+ .ToArray();
+
+ if (batchRecords == null || batchRecords.Length == 0)
+ {
+ writer.Complete();
+ break;
+ }
+
+ await writer.WriteAsync(batchRecords);
+ }
+ }
+
+ ///
+ /// 清洗数据
+ ///
+ ///
+ ///
+ ///
+ private async Task ProcessDataAsync(ChannelReader reader, ChannelWriter writer)
+ {
+ await foreach (var batch in reader.ReadAllAsync())
+ {
+ //var dataTable = new DataTable();
+ //dataTable.Columns.Add("Id", typeof(string));
+ //dataTable.Columns.Add("CleanName", typeof(string));
+ //dataTable.Columns.Add("ProcessedTime", typeof(DateTime));
+
+ //foreach (var doc in batch)
+ //{
+ // // 业务清洗逻辑
+ // var cleanName = doc["name"].AsString.Trim().ToUpper();
+ // dataTable.Rows.Add(
+ // doc["_id"].ToString(),
+ // cleanName,
+ // DateTime.UtcNow);
+ //}
+
+ //await writer.WriteAsync(dataTable);
+
+ // 批量更新标记
+ var ids = batch.Select(d => d.Id).ToArray();
+ foreach (var item in batch)
+ {
+ item.MigrationStatus = Common.Enums.RecordsDataMigrationStatusEnum.InProgress;
+ item.MigrationTime = DateTime.Now;
+ }
+
+ await _meterReadingRecordsRepository.UpdateManyAsync(batch);
+ }
+ writer.Complete();
+ }
+
+ ///
+ /// 消费清洗后的数据入库
+ ///
+ ///
+ ///
+ private async Task ConsumeDataAsync(ChannelReader reader)
+ {
+ //await using var connection = new SqlConnection(_sqlConnectionString);
+ //await connection.OpenAsync();
+
+ //await foreach (var dataTable in reader.ReadAllAsync())
+ //{
+ // using var bulkCopy = new SqlBulkCopy(connection)
+ // {
+ // DestinationTableName = "CleanData",
+ // BatchSize = 5000,
+ // BulkCopyTimeout = 300
+ // };
+
+ // bulkCopy.ColumnMappings.Add("Id", "Id");
+ // bulkCopy.ColumnMappings.Add("CleanName", "CleanName");
+ // bulkCopy.ColumnMappings.Add("ProcessedTime", "ProcessedTime");
+
+ // await bulkCopy.WriteToServerAsync(dataTable);
+ //}
+ }
+ }
+}
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index 2f144de..9c569d1 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -37,21 +37,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
private readonly ILogger _logger;
private readonly ICapPublisher _capBus;
- private readonly IRepository _fifteenMinuteReadingRecordRepository;
- private readonly IRepository _fiveMinuteReadingRecordRepository;
- private readonly IRepository _oneMinuteReadingRecordRepository;
+ private readonly IRepository _meterReadingRecordsRepository;
public BasicScheduledMeterReadingService(
ILogger logger,
ICapPublisher capBus,
- IRepository fifteenMinuteReadingRecordRepository, IRepository fiveMinuteReadingRecordRepository, IRepository oneMinuteReadingRecordRepository)
+ IRepository meterReadingRecordsRepository)
{
_capBus = capBus;
_logger = logger;
- _oneMinuteReadingRecordRepository = oneMinuteReadingRecordRepository;
- _fiveMinuteReadingRecordRepository = fiveMinuteReadingRecordRepository;
- _fifteenMinuteReadingRecordRepository = fifteenMinuteReadingRecordRepository;
+ _meterReadingRecordsRepository = meterReadingRecordsRepository;
}
///
@@ -279,14 +275,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
//获取下发任务缓存数据
- Dictionary> meterTaskInfos = await GetMeterRedisCacheData(oneMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString());
+ Dictionary> meterTaskInfos = await GetMeterRedisCacheData(oneMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString());
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
return;
}
- List meterTaskInfosList = new List();
+ List meterTaskInfosList = new List();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
@@ -307,7 +303,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
- await _oneMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
+ await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
}
//删除任务数据
@@ -344,14 +340,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
//获取下发任务缓存数据
- Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.Ammeter).ToString());
+ Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.Ammeter).ToString());
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
return;
}
- List meterTaskInfosList = new List();
+ List meterTaskInfosList = new List();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
@@ -372,7 +368,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
- await _fiveMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
+ await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
}
//删除任务数据
@@ -411,14 +407,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
//获取下发任务缓存数据
- Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString());
+ Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString());
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
return;
}
- List meterTaskInfosList = new List();
+ List meterTaskInfosList = new List();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
@@ -439,7 +435,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
- await _fifteenMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
+ await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
}
//删除任务数据
@@ -620,7 +616,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
}
- Dictionary keyValuePairs = new Dictionary();
+ Dictionary keyValuePairs = new Dictionary();
foreach (var tempItem in tempCodes)
{
@@ -675,7 +671,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
- var meterReadingRecords = new BasicMeterReadingRecords()
+ var meterReadingRecords = new MeterReadingRecords()
{
MeterAddress = ammeter.AmmerterAddress,
MeterId = ammeter.ID,
@@ -784,14 +780,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
//获取下发任务缓存数据
- Dictionary> meterTaskInfos = await GetMeterRedisCacheData(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString());
+ Dictionary> meterTaskInfos = await GetMeterRedisCacheData(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString());
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
return;
}
- List meterTaskInfosList = new List();
+ List meterTaskInfosList = new List();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
@@ -812,7 +808,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
- await _oneMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
+ await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
}
//删除任务数据
@@ -850,14 +846,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
//获取下发任务缓存数据
- Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString());
+ Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString());
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
return;
}
- List meterTaskInfosList = new List();
+ List meterTaskInfosList = new List();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
@@ -878,7 +874,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
- await _fiveMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
+ await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
}
//删除任务数据
@@ -915,14 +911,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
//获取下发任务缓存数据
- Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.WaterMeter.ToString());
+ Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.WaterMeter.ToString());
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
return;
}
- List meterTaskInfosList = new List();
+ List meterTaskInfosList = new List();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
@@ -943,7 +939,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
- await _fifteenMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
+ await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList);
}
//删除任务数据
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
index 5a26013..045c9c2 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
@@ -25,8 +25,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
public EnergySystemScheduledMeterReadingService(ILogger logger,
- ICapPublisher capBus,
- IRepository fifteenMinuteReadingRecordRepository, IRepository fiveMinuteReadingRecordRepository, IRepository oneMinuteReadingRecordRepository) :base(logger, capBus, fifteenMinuteReadingRecordRepository, fiveMinuteReadingRecordRepository, oneMinuteReadingRecordRepository)
+ ICapPublisher capBus, IRepository _meterReadingRecordsRepository) :base(logger, capBus, _meterReadingRecordsRepository)
{
}
diff --git a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
index 9df8c39..fc05bc7 100644
--- a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
+++ b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
@@ -5,8 +5,10 @@ using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
+using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
+using JiShe.CollectBus.Protocol.Contracts.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using TouchSocket.Sockets;
@@ -23,6 +25,7 @@ namespace JiShe.CollectBus.Subscribers
private readonly IRepository _messageReceivedHeartbeatEventRepository;
private readonly IRepository _messageReceivedEventRepository;
private readonly IRepository _deviceRepository;
+ private readonly IRepository _meterReadingRecordsRepository;
///
/// Initializes a new instance of the class.
@@ -34,12 +37,13 @@ namespace JiShe.CollectBus.Subscribers
/// The message received heartbeat event repository.
/// The message received event repository.
/// The device repository.
+ /// The device repository.
public SubscriberAppService(ILogger logger,
ITcpService tcpService, IServiceProvider serviceProvider,
IRepository messageReceivedLoginEventRepository,
IRepository messageReceivedHeartbeatEventRepository,
IRepository messageReceivedEventRepository,
- IRepository deviceRepository)
+ IRepository deviceRepository, IRepository meterReadingRecordsRepository)
{
_logger = logger;
_tcpService = tcpService;
@@ -48,6 +52,7 @@ namespace JiShe.CollectBus.Subscribers
_messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository;
_messageReceivedEventRepository = messageReceivedEventRepository;
_deviceRepository = deviceRepository;
+ _meterReadingRecordsRepository = meterReadingRecordsRepository;
}
[CapSubscribe(ProtocolConst.SubscriberIssuedEventName)]
@@ -90,8 +95,9 @@ namespace JiShe.CollectBus.Subscribers
}
else
{
- await protocolPlugin.AnalyzeAsync(receivedMessage);
- await _messageReceivedEventRepository.InsertAsync(receivedMessage);
+ //todo 会根据不同的协议进行解析,然后做业务处理
+ TB3761FN fN = await protocolPlugin.AnalyzeAsync(receivedMessage);
+ //await _messageReceivedEventRepository.InsertAsync(receivedMessage);
}
}
diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterFifteenMinuteReadingRecords.cs b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterFifteenMinuteReadingRecords.cs
deleted file mode 100644
index acc7163..0000000
--- a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterFifteenMinuteReadingRecords.cs
+++ /dev/null
@@ -1,18 +0,0 @@
-using JiShe.CollectBus.Common.Enums;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Volo.Abp.Domain.Entities;
-
-namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
-{
- ///
- /// 15分钟抄读记录表,包含下发报文和回复报文,以及是否迁移
- ///
- public class MeterFifteenMinuteReadingRecords : BasicMeterReadingRecords
- {
-
- }
-}
diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterFiveMinuteReadingRecords.cs b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterFiveMinuteReadingRecords.cs
deleted file mode 100644
index eeb69a2..0000000
--- a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterFiveMinuteReadingRecords.cs
+++ /dev/null
@@ -1,18 +0,0 @@
-using JiShe.CollectBus.Common.Enums;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Volo.Abp.Domain.Entities;
-
-namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
-{
- ///
- /// 5分钟抄读记录表,包含下发报文和回复报文,以及是否迁移
- ///
- public class MeterFiveMinuteReadingRecords : BasicMeterReadingRecords
- {
-
- }
-}
diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterOneMinuteReadingRecords.cs b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterOneMinuteReadingRecords.cs
deleted file mode 100644
index 35b7630..0000000
--- a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterOneMinuteReadingRecords.cs
+++ /dev/null
@@ -1,18 +0,0 @@
-using JiShe.CollectBus.Common.Enums;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Volo.Abp.Domain.Entities;
-
-namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
-{
- ///
- /// 1分钟抄读记录表,包含下发报文和回复报文,以及是否迁移
- ///
- public class MeterOneMinuteReadingRecords : BasicMeterReadingRecords
- {
-
- }
-}
diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/BasicMeterReadingRecords.cs b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingRecords.cs
similarity index 89%
rename from src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/BasicMeterReadingRecords.cs
rename to src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingRecords.cs
index 315bbda..48e4cd8 100644
--- a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/BasicMeterReadingRecords.cs
+++ b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingRecords.cs
@@ -9,9 +9,9 @@ using Volo.Abp.Domain.Entities;
namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
{
///
- /// 抄读记录基类
+ /// 抄读数据记录
///
- public class BasicMeterReadingRecords : AggregateRoot
+ public class MeterReadingRecords : AggregateRoot
{
///
@@ -55,6 +55,16 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
///
public MeterTypeEnum MeterType { get; set; }
+ ///
+ /// 项目ID
+ ///
+ public int ProjectID { get; set; }
+
+ ///
+ /// 数据库业务ID
+ ///
+ public int DatabaseBusiID { get; set; }
+
///
/// AFN功能码
///
diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json
index ec1bd69..1c50ce4 100644
--- a/src/JiShe.CollectBus.Host/appsettings.json
+++ b/src/JiShe.CollectBus.Host/appsettings.json
@@ -34,7 +34,7 @@
"CorsOrigins": "http://localhost:4200,http://localhost:3100"
},
"ConnectionStrings": {
- "Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin",
+ "Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=200&minPoolSize=10&waitQueueTimeoutMS=5000",
"Kafka": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092",
"PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
diff --git a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs b/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs
index 5c73029..2366553 100644
--- a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs
+++ b/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs
@@ -24,9 +24,8 @@ public class CollectBusMongoDbContext : AbpMongoDbContext, ICollectBusMongoDbCon
public IMongoCollection Devices => Collection();
public IMongoCollection ProtocolInfos => Collection();
- public IMongoCollection FifteenMinuteReadingRecords => Collection();
- public IMongoCollection MeterFiveMinuteReadingRecords => Collection();
- public IMongoCollection MeterOneMinuteReadingRecords => Collection();
+ public IMongoCollection MeterReadingRecords => Collection();
+
protected override void CreateModel(IMongoModelBuilder modelBuilder)
{
diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
index b69c541..609dcdb 100644
--- a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
+++ b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
@@ -55,7 +55,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
//await _protocolInfoCache.Get()
}
- public abstract Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null);
+ public abstract Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) where T : TB3761FN;
///
/// 登录帧解析
diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
index bd08d60..5ad92c1 100644
--- a/src/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
+++ b/src/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
@@ -14,7 +14,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Interfaces
Task AddAsync();
- Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null);
+ Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) where T : TB3761FN;
Task LoginAsync(MessageReceivedLogin messageReceived);
diff --git a/src/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs
index 79a92dd..a28cd2d 100644
--- a/src/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs
+++ b/src/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs
@@ -8,8 +8,8 @@ using JiShe.CollectBus.Protocol.Contracts.Models;
using Newtonsoft.Json.Linq;
namespace JiShe.CollectBus.Protocol
-{
- public class StandardProtocolPlugin: BaseProtocolPlugin
+{
+ public class StandardProtocolPlugin : BaseProtocolPlugin
{
///
/// Initializes a new instance of the class.
@@ -21,12 +21,15 @@ namespace JiShe.CollectBus.Protocol
public sealed override ProtocolInfo Info => new(nameof(StandardProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980");
- public override Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null)
+ public override async Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null)
{
var hexStringList = messageReceived.MessageHexString.StringToPairs();
var aTuple = (Tuple)hexStringList.GetAnalyzeValue(CommandChunkEnum.A);
var afn = (int)hexStringList.GetAnalyzeValue(CommandChunkEnum.AFN);
var fn = (int)hexStringList.GetAnalyzeValue(CommandChunkEnum.FN);
+
+ T analyze = default;
+
switch ((AFN)afn)
{
case AFN.确认或否认:
@@ -35,15 +38,15 @@ namespace JiShe.CollectBus.Protocol
case AFN.设置参数: break;
case AFN.查询参数: break;
case AFN.请求实时数据:
- if (Enum.IsDefined(typeof(ATypeOfDataItems), fn)) //Enum.TryParse(afn.ToString(), out ATypeOfDataItems parseResult)
+ if (Enum.IsDefined(typeof(ATypeOfDataItems), fn))
{
- AnalyzeReadingDataAsync(messageReceived, sendAction);
+ analyze = (T?)AnalyzeReadingDataAsync(messageReceived, sendAction);
}
break;
case AFN.请求历史数据:
if (Enum.IsDefined(typeof(IIdataTypeItems), fn))
{
- AnalyzeReadingTdcDataAsync(messageReceived, sendAction);
+ analyze = (T?)AnalyzeReadingTdcDataAsync(messageReceived, sendAction);
}
break;
case AFN.数据转发:
@@ -51,7 +54,7 @@ namespace JiShe.CollectBus.Protocol
break;
}
- throw new NotImplementedException();
+ return await Task.FromResult(analyze);
}
#region 上行命令