From 9da1745573270a2a145c4300f085321c17a327d8 Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Mon, 24 Mar 2025 20:54:31 +0800
Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
...er.cs => ScheduledMeterReadingConsumer.cs} | 12 ++--
.../JiShe.CollectBus.Application.csproj | 3 +-
.../BasicScheduledMeterReadingService.cs | 7 ++-
...nergySystemScheduledMeterReadingService.cs | 62 ++++++++++++++-----
.../CollectBusHostModule.Configure.cs | 10 +++
.../CollectBusHostModule.cs | 2 +-
6 files changed, 71 insertions(+), 25 deletions(-)
rename src/JiShe.CollectBus.Application/Consumers/{WorkerConsumer.cs => ScheduledMeterReadingConsumer.cs} (52%)
diff --git a/src/JiShe.CollectBus.Application/Consumers/WorkerConsumer.cs b/src/JiShe.CollectBus.Application/Consumers/ScheduledMeterReadingConsumer.cs
similarity index 52%
rename from src/JiShe.CollectBus.Application/Consumers/WorkerConsumer.cs
rename to src/JiShe.CollectBus.Application/Consumers/ScheduledMeterReadingConsumer.cs
index 53ad836..cdc731a 100644
--- a/src/JiShe.CollectBus.Application/Consumers/WorkerConsumer.cs
+++ b/src/JiShe.CollectBus.Application/Consumers/ScheduledMeterReadingConsumer.cs
@@ -2,6 +2,7 @@
using System.Threading.Tasks;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models;
+using JiShe.CollectBus.IotSystems.MessageIssueds;
using MassTransit;
using Microsoft.Extensions.Logging;
using TouchSocket.Sockets;
@@ -12,9 +13,9 @@ namespace JiShe.CollectBus.Consumers
///
/// 定时抄读任务消费者
///
- public class WorkerConsumer : IConsumer
+ public class ScheduledMeterReadingConsumer : IConsumer
{
- private readonly ILogger _logger;
+ private readonly ILogger _logger;
private readonly ITcpService _tcpService;
///
@@ -22,7 +23,7 @@ namespace JiShe.CollectBus.Consumers
///
///
///
- public WorkerConsumer(ILogger logger,
+ public ScheduledMeterReadingConsumer(ILogger logger,
ITcpService tcpService)
{
_logger = logger;
@@ -30,9 +31,10 @@ namespace JiShe.CollectBus.Consumers
}
- public async Task Consume(ConsumeContext context)
+ public async Task Consume(ConsumeContext context)
{
- await _tcpService.SendAsync(context.Message.ClientId, context.Message.Message);
+ _logger.LogError($"{nameof(ScheduledMeterReadingConsumer)} 集中器的消息消费{context.Message.FocusAddress}");
+ await _tcpService.SendAsync(context.Message.FocusAddress, context.Message.MessageHexString);
}
}
}
diff --git a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj
index 94c4faf..078728a 100644
--- a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj
+++ b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj
@@ -15,7 +15,8 @@
-
+
+
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index bb3c743..14e0a6a 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -26,6 +26,7 @@ using JiShe.CollectBus.Repository.MeterReadingRecord;
using JiShe.CollectBus.Workers;
using MassTransit;
using MassTransit.Internals.GraphValidation;
+using MassTransit.Transports;
using Microsoft.Extensions.Logging;
using Volo.Abp.Domain.Repositories;
using static FreeSql.Internal.GlobalFilter;
@@ -41,10 +42,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
private readonly ICapPublisher _capBus;
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
-
public BasicScheduledMeterReadingService(
ILogger logger,
- ICapPublisher capBus,
+ ICapPublisher capBus,
IMeterReadingRecordRepository meterReadingRecordsRepository)
{
_capBus = capBus;
@@ -432,8 +432,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
+
_ = _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500) ,ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
+ //await _massTransitBus.Publish(tempMsg);
+
meterTaskInfosList.Add(ammerterItem.Value);
}
}
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
index 24507fe..61cc2e5 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
@@ -12,6 +12,7 @@ using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Repository;
using JiShe.CollectBus.Repository.MeterReadingRecord;
+using MassTransit;
using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.Logging;
using Volo.Abp.Domain.Repositories;
@@ -27,10 +28,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
{
- public EnergySystemScheduledMeterReadingService(ILogger logger,
- ICapPublisher capBus, IMeterReadingRecordRepository _meterReadingRecordsRepository) :base(logger, capBus, _meterReadingRecordsRepository)
+ public EnergySystemScheduledMeterReadingService(ILogger logger,
+ ICapPublisher capBus, IMeterReadingRecordRepository meterReadingRecordsRepository) : base(logger, capBus, meterReadingRecordsRepository)
{
-
+
}
public sealed override string SystemType => SystemTypeConst.Energy;
@@ -63,21 +64,50 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//[Route($"ammeter/list")]
public override async Task> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890")
{
- string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
- FROM TB_GatherInfo(NOLOCK) AS A
- INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
- INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
- INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0
- WHERE 1=1 and C.Special = 0 ";
- //TODO 记得移除特殊表过滤
- if (!string.IsNullOrWhiteSpace(gatherCode))
+ List ammeterInfos = new List();
+ ammeterInfos.Add(new AmmeterInfo()
{
- sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
- }
- return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
- .Ado
- .QueryAsync(sql);
+ Baudrate = 2400,
+ FocusAddress = "402440506",
+ Name = "三相电表",
+ FocusID = 1,
+ DatabaseBusiID = 1,
+ MeteringCode = 2,
+ AmmerterAddress = "402410040506",
+ ID = 9980,
+ TypeName = 3,
+ });
+ ammeterInfos.Add(new AmmeterInfo()
+ {
+ Baudrate = 2400,
+ FocusAddress = "542400504",
+ Name = "单相电表",
+ FocusID = 1,
+ DatabaseBusiID = 1,
+ MeteringCode = 2,
+ AmmerterAddress = "542410000504",
+ ID = 9981,
+ TypeName = 1,
+ });
+
+ return ammeterInfos;
+
+ //string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
+ // FROM TB_GatherInfo(NOLOCK) AS A
+ // INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
+ // INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
+ // INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0
+ // WHERE 1=1 and C.Special = 0 ";
+ ////TODO 记得移除特殊表过滤
+
+ //if (!string.IsNullOrWhiteSpace(gatherCode))
+ //{
+ // sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
+ //}
+ //return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
+ // .Ado
+ // .QueryAsync(sql);
}
///
diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
index 6ffa1d0..374eaa5 100644
--- a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
+++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
@@ -20,6 +20,7 @@ using JiShe.CollectBus.Plugins;
using JiShe.CollectBus.Consumers;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
+using JiShe.CollectBus.IotSystems.MessageIssueds;
namespace JiShe.CollectBus.Host
@@ -302,6 +303,9 @@ namespace JiShe.CollectBus.Host
.SetTimeLimitStart(BatchTimeLimitStart.FromLast)
.SetConcurrencyLimit(10));
});
+
+ rider.AddConsumer();
+
rider.UsingKafka((c, cfg) =>
{
cfg.Host(configuration.GetConnectionString("Kafka"));
@@ -329,6 +333,12 @@ namespace JiShe.CollectBus.Host
configurator.ConfigureConsumer(c);
configurator.ConfigureConsumeTopology = false;
});
+
+ //cfg.TopicEndpoint(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, ProtocolConst.SubscriberGroup, configurator =>
+ //{
+ // configurator.ConfigureConsumer(c);
+ // configurator.ConfigureConsumeTopology = false;
+ //});
});
});
});
diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs
index 933641b..6f24b1e 100644
--- a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs
+++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs
@@ -25,7 +25,7 @@ namespace JiShe.CollectBus.Host
typeof(AbpAspNetCoreSerilogModule),
typeof(AbpSwashbuckleModule),
typeof(CollectBusApplicationModule),
- typeof(CollectBusMongoDbModule),
+ typeof(CollectBusMongoDbModule),
typeof(AbpCachingStackExchangeRedisModule),
typeof(AbpBackgroundWorkersHangfireModule)
)]