2025-03-19 14:31:04 +08:00
|
|
|
|
using System;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using JiShe.CollectBus.DataMigration.Options;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using LiteDB;
using Microsoft.Extensions.Options;
using Volo.Abp.Domain.Repositories;
|
|
|
|
|
|
|
|
|
|
|
|
namespace JiShe.CollectBus.DataMigration
|
|
|
|
|
|
{
|
|
|
|
|
|
/// <summary>
/// Data migration service.
/// Runs a three-stage pipeline: a producer writes raw <see cref="MeterReadingRecords"/> batches to a bounded
/// channel, <see cref="DataMigrationOptions.ProcessorsCount"/> processors turn them into <see cref="DataTable"/>s
/// on a second bounded channel, and a consumer reads those tables for persistence.
/// </summary>
public class DataMigrationService : CollectBusAppService, IDataMigrationService
{
    private readonly IRepository<MeterReadingRecords, Guid> _meterReadingRecordsRepository;
    private readonly DataMigrationOptions _options;

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="options">Migration settings (channel capacity, processor count, batch size).</param>
    /// <param name="meterReadingRecordsRepository">Repository the raw meter reading records are read from.</param>
    public DataMigrationService(IOptions<DataMigrationOptions> options,
        IRepository<MeterReadingRecords, Guid> meterReadingRecordsRepository)
    {
        _options = options.Value;
        _meterReadingRecordsRepository = meterReadingRecordsRepository;
    }

    /// <summary>
    /// Starts the migration and completes when every pipeline stage has finished.
    /// </summary>
    /// <remarks>
    /// Each channel is completed exactly once, by the code that owns all of its writers, so every reader
    /// observes end-of-stream. If any stage fails, the remaining stages are cancelled (so none of them stays
    /// blocked on a channel that will never drain) and the failing stage's exception is rethrown.
    /// </remarks>
    /// <returns>A task that completes when the producer, all processors and the consumer have finished.</returns>
    public async Task StartMigrationAsync()
    {
        var rawDataChannel = CreateBoundedChannel<MeterReadingRecords[]>();
        var cleanDataChannel = CreateBoundedChannel<DataTable>();

        // Cancelled by the first stage that fails; every stage observes this token.
        using var pipelineCts = new CancellationTokenSource();
        var cancellationToken = pipelineCts.Token;

        // Producer: the raw channel has a single producing stage, so it is completed as soon as that stage ends.
        var producer = CompleteWhenDoneAsync(
            RunStageAsync(() => ProduceDataAsync(rawDataChannel.Writer, cancellationToken), pipelineCts),
            rawDataChannel.Writer);

        // Enumerable.Range throws on a negative count, and zero processors would leave the raw channel without
        // a reader (the producer could then block forever once it is full), so at least one processor is started.
        var processorCount = Math.Max(1, _options.ProcessorsCount);
        var processors = Enumerable.Range(0, processorCount)
            .Select(_ => RunStageAsync(
                () => ProcessDataAsync(rawDataChannel.Reader, cleanDataChannel.Writer, cancellationToken),
                pipelineCts))
            .ToArray();

        // The clean channel is written by every processor, so it may only be completed after the last one finishes.
        var processing = CompleteWhenDoneAsync(Task.WhenAll(processors), cleanDataChannel.Writer);

        var consumer = RunStageAsync(() => ConsumeDataAsync(cleanDataChannel.Reader, cancellationToken), pipelineCts);

        await Task.WhenAll(producer, processing, consumer);
    }

    /// <summary>
    /// Creates a bounded channel holding at most <see cref="DataMigrationOptions.ChannelCapacity"/> items that
    /// allows multiple readers and writers; writers wait while the channel is full.
    /// </summary>
    /// <typeparam name="T">Item type carried by the channel.</typeparam>
    /// <returns>The new channel.</returns>
    private Channel<T> CreateBoundedChannel<T>()
    {
        return Channel.CreateBounded<T>(new BoundedChannelOptions(_options.ChannelCapacity)
        {
            SingleWriter = false,
            SingleReader = false,
            FullMode = BoundedChannelFullMode.Wait
        });
    }

    /// <summary>
    /// Runs one pipeline stage on the thread pool. If the stage fails with anything other than cancellation,
    /// <paramref name="pipelineCts"/> is cancelled so the other stages stop, and the exception is rethrown.
    /// </summary>
    /// <param name="stage">The stage to run.</param>
    /// <param name="pipelineCts">Source shared by all stages of the pipeline.</param>
    /// <returns>A task that mirrors the outcome of <paramref name="stage"/>.</returns>
    private static async Task RunStageAsync(Func<Task> stage, CancellationTokenSource pipelineCts)
    {
        try
        {
            await Task.Run(stage);
        }
        catch (Exception ex) when (!(ex is OperationCanceledException))
        {
            // Cancellation is only the consequence of another stage failing; only real failures trigger it.
            pipelineCts.Cancel();
            throw;
        }
    }

    /// <summary>
    /// Awaits <paramref name="work"/>, then completes <paramref name="writer"/>. A failure is forwarded into the
    /// channel's completion, so readers stop instead of waiting for items that will never arrive.
    /// </summary>
    /// <typeparam name="T">Item type of the channel.</typeparam>
    /// <param name="work">All work that writes to <paramref name="writer"/>.</param>
    /// <param name="writer">Channel writer to complete once <paramref name="work"/> has finished.</param>
    /// <returns>A task that mirrors the outcome of <paramref name="work"/>.</returns>
    private static async Task CompleteWhenDoneAsync<T>(Task work, ChannelWriter<T> writer)
    {
        try
        {
            await work;
        }
        catch (Exception ex)
        {
            writer.TryComplete(ex);
            throw;
        }

        writer.TryComplete();
    }

    /// <summary>
    /// Producer: reads batches of raw records (per the original design notes, mainly from MongoDB) and writes
    /// them to <paramref name="writer"/>.
    /// </summary>
    /// <remarks>
    /// The read loop is currently commented out, so no batches are produced. The caller completes
    /// <paramref name="writer"/> when this method returns; this method must not complete it itself.
    /// </remarks>
    /// <param name="writer">Raw data channel writer; completed by the caller.</param>
    /// <param name="cancellationToken">Cancelled when another pipeline stage fails.</param>
    /// <returns>A task that completes when all batches have been written.</returns>
    private Task ProduceDataAsync(ChannelWriter<MeterReadingRecords[]> writer, CancellationToken cancellationToken)
    {
        // TODO: re-enable the read loop below (and mark this method `async` again). It references
        // MeterReadingRecords.MigrationStatus and Common.Enums.RecordsDataMigrationStatusEnum — confirm those
        // still exist before uncommenting.
        //
        // while (true)
        // {
        //     var queryable = await _meterReadingRecordsRepository.GetQueryableAsync();
        //     var batchRecords = queryable
        //         .Where(d => d.MigrationStatus == Common.Enums.RecordsDataMigrationStatusEnum.NotStarted)
        //         .Take(_options.MongoDbDataBatchSize)
        //         .ToArray();
        //
        //     if (batchRecords.Length == 0)
        //     {
        //         break; // the caller completes the writer
        //     }
        //
        //     await writer.WriteAsync(batchRecords, cancellationToken);
        // }
        //
        // NOTE(review): the loop re-queries "NotStarted" records every iteration, while the status is only
        // changed later by the processors, so the same batch may be read more than once — confirm before
        // re-enabling.
        return Task.CompletedTask;
    }

    /// <summary>
    /// Processor: cleans raw record batches read from <paramref name="reader"/> and writes the resulting
    /// <see cref="DataTable"/>s to <paramref name="writer"/>.
    /// </summary>
    /// <remarks>
    /// Several processors share <paramref name="writer"/>, so none of them may complete it; the caller completes
    /// it after the last processor finishes. The cleaning and status-update logic is currently commented out,
    /// so batches are read and discarded.
    /// </remarks>
    /// <param name="reader">Raw data channel reader.</param>
    /// <param name="writer">Clean data channel writer; completed by the caller.</param>
    /// <param name="cancellationToken">Cancelled when another pipeline stage fails.</param>
    /// <returns>A task that completes when <paramref name="reader"/> is exhausted.</returns>
    private async Task ProcessDataAsync(ChannelReader<MeterReadingRecords[]> reader, ChannelWriter<DataTable> writer,
        CancellationToken cancellationToken)
    {
        await foreach (var batch in reader.ReadAllAsync(cancellationToken))
        {
            // TODO: restore the cleaning logic.
            // NOTE(review): the template below indexes `doc["name"]` / `doc["_id"]`, which does not match the
            // MeterReadingRecords element type of `batch` — adapt before enabling.
            //
            // var dataTable = new DataTable();
            // dataTable.Columns.Add("Id", typeof(string));
            // dataTable.Columns.Add("CleanName", typeof(string));
            // dataTable.Columns.Add("ProcessedTime", typeof(DateTime));
            //
            // foreach (var doc in batch)
            // {
            //     // business cleaning logic
            //     var cleanName = doc["name"].AsString.Trim().ToUpper();
            //     dataTable.Rows.Add(doc["_id"].ToString(), cleanName, DateTime.UtcNow);
            // }
            //
            // await writer.WriteAsync(dataTable, cancellationToken);

            // TODO: restore the batch status update (references MigrationStatus / MigrationTime).
            //
            // foreach (var item in batch)
            // {
            //     item.MigrationStatus = Common.Enums.RecordsDataMigrationStatusEnum.InProgress;
            //     item.MigrationTime = DateTime.Now;
            // }
            //
            // await _meterReadingRecordsRepository.UpdateManyAsync(batch);
        }
    }

    /// <summary>
    /// Consumer: reads cleaned <see cref="DataTable"/>s from <paramref name="reader"/> for storage in the target
    /// database.
    /// </summary>
    /// <remarks>
    /// Persistence is currently commented out. The channel is still drained, so processors never block on a
    /// full clean data channel.
    /// </remarks>
    /// <param name="reader">Clean data channel reader.</param>
    /// <param name="cancellationToken">Cancelled when another pipeline stage fails.</param>
    /// <returns>A task that completes when <paramref name="reader"/> is exhausted.</returns>
    private async Task ConsumeDataAsync(ChannelReader<DataTable> reader, CancellationToken cancellationToken)
    {
        // TODO: bulk-insert each table. The reference implementation needs a SQL connection string field and
        // SqlBulkCopy, neither of which is available in this class yet:
        //
        // await using var connection = new SqlConnection(_sqlConnectionString);
        // await connection.OpenAsync(cancellationToken);
        //
        // (inside the loop)
        // using var bulkCopy = new SqlBulkCopy(connection)
        // {
        //     DestinationTableName = "CleanData",
        //     BatchSize = 5000,
        //     BulkCopyTimeout = 300
        // };
        // bulkCopy.ColumnMappings.Add("Id", "Id");
        // bulkCopy.ColumnMappings.Add("CleanName", "CleanName");
        // bulkCopy.ColumnMappings.Add("ProcessedTime", "ProcessedTime");
        // await bulkCopy.WriteToServerAsync(dataTable, cancellationToken);
        await foreach (var dataTable in reader.ReadAllAsync(cancellationToken))
        {
            // Intentionally empty until persistence is implemented (see TODO above).
        }
    }
}
|
|
|
|
|
|
}
|