完善Kafka主题消息消费订阅

This commit is contained in:
ChenYi 2025-05-14 14:40:34 +08:00
parent 9df3f652f9
commit 94ff58dd12
4 changed files with 252 additions and 44 deletions

View File

@ -1,6 +1,8 @@
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Internal;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -17,36 +19,82 @@ namespace JiShe.CollectBus.Subscribers
#region #region
/// <summary> /// <summary>
/// 1分钟采集电表数据下行消息消费订阅 /// 一分钟定时抄读任务消息消费订阅
/// </summary> /// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns> /// <returns></returns>
Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage); Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage);
/// <summary> /// <summary>
/// 5分钟采集电表数据下行消息消费订阅 /// 5分钟采集电表数据下行消息消费订阅
/// </summary> /// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns> /// <returns></returns>
Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage); Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage);
/// <summary> /// <summary>
/// 15分钟采集电表数据下行消息消费订阅 /// 15分钟采集电表数据下行消息消费订阅
/// </summary> /// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns> /// <returns></returns>
Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage); Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(List<MeterReadingTelemetryPacketInfo> receivedMessage);
/// <summary> /// <summary>
/// 电表自动阀控下行消息消费订阅 /// 电表自动阀控下行消息消费订阅
/// </summary> /// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns> /// <returns></returns>
Task<ISubscribeAck> AmmeterScheduledAutoValveControl(List<MeterReadingTelemetryPacketInfo> issuedEventMessage); Task<ISubscribeAck> AmmeterScheduledAutoValveControl(List<MeterReadingTelemetryPacketInfo> receivedMessage);
/// <summary>
/// 其他采集数据下行消息主题日冻结月冻结、集中器版本号、SIM卡号、定时校时等下行消息消费订阅
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
Task<ISubscribeAck> AmmeterScheduledOther(List<MeterReadingTelemetryPacketInfo> receivedMessage);
/// <summary>
/// 电表手动阀控下行消息消费订阅
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
Task<ISubscribeAck> AmmeterScheduledManualValveControl(MeterReadingTelemetryPacketInfo receivedMessage);
/// <summary>
/// 电表手动抄读下行消息消费订阅
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
Task<ISubscribeAck> AmmeterScheduledManualValveReading(MeterReadingTelemetryPacketInfo receivedMessage);
#endregion #endregion
#region #region
/// <summary> /// <summary>
/// 1分钟采集水表数据下行消息消费订阅 /// 水表数据下行消息消费订阅
/// </summary> /// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns> /// <returns></returns>
Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage); Task<ISubscribeAck> WatermeterScheduledAutoReading(MeterReadingTelemetryPacketInfo receivedMessage);
/// <summary>
/// 水表自动阀控下行消息消费订阅
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
Task<ISubscribeAck> WatermeterScheduleAutoValveControl(MeterReadingTelemetryPacketInfo receivedMessage);
/// <summary>
/// 水表手动阀控下行消息消费订阅
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
Task<ISubscribeAck> WatermeterScheduleManualValveControl(MeterReadingTelemetryPacketInfo receivedMessage);
/// <summary>
/// 水表手动抄读下行消息消费订阅
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
Task<ISubscribeAck> WatermeterScheduleManualValveReading(MeterReadingTelemetryPacketInfo receivedMessage);
#endregion #endregion
} }

View File

@ -28,6 +28,7 @@ using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp.Guids;
namespace JiShe.CollectBus.ScheduledMeterReading namespace JiShe.CollectBus.ScheduledMeterReading
{ {
@ -44,6 +45,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
private readonly DataMigrationOptions _dataMigrationOptions; private readonly DataMigrationOptions _dataMigrationOptions;
private readonly KafkaOptionConfig _kafkaOptions; private readonly KafkaOptionConfig _kafkaOptions;
private readonly ServerApplicationOptions _applicationOptions; private readonly ServerApplicationOptions _applicationOptions;
private readonly IGuidGenerator _guidGenerator;
int pageSize = 10000; int pageSize = 10000;
@ -53,6 +55,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
IRedisDataCacheService redisDataCacheService, IRedisDataCacheService redisDataCacheService,
IIoTDbProvider dbProvider, IIoTDbProvider dbProvider,
IProtocolService protocolService, IProtocolService protocolService,
IGuidGenerator guidGenerator,
IOptions<DataMigrationOptions> dataMigrationOptions, IOptions<DataMigrationOptions> dataMigrationOptions,
IOptions<KafkaOptionConfig> kafkaOptions, IOptions<KafkaOptionConfig> kafkaOptions,
IOptions<ServerApplicationOptions> applicationOptions) IOptions<ServerApplicationOptions> applicationOptions)
@ -67,6 +70,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_kafkaOptions = kafkaOptions.Value; _kafkaOptions = kafkaOptions.Value;
_applicationOptions = applicationOptions.Value; _applicationOptions = applicationOptions.Value;
_guidGenerator = guidGenerator;
} }
/// <summary> /// <summary>
@ -323,7 +328,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns> /// <returns></returns>
public virtual async Task InitAmmeterCacheData(string gatherCode = "") public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{ {
return; //return;
try try
{ {
@ -750,7 +755,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
subItemCode: null, subItemCode: null,
pendingCopyReadTime: timestamps, pendingCopyReadTime: timestamps,
creationTime: currentTime, creationTime: currentTime,
packetType: (TelemetryPacketTypeEnum)timeDensity); packetType: (TelemetryPacketTypeEnum)timeDensity,
_guidGenerator);
taskList.Add(meterReadingRecords); taskList.Add(meterReadingRecords);
} }
@ -832,7 +838,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
subItemCode: subItemCode, subItemCode: subItemCode,
pendingCopyReadTime: currentTime, pendingCopyReadTime: currentTime,
creationTime: currentTime, creationTime: currentTime,
packetType: TelemetryPacketTypeEnum.AmmeterAutomaticVerificationTime); packetType: TelemetryPacketTypeEnum.AmmeterAutomaticVerificationTime,
_guidGenerator);
taskList.Add(meterReadingRecords); taskList.Add(meterReadingRecords);
if (taskList == null || taskList.Count <= 0) if (taskList == null || taskList.Count <= 0)
@ -901,7 +908,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
subItemCode: null, subItemCode: null,
pendingCopyReadTime: currentTime, pendingCopyReadTime: currentTime,
creationTime: currentTime, creationTime: currentTime,
packetType: TelemetryPacketTypeEnum.AmmeterDayFreeze); packetType: TelemetryPacketTypeEnum.AmmeterDayFreeze,
_guidGenerator);
taskList.Add(meterReadingRecords); taskList.Add(meterReadingRecords);
} }
@ -971,7 +979,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
subItemCode: null, subItemCode: null,
pendingCopyReadTime: currentTime, pendingCopyReadTime: currentTime,
creationTime: currentTime, creationTime: currentTime,
packetType: TelemetryPacketTypeEnum.AmmeterMonthFreeze); packetType: TelemetryPacketTypeEnum.AmmeterMonthFreeze,
_guidGenerator);
taskList.Add(meterReadingRecords); taskList.Add(meterReadingRecords);
} }
@ -1370,7 +1379,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
subItemCode: null, subItemCode: null,
pendingCopyReadTime: currentTime, pendingCopyReadTime: currentTime,
creationTime: currentTime, creationTime: currentTime,
packetType: TelemetryPacketTypeEnum.TerminalVersion); packetType: TelemetryPacketTypeEnum.TerminalVersion,
_guidGenerator);
taskList.Add(meterReadingRecords); taskList.Add(meterReadingRecords);
@ -1438,7 +1448,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
subItemCode: null, subItemCode: null,
pendingCopyReadTime: currentTime, pendingCopyReadTime: currentTime,
creationTime: currentTime, creationTime: currentTime,
packetType: TelemetryPacketTypeEnum.TelematicsModule); packetType: TelemetryPacketTypeEnum.TelematicsModule,
_guidGenerator);
taskList.Add(meterReadingRecords); taskList.Add(meterReadingRecords);
@ -1642,8 +1653,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="pendingCopyReadTime">待采集时间,定时采集频率才是特殊情况,其他默认当前时间戳</param> /// <param name="pendingCopyReadTime">待采集时间,定时采集频率才是特殊情况,其他默认当前时间戳</param>
/// <param name="creationTime">数据创建时间戳</param> /// <param name="creationTime">数据创建时间戳</param>
/// <param name="packetType">数据包类型</param> /// <param name="packetType">数据包类型</param>
/// <param name="guidGenerator">Guid生成器</param>
/// <returns></returns> /// <returns></returns>
protected MeterReadingTelemetryPacketInfo CreateAmmeterPacketInfo(DeviceInfo ammeterInfo, long timestamps, ProtocolBuildResponse builderResponse, string itemCode, string subItemCode, DateTime pendingCopyReadTime, DateTime creationTime, TelemetryPacketTypeEnum packetType) protected MeterReadingTelemetryPacketInfo CreateAmmeterPacketInfo(DeviceInfo ammeterInfo, long timestamps, ProtocolBuildResponse builderResponse, string itemCode, string subItemCode, DateTime pendingCopyReadTime, DateTime creationTime, TelemetryPacketTypeEnum packetType, IGuidGenerator guidGenerator)
{ {
try try
{ {
@ -1672,7 +1684,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
IsSend = false, IsSend = false,
ManualOrNot = false, ManualOrNot = false,
Pn = ammeterInfo.MeteringCode, Pn = ammeterInfo.MeteringCode,
IssuedMessageId = Guid.NewGuid().ToString(), IssuedMessageId = guidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data), IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
IsReceived = false, IsReceived = false,
ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(), ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),

View File

@ -22,10 +22,12 @@ using JiShe.CollectBus.Protocol.Models;
using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using MongoDB.Bson.Serialization.IdGenerators;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp.Guids;
namespace JiShe.CollectBus.ScheduledMeterReading namespace JiShe.CollectBus.ScheduledMeterReading
@ -41,6 +43,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
string serverTagName = string.Empty; string serverTagName = string.Empty;
private readonly ILogger<BasicScheduledMeterReadingService> _logger; private readonly ILogger<BasicScheduledMeterReadingService> _logger;
private readonly IProtocolService _protocolService; private readonly IProtocolService _protocolService;
private readonly IGuidGenerator _guidGenerator;
public EnergySystemScheduledMeterReadingService( public EnergySystemScheduledMeterReadingService(
ILogger<EnergySystemScheduledMeterReadingService> logger, ILogger<EnergySystemScheduledMeterReadingService> logger,
@ -48,6 +51,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
IRedisDataCacheService redisDataCacheService, IRedisDataCacheService redisDataCacheService,
IIoTDbProvider dbProvider, IIoTDbProvider dbProvider,
IProtocolService protocolService, IProtocolService protocolService,
IGuidGenerator guidGenerator,
IOptions<DataMigrationOptions> dataMigrationOptions, IOptions<DataMigrationOptions> dataMigrationOptions,
IOptions<KafkaOptionConfig> kafkaOptions, IOptions<KafkaOptionConfig> kafkaOptions,
IOptions<ServerApplicationOptions> applicationOptions) IOptions<ServerApplicationOptions> applicationOptions)
@ -56,6 +60,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
redisDataCacheService, redisDataCacheService,
dbProvider, dbProvider,
protocolService, protocolService,
guidGenerator,
dataMigrationOptions, dataMigrationOptions,
kafkaOptions, kafkaOptions,
applicationOptions) applicationOptions)
@ -64,6 +69,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
systemType = applicationOptions.Value.SystemType; systemType = applicationOptions.Value.SystemType;
_logger = logger; _logger = logger;
_protocolService = protocolService; _protocolService = protocolService;
_guidGenerator = guidGenerator;
} }
public sealed override string SystemType => systemType; public sealed override string SystemType => systemType;
@ -108,15 +114,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
Baudrate = 2400, Baudrate = 2400,
FocusAddress = "442400040", FocusAddress = "442400040",
Name = "保利单箱电表1", Name = "保利单箱电表1",
FocusId = 95778, FocusId = 95780,
DatabaseBusiID = 1, DatabaseBusiID = 1,
MeteringCode = 1, MeteringCode = 1,
MeterAddress = "442405000040", MeterAddress = "442405000040",
MeterId = 127136, MeterId = 127035,
TypeName = 3, TypeName = 1,
DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679", DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
TimeDensity = 15, TimeDensity = 15,
BrandType = "DDS1980", BrandType = "DDS1980",
MeterType = MeterTypeEnum.Ammeter,
ProjectID = 1,
}); });
ammeterInfos.Add(new DeviceInfo() ammeterInfos.Add(new DeviceInfo()
@ -124,15 +132,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
Baudrate = 2400, Baudrate = 2400,
FocusAddress = "442400039", FocusAddress = "442400039",
Name = "保利单箱电表2", Name = "保利单箱电表2",
FocusId = 95778, FocusId = 69280,
DatabaseBusiID = 1, DatabaseBusiID = 1,
MeteringCode = 1, MeteringCode = 1,
MeterAddress = "442405000039", MeterAddress = "442405000039",
MeterId = 127236, MeterId = 95594,
TypeName = 3, TypeName = 1,
DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679", DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
TimeDensity = 15, TimeDensity = 15,
BrandType = "DDS1980", BrandType = "DDS1980",
MeterType = MeterTypeEnum.Ammeter,
ProjectID = 1,
}); });
//ammeterInfos.Add(new DeviceInfo() //ammeterInfos.Add(new DeviceInfo()
@ -266,17 +276,50 @@ namespace JiShe.CollectBus.ScheduledMeterReading
try try
{ {
#if DEBUG
var settingInfos = new List<AmmeterAutoValveControlSetting>();
settingInfos.Add(new AmmeterAutoValveControlSetting()
{
MeterType = MeterTypeEnum.Ammeter,
AmmerterAddress = "442405000040",
FocusAddress = "442400040",
FocusId = 95780,
ProjectID = 1,
TripType = "off",
TripTime = "14:02",
MeterId = 127035,
LoopType = "EachDay",
EachDayWithout = "周六,周日",
TimeDensity = 15,
});
#else
//获取电表阀控配置 //获取电表阀控配置
var settingInfos = await GetAmmeterAutoValveControlSetting(currentTimeStr); var settingInfos = await GetAmmeterAutoValveControlSetting(currentTimeStr);
if (settingInfos == null || settingInfos.Count <= 0) if (settingInfos == null || settingInfos.Count <= 0)
{ {
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表自动阀控时,阀控数据为空, -101"); _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表自动阀控时,阀控数据为空, -101");
return null; return null;
} }
#endif
//批量获取对应的缓存电表信息
var ammeterInfos = new List<DeviceInfo>(); //设备hash缓存key
string redisCacheDeviceInfoHashKey = $"{string.Format(RedisConst.CacheDeviceInfoHashKey, SystemType, ServerTagName)}";
Dictionary<string, List<DeviceInfo>> keyValuePairsTemps = FreeRedisProvider.Instance.HGetAll<List<DeviceInfo>>(redisCacheDeviceInfoHashKey);
List<DeviceInfo> meterInfos = new List<DeviceInfo>();
List<string> focusAddressDataLista = new List<string>();
foreach (var item in keyValuePairsTemps)
{
foreach (var subItem in item.Value)
{
if (subItem.MeterType == MeterTypeEnum.Ammeter && subItem.TimeDensity == 15)
{
meterInfos.Add(subItem);
focusAddressDataLista.Add(subItem.MeterId.ToString());
}
}
}
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>(); List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
foreach (var settingInfo in settingInfos) foreach (var settingInfo in settingInfos)
@ -304,7 +347,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
//获取对应的缓存电表信息 //获取对应的缓存电表信息
var ammeterInfo = ammeterInfos.First(); var ammeterInfo = meterInfos.Where(d=>d.MeterId == settingInfo.MeterId).FirstOrDefault();
if (ammeterInfo == null)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表自动阀控时未找到对应电表信息电表Id={settingInfo.MeterId}, -102");
continue;
}
bool tripStateResult = false; bool tripStateResult = false;
string itemCode = T37612012PacketItemCodeConst.AFN10HFN01H; string itemCode = T37612012PacketItemCodeConst.AFN10HFN01H;
string subItemCode = string.Empty; string subItemCode = string.Empty;
@ -361,7 +410,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
subItemCode: subItemCode, subItemCode: subItemCode,
pendingCopyReadTime: currentTime, pendingCopyReadTime: currentTime,
creationTime: currentTime, creationTime: currentTime,
packetType: TelemetryPacketTypeEnum.AmmeterAutoValveControl); packetType: TelemetryPacketTypeEnum.AmmeterAutoValveControl,
_guidGenerator);
taskList.Add(meterReadingRecords); taskList.Add(meterReadingRecords);
} }
if (taskList == null || taskList.Count <= 0) if (taskList == null || taskList.Count <= 0)

View File

@ -1,4 +1,5 @@
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Attributes;
@ -8,6 +9,7 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using TouchSocket.Sockets; using TouchSocket.Sockets;
using static FreeSql.Internal.GlobalFilter;
namespace JiShe.CollectBus.Subscribers namespace JiShe.CollectBus.Subscribers
{ {
@ -46,7 +48,7 @@ namespace JiShe.CollectBus.Subscribers
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)] [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage) public async Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage)
{ {
_logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理"); _logger.LogError($"1分钟采集电表数据下行消息消费队列开始处理:{receivedMessage.Serialize()}");
return await SendMessagesAsync(receivedMessage); return await SendMessagesAsync(receivedMessage);
} }
@ -56,10 +58,10 @@ namespace JiShe.CollectBus.Subscribers
/// </summary> /// </summary>
/// <param name="receivedMessage"></param> /// <param name="receivedMessage"></param>
/// <returns></returns> /// <returns></returns>
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)] [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, EnableBatch = true, BatchSize = 500)]
public async Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage) public async Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage)
{ {
_logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理"); _logger.LogError($"5分钟采集电表数据下行消息消费队列开始处理:{receivedMessage.Serialize()}");
return await SendMessagesAsync(receivedMessage); return await SendMessagesAsync(receivedMessage);
} }
@ -68,11 +70,16 @@ namespace JiShe.CollectBus.Subscribers
/// </summary> /// </summary>
/// <param name="receivedMessage"></param> /// <param name="receivedMessage"></param>
/// <returns></returns> /// <returns></returns>
//[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)] [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, EnableBatch = true, TaskCount = 30, BatchSize = 500)]
public async Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage) public async Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(List<MeterReadingTelemetryPacketInfo> receivedMessage)
{ {
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
return await SendMessagesAsync(receivedMessage); foreach (var item in receivedMessage)
{
_logger.LogError($"15分钟采集电表数据下行消息消费队列开始处理:{item.Serialize()}");
await SendMessagesAsync(item);
}
return SubscribeAck.Success();
} }
/// <summary> /// <summary>
@ -80,14 +87,60 @@ namespace JiShe.CollectBus.Subscribers
/// </summary> /// </summary>
/// <param name="receivedMessage"></param> /// <param name="receivedMessage"></param>
/// <returns></returns> /// <returns></returns>
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName,EnableBatch =true,TaskCount=30,BatchSize =500)] [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, EnableBatch = true, BatchSize = 500)]
public async Task<ISubscribeAck> AmmeterScheduledAutoValveControl(List<MeterReadingTelemetryPacketInfo> receivedMessage) public async Task<ISubscribeAck> AmmeterScheduledAutoValveControl(List<MeterReadingTelemetryPacketInfo> receivedMessage)
{ {
//todo 如果是时段自动阀控,需要检查当前的时间,如果时间在自动阀控时间段内,则发送自动阀控报文,否则不发送,尤其是消息队列阻塞或者延时过长的时候。以免造成生产事故。 //todo 如果是时段自动阀控,需要检查当前的时间,如果时间在自动阀控时间段内,则发送自动阀控报文,否则不发送,尤其是消息队列阻塞或者延时过长的时候。以免造成生产事故。
//_logger.LogInformation("电表自动阀控下行消息消费队列开始处理"); foreach (var item in receivedMessage)
_logger.LogWarning($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}"); {
_logger.LogError($"电表自动阀控下行消息消费队列开始处理:{item.Serialize()}");
await SendMessagesAsync(item);
}
return SubscribeAck.Success();
}
/// <summary>
/// 其他采集数据下行消息主题日冻结月冻结、集中器版本号、SIM卡号、定时校时等下行消息消费订阅
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, EnableBatch = true, BatchSize = 500)]
public async Task<ISubscribeAck> AmmeterScheduledOther(List<MeterReadingTelemetryPacketInfo> receivedMessage)
{
foreach (var item in receivedMessage)
{
_logger.LogError($"其他采集数据下行消息消费队列开始处理:{item.Serialize()}");
await SendMessagesAsync(item);
}
return SubscribeAck.Success();
}
/// <summary>
/// 电表手动阀控下行消息消费订阅
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerManualValveControlIssuedEventName)]
public async Task<ISubscribeAck> AmmeterScheduledManualValveControl(MeterReadingTelemetryPacketInfo receivedMessage)
{
_logger.LogError($"电表手动阀控下行消息消费队列开始处理:{receivedMessage.Serialize()}");
await SendMessagesAsync(receivedMessage);
return SubscribeAck.Success();
}
/// <summary>
/// 电表手动抄读下行消息消费订阅
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName)]
public async Task<ISubscribeAck> AmmeterScheduledManualValveReading(MeterReadingTelemetryPacketInfo receivedMessage)
{
_logger.LogError($"电表手动抄读下行消息消费队列开始处理:{receivedMessage.Serialize()}");
await SendMessagesAsync(receivedMessage);
return SubscribeAck.Success(); return SubscribeAck.Success();
//return await SendMessagesAsync(receivedMessage);
} }
/// <summary> /// <summary>
@ -132,9 +185,54 @@ namespace JiShe.CollectBus.Subscribers
/// <param name="receivedMessage"></param> /// <param name="receivedMessage"></param>
/// <returns></returns> /// <returns></returns>
[KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)] [KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
public async Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage) public async Task<ISubscribeAck> WatermeterScheduledAutoReading(MeterReadingTelemetryPacketInfo receivedMessage)
{ {
return await SendMessagesAsync(receivedMessage); _logger.LogError($"水表数据下行消息消费队列开始处理:{receivedMessage.Serialize()}");
await SendMessagesAsync(receivedMessage);
return SubscribeAck.Success();
}
/// <summary>
/// 水表自动阀控下行消息消费订阅
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoValveControlIssuedEventName)]
public async Task<ISubscribeAck> WatermeterScheduleAutoValveControl(MeterReadingTelemetryPacketInfo receivedMessage)
{
_logger.LogError($"水表自动阀控下行消息消费队列开始处理:{receivedMessage.Serialize()}");
await SendMessagesAsync(receivedMessage);
return SubscribeAck.Success();
}
/// <summary>
/// 水表手动阀控下行消息消费订阅
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerManualValveControlIssuedEventName)]
public async Task<ISubscribeAck> WatermeterScheduleManualValveControl(MeterReadingTelemetryPacketInfo receivedMessage)
{
_logger.LogError($"水表手动阀控下行消息消费队列开始处理:{receivedMessage.Serialize()}");
await SendMessagesAsync(receivedMessage);
return SubscribeAck.Success();
}
/// <summary>
/// 水表手动抄读下行消息消费订阅
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerManualValveReadingIssuedEventName)]
public async Task<ISubscribeAck> WatermeterScheduleManualValveReading(MeterReadingTelemetryPacketInfo receivedMessage)
{
_logger.LogError($"水表手动抄读下行消息消费队列开始处理:{receivedMessage.Serialize()}");
await SendMessagesAsync(receivedMessage);
return SubscribeAck.Success();
} }
#endregion #endregion