diff --git a/src/JiShe.CollectBus.Application.Contracts/DataMigration/IDataMigrationService.cs b/src/JiShe.CollectBus.Application.Contracts/DataMigration/IDataMigrationService.cs new file mode 100644 index 0000000..bbfa581 --- /dev/null +++ b/src/JiShe.CollectBus.Application.Contracts/DataMigration/IDataMigrationService.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.DataMigration +{ + /// + /// 数据迁移服务 + /// + public interface IDataMigrationService + { + /// + /// 开始迁移 + /// + /// + Task StartMigrationAsync(); + } +} diff --git a/src/JiShe.CollectBus.Application.Contracts/DataMigration/Options/DataMigrationOptions.cs b/src/JiShe.CollectBus.Application.Contracts/DataMigration/Options/DataMigrationOptions.cs new file mode 100644 index 0000000..7174cad --- /dev/null +++ b/src/JiShe.CollectBus.Application.Contracts/DataMigration/Options/DataMigrationOptions.cs @@ -0,0 +1,39 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.DataMigration.Options +{ + /// + /// 数据迁移配置 + /// + public class DataMigrationOptions + { + /// + /// MongoDb每批处理量 + /// + public int MongoDbDataBatchSize { get; set; } = 1000; + + /// + /// 批量处理通道容量 + /// + public int ChannelCapacity { get; set; } = 100; + + /// + /// 数据库 每批处理量 + /// + public int SqlBulkBatchSize { get; set; } = 1000; + + /// + /// 数据库 每批处理超时时间 + /// + public int SqlBulkTimeout { get; set; } = 60; + + /// + /// 处理器数量 + /// + public int ProcessorsCount { get; set; } = 4; + } +} diff --git a/src/JiShe.CollectBus.Application/Consumers/ReceivedConsumer.cs b/src/JiShe.CollectBus.Application/Consumers/ReceivedConsumer.cs index 4e00864..9309c55 100644 --- a/src/JiShe.CollectBus.Application/Consumers/ReceivedConsumer.cs +++ b/src/JiShe.CollectBus.Application/Consumers/ReceivedConsumer.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Threading.Tasks; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using JiShe.CollectBus.Protocol.Contracts.Models; using MassTransit; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -48,7 +49,7 @@ namespace JiShe.CollectBus.Consumers var list = new List(); foreach (var contextItem in context.Message) { - await protocolPlugin.AnalyzeAsync(contextItem.Message); + await protocolPlugin.AnalyzeAsync(contextItem.Message); list.Add(contextItem.Message); } await _messageReceivedEventRepository.InsertManyAsync(list); diff --git a/src/JiShe.CollectBus.Application/DataMigration/DataMigrationService.cs b/src/JiShe.CollectBus.Application/DataMigration/DataMigrationService.cs new file mode 100644 index 0000000..4aa3a7e --- /dev/null +++ b/src/JiShe.CollectBus.Application/DataMigration/DataMigrationService.cs @@ -0,0 +1,153 @@ +using JiShe.CollectBus.DataMigration.Options; +using JiShe.CollectBus.IotSystems.MeterReadingRecords; +using LiteDB; +using Microsoft.Extensions.Options; +using System; +using System.Data; +using System.Linq; +using System.Threading.Channels; +using System.Threading.Tasks; +using Volo.Abp.Domain.Repositories; + +namespace JiShe.CollectBus.DataMigration +{ + /// + /// 数据迁移服务 + /// + public class DataMigrationService: CollectBusAppService, IDataMigrationService + { + private readonly IRepository _meterReadingRecordsRepository; + private readonly DataMigrationOptions _options; + + + public DataMigrationService(IOptions options, + IRepository meterReadingRecordsRepository) + { + _options = options.Value; + _meterReadingRecordsRepository = meterReadingRecordsRepository; + } + + /// + /// 开始迁移 + /// + /// + public async Task StartMigrationAsync() + { + var rawDataChannel = Channel.CreateBounded(new BoundedChannelOptions(_options.ChannelCapacity) + { + SingleWriter = false, + SingleReader = false, + FullMode = BoundedChannelFullMode.Wait + }); + + var cleanDataChannel = Channel.CreateBounded(new BoundedChannelOptions(_options.ChannelCapacity) + { + SingleWriter = false, + SingleReader = false, + FullMode = BoundedChannelFullMode.Wait + }); + + // 启动生产者和消费者 + var producer = Task.Run(() => ProduceDataAsync(rawDataChannel.Writer)); + + var processors = Enumerable.Range(0, _options.ProcessorsCount) + .Select(_ => Task.Run(() => ProcessDataAsync(rawDataChannel.Reader, cleanDataChannel.Writer))) + .ToArray(); + + var consumer = Task.Run(() => ConsumeDataAsync(cleanDataChannel.Reader)); + + await Task.WhenAll(new[] { producer }.Union(processors).Union(new[] { consumer })); + } + + /// + /// 生产者,生产数据,主要是从MongoDB中读取数据 + /// + /// + /// + private async Task ProduceDataAsync(ChannelWriter writer) + { + while (true) + { + var queryable = await _meterReadingRecordsRepository.GetQueryableAsync(); + var batchRecords = queryable.Where(d => d.MigrationStatus == Common.Enums.RecordsDataMigrationStatusEnum.NotStarted) + .Take(_options.MongoDbDataBatchSize) + .ToArray(); + + if (batchRecords == null || batchRecords.Length == 0) + { + writer.Complete(); + break; + } + + await writer.WriteAsync(batchRecords); + } + } + + /// + /// 清洗数据 + /// + /// + /// + /// + private async Task ProcessDataAsync(ChannelReader reader, ChannelWriter writer) + { + await foreach (var batch in reader.ReadAllAsync()) + { + //var dataTable = new DataTable(); + //dataTable.Columns.Add("Id", typeof(string)); + //dataTable.Columns.Add("CleanName", typeof(string)); + //dataTable.Columns.Add("ProcessedTime", typeof(DateTime)); + + //foreach (var doc in batch) + //{ + // // 业务清洗逻辑 + // var cleanName = doc["name"].AsString.Trim().ToUpper(); + // dataTable.Rows.Add( + // doc["_id"].ToString(), + // cleanName, + // DateTime.UtcNow); + //} + + //await writer.WriteAsync(dataTable); + + // 批量更新标记 + var ids = batch.Select(d => d.Id).ToArray(); + foreach (var item in batch) + { + item.MigrationStatus = Common.Enums.RecordsDataMigrationStatusEnum.InProgress; + item.MigrationTime = DateTime.Now; + } + + await _meterReadingRecordsRepository.UpdateManyAsync(batch); + } + writer.Complete(); + } + + /// + /// 消费清洗后的数据入库 + /// + /// + /// + private async Task ConsumeDataAsync(ChannelReader reader) + { + //await using var connection = new SqlConnection(_sqlConnectionString); + //await connection.OpenAsync(); + + //await foreach (var dataTable in reader.ReadAllAsync()) + //{ + // using var bulkCopy = new SqlBulkCopy(connection) + // { + // DestinationTableName = "CleanData", + // BatchSize = 5000, + // BulkCopyTimeout = 300 + // }; + + // bulkCopy.ColumnMappings.Add("Id", "Id"); + // bulkCopy.ColumnMappings.Add("CleanName", "CleanName"); + // bulkCopy.ColumnMappings.Add("ProcessedTime", "ProcessedTime"); + + // await bulkCopy.WriteToServerAsync(dataTable); + //} + } + } +} diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 2f144de..9c569d1 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -37,21 +37,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading { private readonly ILogger _logger; private readonly ICapPublisher _capBus; - private readonly IRepository _fifteenMinuteReadingRecordRepository; - private readonly IRepository _fiveMinuteReadingRecordRepository; - private readonly IRepository _oneMinuteReadingRecordRepository; + private readonly IRepository _meterReadingRecordsRepository; public BasicScheduledMeterReadingService( ILogger logger, ICapPublisher capBus, - IRepository fifteenMinuteReadingRecordRepository, IRepository fiveMinuteReadingRecordRepository, IRepository oneMinuteReadingRecordRepository) + IRepository meterReadingRecordsRepository) { _capBus = capBus; _logger = logger; - _oneMinuteReadingRecordRepository = oneMinuteReadingRecordRepository; - _fiveMinuteReadingRecordRepository = fiveMinuteReadingRecordRepository; - _fifteenMinuteReadingRecordRepository = fifteenMinuteReadingRecordRepository; + _meterReadingRecordsRepository = meterReadingRecordsRepository; } /// @@ -279,14 +275,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(oneMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString()); + Dictionary> meterTaskInfos = await GetMeterRedisCacheData(oneMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString()); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); return; } - List meterTaskInfosList = new List(); + List meterTaskInfosList = new List(); //将取出的缓存任务数据发送到Kafka消息队列中 foreach (var focusItem in meterTaskInfos) @@ -307,7 +303,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) { - await _oneMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList); + await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList); } //删除任务数据 @@ -344,14 +340,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.Ammeter).ToString()); + Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.Ammeter).ToString()); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); return; } - List meterTaskInfosList = new List(); + List meterTaskInfosList = new List(); //将取出的缓存任务数据发送到Kafka消息队列中 foreach (var focusItem in meterTaskInfos) @@ -372,7 +368,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) { - await _fiveMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList); + await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList); } //删除任务数据 @@ -411,14 +407,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString()); + Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString()); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); return; } - List meterTaskInfosList = new List(); + List meterTaskInfosList = new List(); //将取出的缓存任务数据发送到Kafka消息队列中 foreach (var focusItem in meterTaskInfos) @@ -439,7 +435,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) { - await _fifteenMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList); + await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList); } //删除任务数据 @@ -620,7 +616,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } } - Dictionary keyValuePairs = new Dictionary(); + Dictionary keyValuePairs = new Dictionary(); foreach (var tempItem in tempCodes) { @@ -675,7 +671,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading - var meterReadingRecords = new BasicMeterReadingRecords() + var meterReadingRecords = new MeterReadingRecords() { MeterAddress = ammeter.AmmerterAddress, MeterId = ammeter.ID, @@ -784,14 +780,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString()); + Dictionary> meterTaskInfos = await GetMeterRedisCacheData(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString()); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102"); return; } - List meterTaskInfosList = new List(); + List meterTaskInfosList = new List(); //将取出的缓存任务数据发送到Kafka消息队列中 foreach (var focusItem in meterTaskInfos) @@ -812,7 +808,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) { - await _oneMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList); + await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList); } //删除任务数据 @@ -850,14 +846,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString()); + Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString()); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102"); return; } - List meterTaskInfosList = new List(); + List meterTaskInfosList = new List(); //将取出的缓存任务数据发送到Kafka消息队列中 foreach (var focusItem in meterTaskInfos) @@ -878,7 +874,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) { - await _fiveMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList); + await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList); } //删除任务数据 @@ -915,14 +911,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.WaterMeter.ToString()); + Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.WaterMeter.ToString()); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102"); return; } - List meterTaskInfosList = new List(); + List meterTaskInfosList = new List(); //将取出的缓存任务数据发送到Kafka消息队列中 foreach (var focusItem in meterTaskInfos) @@ -943,7 +939,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) { - await _fifteenMinuteReadingRecordRepository.InsertManyAsync(meterTaskInfosList); + await _meterReadingRecordsRepository.InsertManyAsync(meterTaskInfosList); } //删除任务数据 diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 5a26013..045c9c2 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -25,8 +25,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { public EnergySystemScheduledMeterReadingService(ILogger logger, - ICapPublisher capBus, - IRepository fifteenMinuteReadingRecordRepository, IRepository fiveMinuteReadingRecordRepository, IRepository oneMinuteReadingRecordRepository) :base(logger, capBus, fifteenMinuteReadingRecordRepository, fiveMinuteReadingRecordRepository, oneMinuteReadingRecordRepository) + ICapPublisher capBus, IRepository _meterReadingRecordsRepository) :base(logger, capBus, _meterReadingRecordsRepository) { } diff --git a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index 9df8c39..fc05bc7 100644 --- a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -5,8 +5,10 @@ using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageReceiveds; +using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using JiShe.CollectBus.Protocol.Contracts.Models; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using TouchSocket.Sockets; @@ -23,6 +25,7 @@ namespace JiShe.CollectBus.Subscribers private readonly IRepository _messageReceivedHeartbeatEventRepository; private readonly IRepository _messageReceivedEventRepository; private readonly IRepository _deviceRepository; + private readonly IRepository _meterReadingRecordsRepository; /// /// Initializes a new instance of the class. @@ -34,12 +37,13 @@ namespace JiShe.CollectBus.Subscribers /// The message received heartbeat event repository. /// The message received event repository. /// The device repository. + /// The device repository. public SubscriberAppService(ILogger logger, ITcpService tcpService, IServiceProvider serviceProvider, IRepository messageReceivedLoginEventRepository, IRepository messageReceivedHeartbeatEventRepository, IRepository messageReceivedEventRepository, - IRepository deviceRepository) + IRepository deviceRepository, IRepository meterReadingRecordsRepository) { _logger = logger; _tcpService = tcpService; @@ -48,6 +52,7 @@ namespace JiShe.CollectBus.Subscribers _messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository; _messageReceivedEventRepository = messageReceivedEventRepository; _deviceRepository = deviceRepository; + _meterReadingRecordsRepository = meterReadingRecordsRepository; } [CapSubscribe(ProtocolConst.SubscriberIssuedEventName)] @@ -90,8 +95,9 @@ namespace JiShe.CollectBus.Subscribers } else { - await protocolPlugin.AnalyzeAsync(receivedMessage); - await _messageReceivedEventRepository.InsertAsync(receivedMessage); + //todo 会根据不同的协议进行解析,然后做业务处理 + TB3761FN fN = await protocolPlugin.AnalyzeAsync(receivedMessage); + //await _messageReceivedEventRepository.InsertAsync(receivedMessage); } } diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterFifteenMinuteReadingRecords.cs b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterFifteenMinuteReadingRecords.cs deleted file mode 100644 index acc7163..0000000 --- a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterFifteenMinuteReadingRecords.cs +++ /dev/null @@ -1,18 +0,0 @@ -using JiShe.CollectBus.Common.Enums; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Volo.Abp.Domain.Entities; - -namespace JiShe.CollectBus.IotSystems.MeterReadingRecords -{ - /// - /// 15分钟抄读记录表,包含下发报文和回复报文,以及是否迁移 - /// - public class MeterFifteenMinuteReadingRecords : BasicMeterReadingRecords - { - - } -} diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterFiveMinuteReadingRecords.cs b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterFiveMinuteReadingRecords.cs deleted file mode 100644 index eeb69a2..0000000 --- a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterFiveMinuteReadingRecords.cs +++ /dev/null @@ -1,18 +0,0 @@ -using JiShe.CollectBus.Common.Enums; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Volo.Abp.Domain.Entities; - -namespace JiShe.CollectBus.IotSystems.MeterReadingRecords -{ - /// - /// 5分钟抄读记录表,包含下发报文和回复报文,以及是否迁移 - /// - public class MeterFiveMinuteReadingRecords : BasicMeterReadingRecords - { - - } -} diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterOneMinuteReadingRecords.cs b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterOneMinuteReadingRecords.cs deleted file mode 100644 index 35b7630..0000000 --- a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterOneMinuteReadingRecords.cs +++ /dev/null @@ -1,18 +0,0 @@ -using JiShe.CollectBus.Common.Enums; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Volo.Abp.Domain.Entities; - -namespace JiShe.CollectBus.IotSystems.MeterReadingRecords -{ - /// - /// 1分钟抄读记录表,包含下发报文和回复报文,以及是否迁移 - /// - public class MeterOneMinuteReadingRecords : BasicMeterReadingRecords - { - - } -} diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/BasicMeterReadingRecords.cs b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingRecords.cs similarity index 89% rename from src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/BasicMeterReadingRecords.cs rename to src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingRecords.cs index 315bbda..48e4cd8 100644 --- a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/BasicMeterReadingRecords.cs +++ b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingRecords.cs @@ -9,9 +9,9 @@ using Volo.Abp.Domain.Entities; namespace JiShe.CollectBus.IotSystems.MeterReadingRecords { /// - /// 抄读记录基类 + /// 抄读数据记录 /// - public class BasicMeterReadingRecords : AggregateRoot + public class MeterReadingRecords : AggregateRoot { /// @@ -55,6 +55,16 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords /// public MeterTypeEnum MeterType { get; set; } + /// + /// 项目ID + /// + public int ProjectID { get; set; } + + /// + /// 数据库业务ID + /// + public int DatabaseBusiID { get; set; } + /// /// AFN功能码 /// diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index ec1bd69..1c50ce4 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -34,7 +34,7 @@ "CorsOrigins": "http://localhost:4200,http://localhost:3100" }, "ConnectionStrings": { - "Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin", + "Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=200&minPoolSize=10&waitQueueTimeoutMS=5000", "Kafka": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092", "PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False", "EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False" diff --git a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs b/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs index 5c73029..2366553 100644 --- a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs +++ b/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs @@ -24,9 +24,8 @@ public class CollectBusMongoDbContext : AbpMongoDbContext, ICollectBusMongoDbCon public IMongoCollection Devices => Collection(); public IMongoCollection ProtocolInfos => Collection(); - public IMongoCollection FifteenMinuteReadingRecords => Collection(); - public IMongoCollection MeterFiveMinuteReadingRecords => Collection(); - public IMongoCollection MeterOneMinuteReadingRecords => Collection(); + public IMongoCollection MeterReadingRecords => Collection(); + protected override void CreateModel(IMongoModelBuilder modelBuilder) { diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs index b69c541..609dcdb 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs @@ -55,7 +55,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts //await _protocolInfoCache.Get() } - public abstract Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null); + public abstract Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) where T : TB3761FN; /// /// 登录帧解析 diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs index bd08d60..5ad92c1 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs @@ -14,7 +14,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Interfaces Task AddAsync(); - Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null); + Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) where T : TB3761FN; Task LoginAsync(MessageReceivedLogin messageReceived); diff --git a/src/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs index 79a92dd..a28cd2d 100644 --- a/src/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs +++ b/src/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs @@ -8,8 +8,8 @@ using JiShe.CollectBus.Protocol.Contracts.Models; using Newtonsoft.Json.Linq; namespace JiShe.CollectBus.Protocol -{ - public class StandardProtocolPlugin: BaseProtocolPlugin +{ + public class StandardProtocolPlugin : BaseProtocolPlugin { /// /// Initializes a new instance of the class. @@ -21,12 +21,15 @@ namespace JiShe.CollectBus.Protocol public sealed override ProtocolInfo Info => new(nameof(StandardProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980"); - public override Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) + public override async Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) { var hexStringList = messageReceived.MessageHexString.StringToPairs(); var aTuple = (Tuple)hexStringList.GetAnalyzeValue(CommandChunkEnum.A); var afn = (int)hexStringList.GetAnalyzeValue(CommandChunkEnum.AFN); var fn = (int)hexStringList.GetAnalyzeValue(CommandChunkEnum.FN); + + T analyze = default; + switch ((AFN)afn) { case AFN.确认或否认: @@ -35,15 +38,15 @@ namespace JiShe.CollectBus.Protocol case AFN.设置参数: break; case AFN.查询参数: break; case AFN.请求实时数据: - if (Enum.IsDefined(typeof(ATypeOfDataItems), fn)) //Enum.TryParse(afn.ToString(), out ATypeOfDataItems parseResult) + if (Enum.IsDefined(typeof(ATypeOfDataItems), fn)) { - AnalyzeReadingDataAsync(messageReceived, sendAction); + analyze = (T?)AnalyzeReadingDataAsync(messageReceived, sendAction); } break; case AFN.请求历史数据: if (Enum.IsDefined(typeof(IIdataTypeItems), fn)) { - AnalyzeReadingTdcDataAsync(messageReceived, sendAction); + analyze = (T?)AnalyzeReadingTdcDataAsync(messageReceived, sendAction); } break; case AFN.数据转发: @@ -51,7 +54,7 @@ namespace JiShe.CollectBus.Protocol break; } - throw new NotImplementedException(); + return await Task.FromResult(analyze); } #region 上行命令