定时抄读消息存入mangodb

This commit is contained in:
ChenYi 2025-03-17 14:23:48 +08:00
parent 5b177759bd
commit ddb6d52755
6 changed files with 49 additions and 25 deletions

View File

@ -1,5 +1,6 @@
using System.Threading.Tasks; using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using System.Threading.Tasks;
using Volo.Abp.Application.Services; using Volo.Abp.Application.Services;
namespace JiShe.CollectBus.Subscribers namespace JiShe.CollectBus.Subscribers

View File

@ -14,6 +14,8 @@ using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts;
@ -21,6 +23,7 @@ using JiShe.CollectBus.Workers;
using MassTransit; using MassTransit;
using MassTransit.Internals.GraphValidation; using MassTransit.Internals.GraphValidation;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.ScheduledMeterReading namespace JiShe.CollectBus.ScheduledMeterReading
{ {
@ -31,10 +34,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{ {
private readonly ILogger<BasicScheduledMeterReadingService> _logger; private readonly ILogger<BasicScheduledMeterReadingService> _logger;
private readonly ICapPublisher _capBus; private readonly ICapPublisher _capBus;
public BasicScheduledMeterReadingService(ILogger<BasicScheduledMeterReadingService> logger, ICapPublisher capBus) private readonly IRepository<ScheduledMeterReadingIssuedEventMessage, Guid> _meterReadingIssuedRepository;
public BasicScheduledMeterReadingService(
ILogger<BasicScheduledMeterReadingService> logger,
ICapPublisher capBus,
IRepository<ScheduledMeterReadingIssuedEventMessage, Guid> meterReadingIssuedRepository)
{ {
_capBus = capBus; _capBus = capBus;
_logger = logger; _logger = logger;
_meterReadingIssuedRepository = meterReadingIssuedRepository;
} }
/// <summary> /// <summary>
@ -247,7 +257,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
stopwatch.Stop(); stopwatch.Stop();
_logger.LogInformation($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} 15分钟采集电表数据处理完成共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} 15分钟采集电表数据处理完成共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
} }
#endregion #endregion
@ -470,7 +480,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
try try
{ {
//将采集器编号的hash值取模分组 //将采集器编号的hash值取模分组
const int TotalShards = 20; const int TotalShards = 1024;
var focusHashGroups = new Dictionary<int, Dictionary<string, Dictionary<string, AmmeterInfo>>>(); var focusHashGroups = new Dictionary<int, Dictionary<string, Dictionary<string, AmmeterInfo>>>();
foreach (var (collectorId, ammetersDictionary) in focusGroup) foreach (var (collectorId, ammetersDictionary) in focusGroup)
@ -504,11 +514,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据分组创建线程批处理集中器 //根据分组创建线程批处理集中器
foreach (var group in focusHashGroups) foreach (var group in focusHashGroups)
{ {
_ = Task.Run(async () => { await CreatePublishTask(eventName, group.Value); }); //TODO _meterReadingIssuedRepository 需要优化
//await CreatePublishTask(eventName, group.Value); //_ = Task.Run(async () => { await CreatePublishTask(eventName, group.Value); });
await CreatePublishTask(eventName, group.Value);
} }
await Task.CompletedTask; //await Task.CompletedTask;
} }
catch (Exception) catch (Exception)
{ {
@ -610,6 +621,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
} }
List<ScheduledMeterReadingIssuedEventMessage> evenMessageInfoList = new List<ScheduledMeterReadingIssuedEventMessage>();
foreach (var tempItem in tempCodes) foreach (var tempItem in tempCodes)
{ {
//排除已发送日冻结和月冻结采集项配置 //排除已发送日冻结和月冻结采集项配置
@ -657,13 +669,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{ {
Message = dataInfos!, Message = dataInfos!,
DeviceNo = ammeter.FocusAddress, DeviceNo = ammeter.FocusAddress,
MessageId = NewId.NextGuid().ToString() MessageId = NewId.NextGuid().ToString(),
TimeDensity = eventName,
WasSuccessful = false,
}; };
await _capBus.PublishAsync(eventName, evenMessageInfo); await _capBus.PublishAsync(eventName, evenMessageInfo);
evenMessageInfoList.Add(evenMessageInfo);
} }
await _meterReadingIssuedRepository.InsertManyAsync(evenMessageInfoList);
} }
} }
} }

View File

@ -1,13 +1,16 @@
using System.Collections.Generic; using System;
using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP; using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.IotSystems.Watermeter;
using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.ScheduledMeterReading namespace JiShe.CollectBus.ScheduledMeterReading
{ {
@ -19,7 +22,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
{ {
public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger, ICapPublisher capBus) :base(logger, capBus) public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger, ICapPublisher capBus, IRepository<ScheduledMeterReadingIssuedEventMessage, Guid> meterReadingIssuedRepository) :base(logger, capBus, meterReadingIssuedRepository)
{ {
} }

View File

@ -3,8 +3,9 @@ using System.Threading.Tasks;
using DeviceDetectorNET.Parser.Device; using DeviceDetectorNET.Parser.Device;
using DotNetCore.CAP; using DotNetCore.CAP;
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;

View File

@ -1,17 +1,14 @@
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Enums;
using System;
using Volo.Abp.Domain.Entities;
namespace JiShe.CollectBus.Common.Models namespace JiShe.CollectBus.IotSystems.MessageIssueds
{ {
/// <summary> /// <summary>
/// 定时抄读Kafka消息实体1分钟、5分钟、15分钟 /// 定时抄读Kafka消息实体1分钟、5分钟、15分钟
/// </summary> /// </summary>
public class ScheduledMeterReadingIssuedEventMessage public class ScheduledMeterReadingIssuedEventMessage : AggregateRoot<Guid>
{ {
/// <summary>
/// 消息接收客户端Id
/// </summary>
public string ClientId { get; set; }
/// <summary> /// <summary>
/// 消息内容 /// 消息内容
/// </summary> /// </summary>
@ -22,14 +19,19 @@ namespace JiShe.CollectBus.Common.Models
/// </summary> /// </summary>
public string DeviceNo { get; set; } public string DeviceNo { get; set; }
///// <summary> /// <summary>
///// 采集时间间隔(分钟如15) /// 采集时间间隔通过Kafka主题区分(分钟如15)
///// </summary> /// </summary>
//public int TimeDensity { get; set; } public string TimeDensity { get; set; }
/// <summary> /// <summary>
/// 消息Id /// 消息Id
/// </summary> /// </summary>
public string MessageId { get; set; } public string MessageId { get; set; }
/// <summary>
/// 是否下发成功
/// </summary>
public bool WasSuccessful { get; set; }
} }
} }

View File

@ -1,4 +1,5 @@
using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.Protocols; using JiShe.CollectBus.IotSystems.Protocols;
using MongoDB.Driver; using MongoDB.Driver;
@ -22,6 +23,8 @@ public class CollectBusMongoDbContext : AbpMongoDbContext, ICollectBusMongoDbCon
public IMongoCollection<Device> Devices => Collection<Device>(); public IMongoCollection<Device> Devices => Collection<Device>();
public IMongoCollection<ProtocolInfo> ProtocolInfos => Collection<ProtocolInfo>(); public IMongoCollection<ProtocolInfo> ProtocolInfos => Collection<ProtocolInfo>();
public IMongoCollection<ScheduledMeterReadingIssuedEventMessage> MeterReadingIssued => Collection<ScheduledMeterReadingIssuedEventMessage>();
protected override void CreateModel(IMongoModelBuilder modelBuilder) protected override void CreateModel(IMongoModelBuilder modelBuilder)
{ {
base.CreateModel(modelBuilder); base.CreateModel(modelBuilder);