优化代码

This commit is contained in:
ChenYi 2025-03-24 20:54:31 +08:00
parent a0fa1ccf97
commit 9da1745573
6 changed files with 71 additions and 25 deletions

View File

@ -2,6 +2,7 @@
using System.Threading.Tasks; using System.Threading.Tasks;
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using MassTransit; using MassTransit;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using TouchSocket.Sockets; using TouchSocket.Sockets;
@ -12,9 +13,9 @@ namespace JiShe.CollectBus.Consumers
/// <summary> /// <summary>
/// 定时抄读任务消费者 /// 定时抄读任务消费者
/// </summary> /// </summary>
public class WorkerConsumer : IConsumer<IssuedEventMessage> public class ScheduledMeterReadingConsumer : IConsumer<ScheduledMeterReadingIssuedEventMessage>
{ {
private readonly ILogger<WorkerConsumer> _logger; private readonly ILogger<ScheduledMeterReadingConsumer> _logger;
private readonly ITcpService _tcpService; private readonly ITcpService _tcpService;
/// <summary> /// <summary>
@ -22,7 +23,7 @@ namespace JiShe.CollectBus.Consumers
/// </summary> /// </summary>
/// <param name="logger"></param> /// <param name="logger"></param>
/// <param name="tcpService"></param> /// <param name="tcpService"></param>
public WorkerConsumer(ILogger<WorkerConsumer> logger, public ScheduledMeterReadingConsumer(ILogger<ScheduledMeterReadingConsumer> logger,
ITcpService tcpService) ITcpService tcpService)
{ {
_logger = logger; _logger = logger;
@ -30,9 +31,10 @@ namespace JiShe.CollectBus.Consumers
} }
public async Task Consume(ConsumeContext<IssuedEventMessage> context) public async Task Consume(ConsumeContext<ScheduledMeterReadingIssuedEventMessage> context)
{ {
await _tcpService.SendAsync(context.Message.ClientId, context.Message.Message); _logger.LogError($"{nameof(ScheduledMeterReadingConsumer)} 集中器的消息消费{context.Message.FocusAddress}");
await _tcpService.SendAsync(context.Message.FocusAddress, context.Message.MessageHexString);
} }
} }
} }

View File

@ -15,6 +15,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="MassTransit.Kafka" Version="8.3.2" />
<PackageReference Include="Volo.Abp.AspNetCore.Mvc" Version="8.3.3" /> <PackageReference Include="Volo.Abp.AspNetCore.Mvc" Version="8.3.3" />
<PackageReference Include="Volo.Abp.AutoMapper" Version="8.3.3" /> <PackageReference Include="Volo.Abp.AutoMapper" Version="8.3.3" />

View File

@ -26,6 +26,7 @@ using JiShe.CollectBus.Repository.MeterReadingRecord;
using JiShe.CollectBus.Workers; using JiShe.CollectBus.Workers;
using MassTransit; using MassTransit;
using MassTransit.Internals.GraphValidation; using MassTransit.Internals.GraphValidation;
using MassTransit.Transports;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Volo.Abp.Domain.Repositories; using Volo.Abp.Domain.Repositories;
using static FreeSql.Internal.GlobalFilter; using static FreeSql.Internal.GlobalFilter;
@ -41,7 +42,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
private readonly ICapPublisher _capBus; private readonly ICapPublisher _capBus;
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository; private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
public BasicScheduledMeterReadingService( public BasicScheduledMeterReadingService(
ILogger<BasicScheduledMeterReadingService> logger, ILogger<BasicScheduledMeterReadingService> logger,
ICapPublisher capBus, ICapPublisher capBus,
@ -432,8 +432,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress, FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(), TimeDensity = timeDensity.ToString(),
}; };
_ = _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500) ,ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); _ = _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500) ,ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
//await _massTransitBus.Publish(tempMsg);
meterTaskInfosList.Add(ammerterItem.Value); meterTaskInfosList.Add(ammerterItem.Value);
} }
} }

View File

@ -12,6 +12,7 @@ using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Repository; using JiShe.CollectBus.Repository;
using JiShe.CollectBus.Repository.MeterReadingRecord; using JiShe.CollectBus.Repository.MeterReadingRecord;
using MassTransit;
using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Volo.Abp.Domain.Repositories; using Volo.Abp.Domain.Repositories;
@ -28,7 +29,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{ {
public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger, public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger,
ICapPublisher capBus, IMeterReadingRecordRepository _meterReadingRecordsRepository) :base(logger, capBus, _meterReadingRecordsRepository) ICapPublisher capBus, IMeterReadingRecordRepository meterReadingRecordsRepository) : base(logger, capBus, meterReadingRecordsRepository)
{ {
} }
@ -63,21 +64,50 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//[Route($"ammeter/list")] //[Route($"ammeter/list")]
public override async Task<List<AmmeterInfo>> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890") public override async Task<List<AmmeterInfo>> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890")
{ {
string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
FROM TB_GatherInfo(NOLOCK) AS A
INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0
WHERE 1=1 and C.Special = 0 ";
//TODO 记得移除特殊表过滤
if (!string.IsNullOrWhiteSpace(gatherCode)) List<AmmeterInfo> ammeterInfos = new List<AmmeterInfo>();
ammeterInfos.Add(new AmmeterInfo()
{ {
sql = $@"{sql} AND A.GatherCode = '{gatherCode}'"; Baudrate = 2400,
} FocusAddress = "402440506",
return await SqlProvider.Instance.Change(DbEnum.EnergyDB) Name = "三相电表",
.Ado FocusID = 1,
.QueryAsync<AmmeterInfo>(sql); DatabaseBusiID = 1,
MeteringCode = 2,
AmmerterAddress = "402410040506",
ID = 9980,
TypeName = 3,
});
ammeterInfos.Add(new AmmeterInfo()
{
Baudrate = 2400,
FocusAddress = "542400504",
Name = "单相电表",
FocusID = 1,
DatabaseBusiID = 1,
MeteringCode = 2,
AmmerterAddress = "542410000504",
ID = 9981,
TypeName = 1,
});
return ammeterInfos;
//string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
// FROM TB_GatherInfo(NOLOCK) AS A
// INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
// INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
// INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0
// WHERE 1=1 and C.Special = 0 ";
////TODO 记得移除特殊表过滤
//if (!string.IsNullOrWhiteSpace(gatherCode))
//{
// sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
//}
//return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
// .Ado
// .QueryAsync<AmmeterInfo>(sql);
} }
/// <summary> /// <summary>

View File

@ -20,6 +20,7 @@ using JiShe.CollectBus.Plugins;
using JiShe.CollectBus.Consumers; using JiShe.CollectBus.Consumers;
using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MessageIssueds;
namespace JiShe.CollectBus.Host namespace JiShe.CollectBus.Host
@ -302,6 +303,9 @@ namespace JiShe.CollectBus.Host
.SetTimeLimitStart(BatchTimeLimitStart.FromLast) .SetTimeLimitStart(BatchTimeLimitStart.FromLast)
.SetConcurrencyLimit(10)); .SetConcurrencyLimit(10));
}); });
rider.AddConsumer<ScheduledMeterReadingConsumer>();
rider.UsingKafka((c, cfg) => rider.UsingKafka((c, cfg) =>
{ {
cfg.Host(configuration.GetConnectionString("Kafka")); cfg.Host(configuration.GetConnectionString("Kafka"));
@ -329,6 +333,12 @@ namespace JiShe.CollectBus.Host
configurator.ConfigureConsumer<IssuedConsumer>(c); configurator.ConfigureConsumer<IssuedConsumer>(c);
configurator.ConfigureConsumeTopology = false; configurator.ConfigureConsumeTopology = false;
}); });
//cfg.TopicEndpoint<ScheduledMeterReadingIssuedEventMessage>(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, ProtocolConst.SubscriberGroup, configurator =>
//{
// configurator.ConfigureConsumer<ScheduledMeterReadingConsumer>(c);
// configurator.ConfigureConsumeTopology = false;
//});
}); });
}); });
}); });