using JiShe.CollectBus.DataMigration.Options;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using LiteDB;
using Microsoft.Extensions.Options;
using System;
using System.Data;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;
using Volo.Abp.Domain.Repositories;

namespace JiShe.CollectBus.DataMigration
{
    /// <summary>
    /// Data migration service. Moves meter reading records through a three-stage pipeline:
    /// one producer (reads unmigrated records from the repository), N processors (clean the data)
    /// and one consumer (persists the cleaned data), connected by bounded channels.
    /// </summary>
    public class DataMigrationService : CollectBusAppService, IDataMigrationService
    {
        private readonly IRepository<MeterReadingRecords> _meterReadingRecordsRepository;
        private readonly DataMigrationOptions _options;

        public DataMigrationService(IOptions<DataMigrationOptions> options, IRepository<MeterReadingRecords> meterReadingRecordsRepository)
        {
            _options = options.Value;
            _meterReadingRecordsRepository = meterReadingRecordsRepository;
        }

        /// <summary>
        /// Starts the migration and completes when every pipeline stage has finished.
        /// </summary>
        /// <returns>A task that faults if any stage faults.</returns>
        public async Task StartMigrationAsync()
        {
            var rawDataChannel = Channel.CreateBounded<MeterReadingRecords[]>(CreateChannelOptions());
            var cleanDataChannel = Channel.CreateBounded<DataTable>(CreateChannelOptions());

            // At least one processor must drain the raw channel, otherwise the producer blocks forever
            // once the channel is full.
            var processorCount = Math.Max(1, _options.ProcessorsCount);

            var producer = Task.Run(() => ProduceDataAsync(rawDataChannel.Writer));
            var processing = RunProcessorsAsync(processorCount, rawDataChannel, cleanDataChannel.Writer);
            var consumer = Task.Run(() => ConsumeDataAsync(cleanDataChannel.Reader));

            await Task.WhenAll(producer, processing, consumer);
        }

        /// <summary>
        /// Builds the options shared by both pipeline channels (capacity from configuration,
        /// writers wait when the channel is full).
        /// </summary>
        private BoundedChannelOptions CreateChannelOptions()
        {
            return new BoundedChannelOptions(_options.ChannelCapacity)
            {
                SingleWriter = false,
                SingleReader = false,
                FullMode = BoundedChannelFullMode.Wait
            };
        }

        /// <summary>
        /// Producer: reads batches of records whose migration has not started (mainly from MongoDB)
        /// and publishes them to <paramref name="writer"/>. Completes the writer when no records are
        /// left, or with the error if reading/updating fails.
        /// </summary>
        /// <param name="writer">Raw-data channel writer; this method is its only writer.</param>
        private async Task ProduceDataAsync(ChannelWriter<MeterReadingRecords[]> writer)
        {
            try
            {
                while (true)
                {
                    var queryable = await _meterReadingRecordsRepository.GetQueryableAsync();
                    var batchRecords = queryable
                        .Where(d => d.MigrationStatus == Common.Enums.RecordsDataMigrationStatusEnum.NotStarted)
                        .Take(_options.MongoDbDataBatchSize)
                        .ToArray();

                    if (batchRecords.Length == 0)
                    {
                        break;
                    }

                    // Claim the batch BEFORE publishing it. If the status were only changed later by a
                    // processor, the next query could return the same NotStarted records again and they
                    // would be queued (and migrated) twice.
                    var claimedAt = DateTime.Now;
                    foreach (var record in batchRecords)
                    {
                        record.MigrationStatus = Common.Enums.RecordsDataMigrationStatusEnum.InProgress;
                        record.MigrationTime = claimedAt;
                    }

                    await _meterReadingRecordsRepository.UpdateManyAsync(batchRecords);
                    await writer.WriteAsync(batchRecords);
                }

                writer.TryComplete();
            }
            catch (Exception ex)
            {
                // Propagate the failure so processors stop instead of waiting forever.
                writer.TryComplete(ex);
                throw;
            }
        }

        /// <summary>
        /// Runs <paramref name="processorCount"/> processors concurrently and completes
        /// <paramref name="output"/> exactly once, after all of them have stopped.
        /// </summary>
        /// <param name="processorCount">Number of concurrent processors (at least 1).</param>
        /// <param name="input">Raw-data channel; its writer is failed if the processors fail, which unblocks the producer.</param>
        /// <param name="output">Clean-data channel writer shared by all processors.</param>
        private async Task RunProcessorsAsync(int processorCount, Channel<MeterReadingRecords[]> input, ChannelWriter<DataTable> output)
        {
            var processors = Enumerable.Range(0, processorCount)
                .Select(_ => Task.Run(() => ProcessDataAsync(input.Reader, output)))
                .ToArray();

            try
            {
                await Task.WhenAll(processors);
                output.TryComplete();
            }
            catch (Exception ex)
            {
                // A producer blocked in WriteAsync would otherwise wait forever for a reader.
                input.Writer.TryComplete(ex);
                output.TryComplete(ex);
                throw;
            }
        }

        /// <summary>
        /// Processor: cleans each raw batch. Cleaning is not implemented yet; batches are only drained.
        /// Does NOT complete <paramref name="writer"/> — it is shared by all processors and is completed
        /// once by the caller.
        /// </summary>
        /// <param name="reader">Raw-data channel reader.</param>
        /// <param name="writer">Clean-data channel writer (unused until cleaning is implemented).</param>
        private async Task ProcessDataAsync(ChannelReader<MeterReadingRecords[]> reader, ChannelWriter<DataTable> writer)
        {
            await foreach (var batch in reader.ReadAllAsync())
            {
                // TODO: implement cleaning. Sketch of the intended logic:
                //var dataTable = new DataTable();
                //dataTable.Columns.Add("Id", typeof(string));
                //dataTable.Columns.Add("CleanName", typeof(string));
                //dataTable.Columns.Add("ProcessedTime", typeof(DateTime));
                //foreach (var doc in batch)
                //{
                //    // business cleaning logic
                //    var cleanName = doc["name"].AsString.Trim().ToUpper();
                //    dataTable.Rows.Add(
                //        doc["_id"].ToString(),
                //        cleanName,
                //        DateTime.UtcNow);
                //}
                //await writer.WriteAsync(dataTable);
            }
        }

        /// <summary>
        /// Consumer: persists the cleaned data. Not implemented yet; returns immediately.
        /// </summary>
        /// <param name="reader">Clean-data channel reader.</param>
        private Task ConsumeDataAsync(ChannelReader<DataTable> reader)
        {
            // TODO: implement persistence. Sketch of the intended SqlBulkCopy-based logic:
            //await using var connection = new SqlConnection(_sqlConnectionString);
            //await connection.OpenAsync();
            //await foreach (var dataTable in reader.ReadAllAsync())
            //{
            //    using var bulkCopy = new SqlBulkCopy(connection)
            //    {
            //        DestinationTableName = "CleanData",
            //        BatchSize = 5000,
            //        BulkCopyTimeout = 300
            //    };
            //    bulkCopy.ColumnMappings.Add("Id", "Id");
            //    bulkCopy.ColumnMappings.Add("CleanName", "CleanName");
            //    bulkCopy.ColumnMappings.Add("ProcessedTime", "ProcessedTime");
            //    await bulkCopy.WriteToServerAsync(dataTable);
            //}
            return Task.CompletedTask;
        }
    }
}