diff --git a/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs index 05c9e6a..a7c4032 100644 --- a/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs @@ -1,5 +1,6 @@ -using System.Threading.Tasks; -using JiShe.CollectBus.Common.Models; +using JiShe.CollectBus.IotSystems.MessageIssueds; +using JiShe.CollectBus.IotSystems.MessageReceiveds; +using System.Threading.Tasks; using Volo.Abp.Application.Services; namespace JiShe.CollectBus.Subscribers diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 7f4d6ce..39be20d 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -14,6 +14,8 @@ using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.GatherItem; +using JiShe.CollectBus.IotSystems.Devices; +using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.Protocol.Contracts; @@ -21,6 +23,7 @@ using JiShe.CollectBus.Workers; using MassTransit; using MassTransit.Internals.GraphValidation; using Microsoft.Extensions.Logging; +using Volo.Abp.Domain.Repositories; namespace JiShe.CollectBus.ScheduledMeterReading { @@ -31,10 +34,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading { private readonly ILogger _logger; private readonly ICapPublisher _capBus; - public BasicScheduledMeterReadingService(ILogger logger, ICapPublisher capBus) + private readonly IRepository _meterReadingIssuedRepository; + + + public BasicScheduledMeterReadingService( + ILogger logger, + ICapPublisher capBus, + IRepository meterReadingIssuedRepository) { _capBus = capBus; _logger = logger; + _meterReadingIssuedRepository = meterReadingIssuedRepository; } /// @@ -247,7 +257,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading stopwatch.Stop(); - _logger.LogInformation($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} 15分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); + _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} 15分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); } #endregion @@ -470,7 +480,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading try { //将采集器编号的hash值取模分组 - const int TotalShards = 20; + const int TotalShards = 1024; var focusHashGroups = new Dictionary>>(); foreach (var (collectorId, ammetersDictionary) in focusGroup) @@ -504,11 +514,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading //根据分组创建线程批处理集中器 foreach (var group in focusHashGroups) { - _ = Task.Run(async () => { await CreatePublishTask(eventName, group.Value); }); - //await CreatePublishTask(eventName, group.Value); + //TODO _meterReadingIssuedRepository 需要优化 + //_ = Task.Run(async () => { await CreatePublishTask(eventName, group.Value); }); + await CreatePublishTask(eventName, group.Value); } - await Task.CompletedTask; + //await Task.CompletedTask; } catch (Exception) { @@ -610,6 +621,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } } + List evenMessageInfoList = new List(); foreach (var tempItem in tempCodes) { //排除已发送日冻结和月冻结采集项配置 @@ -657,13 +669,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading { Message = dataInfos!, DeviceNo = ammeter.FocusAddress, - MessageId = NewId.NextGuid().ToString() + MessageId = NewId.NextGuid().ToString(), + TimeDensity = eventName, + WasSuccessful = false, }; await _capBus.PublishAsync(eventName, evenMessageInfo); + evenMessageInfoList.Add(evenMessageInfo); } - - + await _meterReadingIssuedRepository.InsertManyAsync(evenMessageInfoList); } } } diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index dd49e74..a95312b 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -1,13 +1,16 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Threading.Tasks; using DotNetCore.CAP; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.GatherItem; +using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.Watermeter; using Microsoft.AspNetCore.Authorization; using Microsoft.Extensions.Logging; +using Volo.Abp.Domain.Repositories; namespace JiShe.CollectBus.ScheduledMeterReading { @@ -19,7 +22,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService { - public EnergySystemScheduledMeterReadingService(ILogger logger, ICapPublisher capBus) :base(logger, capBus) + public EnergySystemScheduledMeterReadingService(ILogger logger, ICapPublisher capBus, IRepository meterReadingIssuedRepository) :base(logger, capBus, meterReadingIssuedRepository) { } diff --git a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs index b1cc03b..8d0d154 100644 --- a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs @@ -3,8 +3,9 @@ using System.Threading.Tasks; using DeviceDetectorNET.Parser.Device; using DotNetCore.CAP; using JiShe.CollectBus.Common.Enums; -using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IotSystems.Devices; +using JiShe.CollectBus.IotSystems.MessageIssueds; +using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts.Interfaces; using Microsoft.AspNetCore.Mvc; diff --git a/src/JiShe.CollectBus.Common/Models/ScheduledMeterReadingIssuedEventMessage.cs b/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/ScheduledMeterReadingIssuedEventMessage.cs similarity index 56% rename from src/JiShe.CollectBus.Common/Models/ScheduledMeterReadingIssuedEventMessage.cs rename to src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/ScheduledMeterReadingIssuedEventMessage.cs index b62c7db..cdab61a 100644 --- a/src/JiShe.CollectBus.Common/Models/ScheduledMeterReadingIssuedEventMessage.cs +++ b/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/ScheduledMeterReadingIssuedEventMessage.cs @@ -1,17 +1,14 @@ using JiShe.CollectBus.Common.Enums; +using System; +using Volo.Abp.Domain.Entities; -namespace JiShe.CollectBus.Common.Models +namespace JiShe.CollectBus.IotSystems.MessageIssueds { /// /// 定时抄读Kafka消息实体,1分钟、5分钟、15分钟 /// - public class ScheduledMeterReadingIssuedEventMessage + public class ScheduledMeterReadingIssuedEventMessage : AggregateRoot { - /// - /// 消息接收客户端Id - /// - public string ClientId { get; set; } - /// /// 消息内容 /// @@ -22,14 +19,19 @@ namespace JiShe.CollectBus.Common.Models /// public string DeviceNo { get; set; } - ///// - ///// 采集时间间隔(分钟,如15) - ///// - //public int TimeDensity { get; set; } + /// + /// 采集时间间隔,通过Kafka主题区分(分钟,如15) + /// + public string TimeDensity { get; set; } /// /// 消息Id /// public string MessageId { get; set; } + + /// + /// 是否下发成功 + /// + public bool WasSuccessful { get; set; } } } diff --git a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs b/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs index a42a250..06dfd1a 100644 --- a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs +++ b/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs @@ -1,4 +1,5 @@ using JiShe.CollectBus.IotSystems.Devices; +using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.Protocols; using MongoDB.Driver; @@ -22,6 +23,8 @@ public class CollectBusMongoDbContext : AbpMongoDbContext, ICollectBusMongoDbCon public IMongoCollection Devices => Collection(); public IMongoCollection ProtocolInfos => Collection(); + public IMongoCollection MeterReadingIssued => Collection(); + protected override void CreateModel(IMongoModelBuilder modelBuilder) { base.CreateModel(modelBuilder);