2025-04-17 20:28:50 +08:00

154 lines
5.6 KiB
C#

using JiShe.CollectBus.DataMigration.Options;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using LiteDB;
using Microsoft.Extensions.Options;
using System;
using System.Data;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;
using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.DataMigration
{
/// <summary>
/// 数据迁移服务
/// </summary>
public class DataMigrationService: CollectBusAppService, IDataMigrationService
{
private readonly IRepository<MeterReadingRecords, Guid> _meterReadingRecordsRepository;
private readonly DataMigrationOptions _options;
public DataMigrationService(IOptions<DataMigrationOptions> options,
IRepository<MeterReadingRecords, Guid> meterReadingRecordsRepository)
{
_options = options.Value;
_meterReadingRecordsRepository = meterReadingRecordsRepository;
}
/// <summary>
/// 开始迁移
/// </summary>
/// <returns></returns>
public async Task StartMigrationAsync()
{
var rawDataChannel = Channel.CreateBounded<MeterReadingRecords[]>(new BoundedChannelOptions(_options.ChannelCapacity)
{
SingleWriter = false,
SingleReader = false,
FullMode = BoundedChannelFullMode.Wait
});
var cleanDataChannel = Channel.CreateBounded<DataTable>(new BoundedChannelOptions(_options.ChannelCapacity)
{
SingleWriter = false,
SingleReader = false,
FullMode = BoundedChannelFullMode.Wait
});
// 启动生产者和消费者
var producer = Task.Run(() => ProduceDataAsync(rawDataChannel.Writer));
var processors = Enumerable.Range(0, _options.ProcessorsCount)
.Select(_ => Task.Run(() => ProcessDataAsync(rawDataChannel.Reader, cleanDataChannel.Writer)))
.ToArray();
var consumer = Task.Run(() => ConsumeDataAsync(cleanDataChannel.Reader));
await Task.WhenAll(new[] { producer }.Union(processors).Union(new[] { consumer }));
}
/// <summary>
/// 生产者,生产数据,主要是从MongoDB中读取数据
/// </summary>
/// <param name="writer"></param>
/// <returns></returns>
private async Task ProduceDataAsync(ChannelWriter<MeterReadingRecords[]> writer)
{
//while (true)
//{
// var queryable = await _meterReadingRecordsRepository.GetQueryableAsync();
// var batchRecords = queryable.Where(d => d.MigrationStatus == Common.Enums.RecordsDataMigrationStatusEnum.NotStarted)
// .Take(_options.MongoDbDataBatchSize)
// .ToArray();
// if (batchRecords == null || batchRecords.Length == 0)
// {
// writer.Complete();
// break;
// }
// await writer.WriteAsync(batchRecords);
//}
}
/// <summary>
/// 清洗数据
/// </summary>
/// <param name="reader"></param>
/// <param name="writer"></param>
/// <returns></returns>
private async Task ProcessDataAsync(ChannelReader<MeterReadingRecords[]> reader, ChannelWriter<DataTable> writer)
{
await foreach (var batch in reader.ReadAllAsync())
{
//var dataTable = new DataTable();
//dataTable.Columns.Add("Id", typeof(string));
//dataTable.Columns.Add("CleanName", typeof(string));
//dataTable.Columns.Add("ProcessedTime", typeof(DateTime));
//foreach (var doc in batch)
//{
// // 业务清洗逻辑
// var cleanName = doc["name"].AsString.Trim().ToUpper();
// dataTable.Rows.Add(
// doc["_id"].ToString(),
// cleanName,
// DateTime.UtcNow);
//}
//await writer.WriteAsync(dataTable);
// 批量更新标记
//var ids = batch.Select(d => d.Id).ToArray();
//foreach (var item in batch)
//{
// item.MigrationStatus = Common.Enums.RecordsDataMigrationStatusEnum.InProgress;
// item.MigrationTime = DateTime.Now;
//}
//await _meterReadingRecordsRepository.UpdateManyAsync(batch);
}
writer.Complete();
}
/// <summary>
/// 消费清洗后的数据入库
/// </summary>
/// <param name="reader"></param>
/// <returns></returns>
private async Task ConsumeDataAsync(ChannelReader<DataTable> reader)
{
//await using var connection = new SqlConnection(_sqlConnectionString);
//await connection.OpenAsync();
//await foreach (var dataTable in reader.ReadAllAsync())
//{
// using var bulkCopy = new SqlBulkCopy(connection)
// {
// DestinationTableName = "CleanData",
// BatchSize = 5000,
// BulkCopyTimeout = 300
// };
// bulkCopy.ColumnMappings.Add("Id", "Id");
// bulkCopy.ColumnMappings.Add("CleanName", "CleanName");
// bulkCopy.ColumnMappings.Add("ProcessedTime", "ProcessedTime");
// await bulkCopy.WriteToServerAsync(dataTable);
//}
}
}
}