diff --git a/src/JiShe.CollectBus.Application/Consumers/WorkerConsumer.cs b/src/JiShe.CollectBus.Application/Consumers/ScheduledMeterReadingConsumer.cs similarity index 52% rename from src/JiShe.CollectBus.Application/Consumers/WorkerConsumer.cs rename to src/JiShe.CollectBus.Application/Consumers/ScheduledMeterReadingConsumer.cs index 53ad836..cdc731a 100644 --- a/src/JiShe.CollectBus.Application/Consumers/WorkerConsumer.cs +++ b/src/JiShe.CollectBus.Application/Consumers/ScheduledMeterReadingConsumer.cs @@ -2,6 +2,7 @@ using System.Threading.Tasks; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Models; +using JiShe.CollectBus.IotSystems.MessageIssueds; using MassTransit; using Microsoft.Extensions.Logging; using TouchSocket.Sockets; @@ -12,9 +13,9 @@ namespace JiShe.CollectBus.Consumers /// /// 定时抄读任务消费者 /// - public class WorkerConsumer : IConsumer + public class ScheduledMeterReadingConsumer : IConsumer { - private readonly ILogger _logger; + private readonly ILogger _logger; private readonly ITcpService _tcpService; /// @@ -22,7 +23,7 @@ namespace JiShe.CollectBus.Consumers /// /// /// - public WorkerConsumer(ILogger logger, + public ScheduledMeterReadingConsumer(ILogger logger, ITcpService tcpService) { _logger = logger; @@ -30,9 +31,10 @@ namespace JiShe.CollectBus.Consumers } - public async Task Consume(ConsumeContext context) + public async Task Consume(ConsumeContext context) { - await _tcpService.SendAsync(context.Message.ClientId, context.Message.Message); + _logger.LogError($"{nameof(ScheduledMeterReadingConsumer)} 集中器的消息消费{context.Message.FocusAddress}"); + await _tcpService.SendAsync(context.Message.FocusAddress, context.Message.MessageHexString); } } } diff --git a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj index 94c4faf..078728a 100644 --- a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj +++ b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj @@ -15,7 +15,8 @@ - + + diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index bb3c743..14e0a6a 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -26,6 +26,7 @@ using JiShe.CollectBus.Repository.MeterReadingRecord; using JiShe.CollectBus.Workers; using MassTransit; using MassTransit.Internals.GraphValidation; +using MassTransit.Transports; using Microsoft.Extensions.Logging; using Volo.Abp.Domain.Repositories; using static FreeSql.Internal.GlobalFilter; @@ -41,10 +42,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading private readonly ICapPublisher _capBus; private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository; - public BasicScheduledMeterReadingService( ILogger logger, - ICapPublisher capBus, + ICapPublisher capBus, IMeterReadingRecordRepository meterReadingRecordsRepository) { _capBus = capBus; @@ -432,8 +432,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; + _ = _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500) ,ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); + //await _massTransitBus.Publish(tempMsg); + meterTaskInfosList.Add(ammerterItem.Value); } } diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 24507fe..61cc2e5 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -12,6 +12,7 @@ using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.Repository; using JiShe.CollectBus.Repository.MeterReadingRecord; +using MassTransit; using Microsoft.AspNetCore.Authorization; using Microsoft.Extensions.Logging; using Volo.Abp.Domain.Repositories; @@ -27,10 +28,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService { - public EnergySystemScheduledMeterReadingService(ILogger logger, - ICapPublisher capBus, IMeterReadingRecordRepository _meterReadingRecordsRepository) :base(logger, capBus, _meterReadingRecordsRepository) + public EnergySystemScheduledMeterReadingService(ILogger logger, + ICapPublisher capBus, IMeterReadingRecordRepository meterReadingRecordsRepository) : base(logger, capBus, meterReadingRecordsRepository) { - + } public sealed override string SystemType => SystemTypeConst.Energy; @@ -63,21 +64,50 @@ namespace JiShe.CollectBus.ScheduledMeterReading //[Route($"ammeter/list")] public override async Task> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890") { - string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID - FROM TB_GatherInfo(NOLOCK) AS A - INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0 - INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100 - INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0 - WHERE 1=1 and C.Special = 0 "; - //TODO 记得移除特殊表过滤 - if (!string.IsNullOrWhiteSpace(gatherCode)) + List ammeterInfos = new List(); + ammeterInfos.Add(new AmmeterInfo() { - sql = $@"{sql} AND A.GatherCode = '{gatherCode}'"; - } - return await SqlProvider.Instance.Change(DbEnum.EnergyDB) - .Ado - .QueryAsync(sql); + Baudrate = 2400, + FocusAddress = "402440506", + Name = "三相电表", + FocusID = 1, + DatabaseBusiID = 1, + MeteringCode = 2, + AmmerterAddress = "402410040506", + ID = 9980, + TypeName = 3, + }); + ammeterInfos.Add(new AmmeterInfo() + { + Baudrate = 2400, + FocusAddress = "542400504", + Name = "单相电表", + FocusID = 1, + DatabaseBusiID = 1, + MeteringCode = 2, + AmmerterAddress = "542410000504", + ID = 9981, + TypeName = 1, + }); + + return ammeterInfos; + + //string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID + // FROM TB_GatherInfo(NOLOCK) AS A + // INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0 + // INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100 + // INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0 + // WHERE 1=1 and C.Special = 0 "; + ////TODO 记得移除特殊表过滤 + + //if (!string.IsNullOrWhiteSpace(gatherCode)) + //{ + // sql = $@"{sql} AND A.GatherCode = '{gatherCode}'"; + //} + //return await SqlProvider.Instance.Change(DbEnum.EnergyDB) + // .Ado + // .QueryAsync(sql); } /// diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs index 6ffa1d0..374eaa5 100644 --- a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs +++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs @@ -20,6 +20,7 @@ using JiShe.CollectBus.Plugins; using JiShe.CollectBus.Consumers; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.IotSystems.MessageReceiveds; +using JiShe.CollectBus.IotSystems.MessageIssueds; namespace JiShe.CollectBus.Host @@ -302,6 +303,9 @@ namespace JiShe.CollectBus.Host .SetTimeLimitStart(BatchTimeLimitStart.FromLast) .SetConcurrencyLimit(10)); }); + + rider.AddConsumer(); + rider.UsingKafka((c, cfg) => { cfg.Host(configuration.GetConnectionString("Kafka")); @@ -329,6 +333,12 @@ namespace JiShe.CollectBus.Host configurator.ConfigureConsumer(c); configurator.ConfigureConsumeTopology = false; }); + + //cfg.TopicEndpoint(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, ProtocolConst.SubscriberGroup, configurator => + //{ + // configurator.ConfigureConsumer(c); + // configurator.ConfigureConsumeTopology = false; + //}); }); }); }); diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs index 933641b..6f24b1e 100644 --- a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs +++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs @@ -25,7 +25,7 @@ namespace JiShe.CollectBus.Host typeof(AbpAspNetCoreSerilogModule), typeof(AbpSwashbuckleModule), typeof(CollectBusApplicationModule), - typeof(CollectBusMongoDbModule), + typeof(CollectBusMongoDbModule), typeof(AbpCachingStackExchangeRedisModule), typeof(AbpBackgroundWorkersHangfireModule) )]