完成定时阀控
This commit is contained in:
parent
42a446c24d
commit
4de3fb7db2
@ -2,6 +2,7 @@
|
|||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using JiShe.CollectBus.Ammeters;
|
using JiShe.CollectBus.Ammeters;
|
||||||
using JiShe.CollectBus.GatherItem;
|
using JiShe.CollectBus.GatherItem;
|
||||||
|
using JiShe.CollectBus.IotSystems.Ammeters;
|
||||||
using JiShe.CollectBus.IotSystems.Watermeter;
|
using JiShe.CollectBus.IotSystems.Watermeter;
|
||||||
using Volo.Abp.Application.Services;
|
using Volo.Abp.Application.Services;
|
||||||
|
|
||||||
@ -58,6 +59,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task AmmeterScheduledMeterFifteenMinuteReading();
|
Task AmmeterScheduledMeterFifteenMinuteReading();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 获取电表阀控配置
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="currentTime">阀控的时间</param>
|
||||||
|
/// <returns></returns>
|
||||||
|
Task<List<AmmeterAutoValveControlSetting>> GetAmmeterAutoValveControlSetting(string currentTime);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 电表自动阀控
|
||||||
|
/// </summary>
|
||||||
|
/// <returns></returns>
|
||||||
|
Task AmmeterScheduledAutoValveControl();
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -20,19 +20,25 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// 1分钟采集电表数据下行消息消费订阅
|
/// 1分钟采集电表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 5分钟采集电表数据下行消息消费订阅
|
/// 5分钟采集电表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 15分钟采集电表数据下行消息消费订阅
|
/// 15分钟采集电表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 电表自动阀控下行消息消费订阅
|
||||||
|
/// </summary>
|
||||||
|
/// <returns></returns>
|
||||||
|
Task<ISubscribeAck> AmmeterScheduledAutoValveControl(MeterReadingTelemetryPacketInfo issuedEventMessage);
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
#region 水表消息采集
|
#region 水表消息采集
|
||||||
@ -40,7 +46,7 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// 1分钟采集水表数据下行消息消费订阅
|
/// 1分钟采集水表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage);
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,10 +1,4 @@
|
|||||||
using Amazon.Runtime.Internal.Endpoints.StandardLibrary;
|
using JiShe.CollectBus.Application.Contracts;
|
||||||
using Confluent.Kafka;
|
|
||||||
using DeviceDetectorNET.Parser.Device;
|
|
||||||
using DnsClient.Protocol;
|
|
||||||
using FreeSql;
|
|
||||||
using JiShe.CollectBus.Ammeters;
|
|
||||||
using JiShe.CollectBus.Application.Contracts;
|
|
||||||
using JiShe.CollectBus.Common.BuildSendDatas;
|
using JiShe.CollectBus.Common.BuildSendDatas;
|
||||||
using JiShe.CollectBus.Common.Consts;
|
using JiShe.CollectBus.Common.Consts;
|
||||||
using JiShe.CollectBus.Common.DeviceBalanceControl;
|
using JiShe.CollectBus.Common.DeviceBalanceControl;
|
||||||
@ -13,36 +7,26 @@ using JiShe.CollectBus.Common.Enums;
|
|||||||
using JiShe.CollectBus.Common.Extensions;
|
using JiShe.CollectBus.Common.Extensions;
|
||||||
using JiShe.CollectBus.Common.Helpers;
|
using JiShe.CollectBus.Common.Helpers;
|
||||||
using JiShe.CollectBus.Common.Models;
|
using JiShe.CollectBus.Common.Models;
|
||||||
using JiShe.CollectBus.EnergySystems.Entities;
|
|
||||||
using JiShe.CollectBus.GatherItem;
|
using JiShe.CollectBus.GatherItem;
|
||||||
using JiShe.CollectBus.IoTDB.Context;
|
using JiShe.CollectBus.IoTDB.Context;
|
||||||
using JiShe.CollectBus.IoTDB.Interface;
|
using JiShe.CollectBus.IoTDB.Interface;
|
||||||
using JiShe.CollectBus.IoTDB.Model;
|
using JiShe.CollectBus.IoTDB.Model;
|
||||||
using JiShe.CollectBus.IoTDB.Options;
|
using JiShe.CollectBus.IoTDB.Options;
|
||||||
using JiShe.CollectBus.IoTDB.Provider;
|
using JiShe.CollectBus.IoTDB.Provider;
|
||||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
using JiShe.CollectBus.IotSystems.Ammeters;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||||
using JiShe.CollectBus.IotSystems.Watermeter;
|
using JiShe.CollectBus.IotSystems.Watermeter;
|
||||||
using JiShe.CollectBus.Kafka.Internal;
|
using JiShe.CollectBus.Kafka.Internal;
|
||||||
using JiShe.CollectBus.Kafka.Producer;
|
using JiShe.CollectBus.Kafka.Producer;
|
||||||
using JiShe.CollectBus.Protocol.Contracts;
|
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
||||||
using JiShe.CollectBus.RedisDataCache;
|
|
||||||
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
|
||||||
using Mapster;
|
|
||||||
using Microsoft.Extensions.Configuration;
|
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
using System.Diagnostics.Metrics;
|
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Threading;
|
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using static FreeSql.Internal.GlobalFilter;
|
|
||||||
using static Microsoft.AspNetCore.Razor.Language.TagHelperMetadata;
|
|
||||||
using static Thrift.Protocol.Utilities.TJSONProtocolConstants;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.ScheduledMeterReading
|
namespace JiShe.CollectBus.ScheduledMeterReading
|
||||||
{
|
{
|
||||||
@ -57,6 +41,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
private readonly IRedisDataCacheService _redisDataCacheService;
|
private readonly IRedisDataCacheService _redisDataCacheService;
|
||||||
private readonly KafkaOptionConfig _kafkaOptions;
|
private readonly KafkaOptionConfig _kafkaOptions;
|
||||||
private readonly IoTDBRuntimeContext _runtimeContext;
|
private readonly IoTDBRuntimeContext _runtimeContext;
|
||||||
|
private readonly IServiceProvider _serviceProvider;
|
||||||
|
|
||||||
|
int pageSize = 3000;
|
||||||
|
|
||||||
public BasicScheduledMeterReadingService(
|
public BasicScheduledMeterReadingService(
|
||||||
ILogger<BasicScheduledMeterReadingService> logger,
|
ILogger<BasicScheduledMeterReadingService> logger,
|
||||||
@ -64,6 +51,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
IRedisDataCacheService redisDataCacheService,
|
IRedisDataCacheService redisDataCacheService,
|
||||||
IIoTDbProvider dbProvider,
|
IIoTDbProvider dbProvider,
|
||||||
IoTDBRuntimeContext runtimeContext,
|
IoTDBRuntimeContext runtimeContext,
|
||||||
|
IServiceProvider serviceProvider,
|
||||||
IOptions<KafkaOptionConfig> kafkaOptions)
|
IOptions<KafkaOptionConfig> kafkaOptions)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
@ -72,6 +60,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
_producerService = producerService;
|
_producerService = producerService;
|
||||||
_redisDataCacheService = redisDataCacheService;
|
_redisDataCacheService = redisDataCacheService;
|
||||||
_kafkaOptions = kafkaOptions.Value;
|
_kafkaOptions = kafkaOptions.Value;
|
||||||
|
_serviceProvider = serviceProvider;
|
||||||
|
|
||||||
_runtimeContext.UseTableSessionPool = true;
|
_runtimeContext.UseTableSessionPool = true;
|
||||||
}
|
}
|
||||||
@ -421,11 +410,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
Value = pendingCopyReadTime
|
Value = pendingCopyReadTime
|
||||||
});
|
});
|
||||||
|
|
||||||
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(timeDensity, new IoTDBQueryOptions()
|
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, new IoTDBQueryOptions()
|
||||||
{
|
{
|
||||||
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
|
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
|
||||||
PageIndex = 1,
|
PageIndex = 1,
|
||||||
PageSize = 3000,
|
PageSize = pageSize,
|
||||||
Conditions = conditions,
|
Conditions = conditions,
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -458,11 +447,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
Value = pendingCopyReadTime
|
Value = pendingCopyReadTime
|
||||||
});
|
});
|
||||||
|
|
||||||
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(timeDensity, new IoTDBQueryOptions()
|
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, new IoTDBQueryOptions()
|
||||||
{
|
{
|
||||||
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
|
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
|
||||||
PageIndex = 1,
|
PageIndex = 1,
|
||||||
PageSize = 3000,
|
PageSize = pageSize,
|
||||||
Conditions = conditions,
|
Conditions = conditions,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -494,11 +483,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
Value = pendingCopyReadTime
|
Value = pendingCopyReadTime
|
||||||
});
|
});
|
||||||
|
|
||||||
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(timeDensity, new IoTDBQueryOptions()
|
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, new IoTDBQueryOptions()
|
||||||
{
|
{
|
||||||
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
|
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
|
||||||
PageIndex = 1,
|
PageIndex = 1,
|
||||||
PageSize = 3000,
|
PageSize = pageSize,
|
||||||
Conditions = conditions,
|
Conditions = conditions,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -643,7 +632,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。");
|
_logger.LogWarning($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -689,6 +678,25 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
return taskList;
|
return taskList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 获取电表阀控配置
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="currentTime">阀控的时间</param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public virtual Task<List<AmmeterAutoValveControlSetting>> GetAmmeterAutoValveControlSetting(string currentTime)
|
||||||
|
{
|
||||||
|
throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 电表自动阀控
|
||||||
|
/// </summary>
|
||||||
|
/// <returns></returns>
|
||||||
|
public virtual Task AmmeterScheduledAutoValveControl()
|
||||||
|
{
|
||||||
|
throw new NotImplementedException($"{nameof(AmmeterScheduledAutoValveControl)}请根据不同系统类型进行实现");
|
||||||
|
}
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
|
|
||||||
@ -824,11 +832,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
Value = pendingCopyReadTime
|
Value = pendingCopyReadTime
|
||||||
});
|
});
|
||||||
|
|
||||||
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(timeDensity, new IoTDBQueryOptions()
|
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, new IoTDBQueryOptions()
|
||||||
{
|
{
|
||||||
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
|
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
|
||||||
PageIndex = 1,
|
PageIndex = 1,
|
||||||
PageSize = 3000,
|
PageSize = pageSize,
|
||||||
Conditions = conditions,
|
Conditions = conditions,
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -904,6 +912,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
tempCodes = new List<string>() { "10_1" };
|
tempCodes = new List<string>() { "10_1" };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
|
if (protocolPlugin == null)
|
||||||
|
{
|
||||||
|
_logger.LogError("协议不存在!");
|
||||||
|
}
|
||||||
|
|
||||||
foreach (var tempItem in tempCodes)
|
foreach (var tempItem in tempCodes)
|
||||||
{
|
{
|
||||||
@ -995,7 +1008,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// <param name="nextTaskTime"></param>
|
/// <param name="nextTaskTime"></param>
|
||||||
/// <param name="timeDensity"></param>
|
/// <param name="timeDensity"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private bool IsTaskTime(DateTime nextTaskTime, int timeDensity = 0)
|
protected bool IsTaskTime(DateTime nextTaskTime, int timeDensity = 0)
|
||||||
{
|
{
|
||||||
if (DateTime.Now.AddMinutes(timeDensity) >= nextTaskTime)
|
if (DateTime.Now.AddMinutes(timeDensity) >= nextTaskTime)
|
||||||
{
|
{
|
||||||
@ -1013,7 +1026,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// <param name="meterType">表类型</param>
|
/// <param name="meterType">表类型</param>
|
||||||
/// <param name="taskCreateAction">具体的创建任务的委托</param>
|
/// <param name="taskCreateAction">具体的创建任务的委托</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private async Task CreateMeterPublishTask<T>(int timeDensity, DateTime nextTaskTime, MeterTypeEnum meterType, Action<int, T, int, DateTime> taskCreateAction) where T : DeviceCacheBasicModel
|
protected async Task CreateMeterPublishTask<T>(int timeDensity, DateTime nextTaskTime, MeterTypeEnum meterType, Action<int, T, int, DateTime> taskCreateAction) where T : DeviceCacheBasicModel
|
||||||
{
|
{
|
||||||
var timer = Stopwatch.StartNew();
|
var timer = Stopwatch.StartNew();
|
||||||
|
|
||||||
@ -1074,9 +1087,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// 创建Kafka消息
|
/// 创建Kafka消息
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
/// <typeparam name="T"></typeparam>
|
||||||
|
/// <param name="kafkaTopicName">kafka主题名称</param>
|
||||||
|
/// <param name="options">任务查询条件</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private async Task CreateMeterKafkaTaskMessage<T>(int timeDensity, IoTDBQueryOptions options) where T : IoTEntity, new()
|
protected async Task CreateMeterKafkaTaskMessage<T>(string kafkaTopicName, IoTDBQueryOptions options) where T : IoTEntity, new()
|
||||||
{
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(kafkaTopicName))
|
||||||
|
{
|
||||||
|
_logger.LogInformation($"{nameof(CreateMeterKafkaTaskMessage)} Kafka消息推送主题不能为空,-101");
|
||||||
|
return;
|
||||||
|
}
|
||||||
int pageNumber = 0;
|
int pageNumber = 0;
|
||||||
bool hasNext;
|
bool hasNext;
|
||||||
var stopwatch = Stopwatch.StartNew();
|
var stopwatch = Stopwatch.StartNew();
|
||||||
@ -1103,7 +1124,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
} while (hasNext);
|
} while (hasNext);
|
||||||
|
|
||||||
stopwatch.Stop();
|
stopwatch.Stop();
|
||||||
_logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {options.TableNameOrTreePath} {timeDensity}分钟采集任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
|
_logger.LogInformation($"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题的任务 {options.TableNameOrTreePath} 路径任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -1113,7 +1134,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// <param name="taskRecord">任务记录</param>
|
/// <param name="taskRecord">任务记录</param>
|
||||||
/// <param name="partition">对应分区,也就是集中器号所在的分组序号</param>
|
/// <param name="partition">对应分区,也就是集中器号所在的分组序号</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private async Task KafkaProducerIssuedMessageAction<T>(string topicName,
|
protected async Task KafkaProducerIssuedMessageAction<T>(string topicName,
|
||||||
T taskRecord, int partition) where T : class
|
T taskRecord, int partition) where T : class
|
||||||
{
|
{
|
||||||
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
|
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
|
||||||
|
|||||||
@ -1,30 +1,30 @@
|
|||||||
using System;
|
using JiShe.CollectBus.Application.Contracts;
|
||||||
using System.Collections.Generic;
|
using JiShe.CollectBus.Common.BuildSendDatas;
|
||||||
using System.Threading.Tasks;
|
|
||||||
using JiShe.CollectBus.Ammeters;
|
|
||||||
using JiShe.CollectBus.Application.Contracts;
|
|
||||||
using JiShe.CollectBus.Common.Consts;
|
using JiShe.CollectBus.Common.Consts;
|
||||||
using JiShe.CollectBus.Common.DeviceBalanceControl;
|
using JiShe.CollectBus.Common.DeviceBalanceControl;
|
||||||
|
using JiShe.CollectBus.Common.Encrypt;
|
||||||
|
using JiShe.CollectBus.Common.Enums;
|
||||||
|
using JiShe.CollectBus.Common.Extensions;
|
||||||
using JiShe.CollectBus.Common.Helpers;
|
using JiShe.CollectBus.Common.Helpers;
|
||||||
using JiShe.CollectBus.FreeSql;
|
using JiShe.CollectBus.FreeSql;
|
||||||
using JiShe.CollectBus.GatherItem;
|
using JiShe.CollectBus.GatherItem;
|
||||||
using JiShe.CollectBus.IoTDB.Context;
|
using JiShe.CollectBus.IoTDB.Context;
|
||||||
using JiShe.CollectBus.IoTDB.Interface;
|
using JiShe.CollectBus.IoTDB.Interface;
|
||||||
using JiShe.CollectBus.IotSystems.Devices;
|
using JiShe.CollectBus.IotSystems.Ammeters;
|
||||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||||
using JiShe.CollectBus.IotSystems.Watermeter;
|
using JiShe.CollectBus.IotSystems.Watermeter;
|
||||||
using JiShe.CollectBus.Kafka.Internal;
|
using JiShe.CollectBus.Kafka.Internal;
|
||||||
using JiShe.CollectBus.Kafka.Producer;
|
using JiShe.CollectBus.Kafka.Producer;
|
||||||
using JiShe.CollectBus.Repository;
|
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
||||||
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
|
||||||
using Microsoft.AspNetCore.Authorization;
|
using Microsoft.AspNetCore.Authorization;
|
||||||
using Microsoft.AspNetCore.Mvc;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Configuration;
|
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
using Volo.Abp.Domain.Repositories;
|
using System;
|
||||||
using Volo.Abp.Uow;
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.ScheduledMeterReading
|
namespace JiShe.CollectBus.ScheduledMeterReading
|
||||||
{
|
{
|
||||||
@ -36,6 +36,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
|
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
|
||||||
{
|
{
|
||||||
string serverTagName = string.Empty;
|
string serverTagName = string.Empty;
|
||||||
|
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
|
||||||
|
private readonly IIoTDbProvider _dbProvider;
|
||||||
|
private readonly IServiceProvider _serviceProvider;
|
||||||
|
|
||||||
public EnergySystemScheduledMeterReadingService(
|
public EnergySystemScheduledMeterReadingService(
|
||||||
ILogger<EnergySystemScheduledMeterReadingService> logger,
|
ILogger<EnergySystemScheduledMeterReadingService> logger,
|
||||||
@ -43,15 +46,20 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
IOptions<KafkaOptionConfig> kafkaOptions,
|
IOptions<KafkaOptionConfig> kafkaOptions,
|
||||||
IoTDBRuntimeContext runtimeContext,
|
IoTDBRuntimeContext runtimeContext,
|
||||||
IProducerService producerService,
|
IProducerService producerService,
|
||||||
|
IServiceProvider serviceProvider,
|
||||||
IRedisDataCacheService redisDataCacheService)
|
IRedisDataCacheService redisDataCacheService)
|
||||||
: base(logger,
|
: base(logger,
|
||||||
producerService,
|
producerService,
|
||||||
redisDataCacheService,
|
redisDataCacheService,
|
||||||
dbProvider,
|
dbProvider,
|
||||||
runtimeContext,
|
runtimeContext,
|
||||||
|
serviceProvider,
|
||||||
kafkaOptions)
|
kafkaOptions)
|
||||||
{
|
{
|
||||||
serverTagName = kafkaOptions.Value.ServerTagName;
|
serverTagName = kafkaOptions.Value.ServerTagName;
|
||||||
|
_dbProvider = dbProvider;
|
||||||
|
_logger = logger;
|
||||||
|
_serviceProvider = serviceProvider;
|
||||||
}
|
}
|
||||||
|
|
||||||
public sealed override string SystemType => SystemTypeConst.Energy;
|
public sealed override string SystemType => SystemTypeConst.Energy;
|
||||||
@ -138,6 +146,199 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
.QueryAsync<AmmeterInfo>(sql);
|
.QueryAsync<AmmeterInfo>(sql);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 获取电表阀控配置
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="currentTime">阀控的时间</param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public override async Task<List<AmmeterAutoValveControlSetting>> GetAmmeterAutoValveControlSetting(string currentTime)
|
||||||
|
{
|
||||||
|
string sql = $@"SELECT A.*,B.AmmeterID as MeterId,E.GatherCode,D.AreaCode,D.[Address],CONCAT(D.AreaCode, D.[Address]) AS FocusAddress,D.ID as FocusId
|
||||||
|
FROM TB_AutoTripTask(nolock) AS A
|
||||||
|
INNER JOIN TB_AutoTripAmmeter(nolock) AS B ON A.ID=B.TripTaskID
|
||||||
|
INNER JOIN TB_AmmeterInfo(nolock) AS C ON B.AmmeterID=C.ID
|
||||||
|
INNER JOIN TB_FocusInfo(nolock) AS D ON D.ID=C.FocusID
|
||||||
|
INNER JOIN TB_GatherInfo(NOLOCK) AS E ON E.ID=D.GatherInfoID
|
||||||
|
WHERE A.IsForbid=0 and A.State<>-1 and E.GatherCode LIKE '%V4%' and A.TripTime";
|
||||||
|
|
||||||
|
if (!string.IsNullOrWhiteSpace(currentTime))
|
||||||
|
{
|
||||||
|
sql = $@"{sql} AND A.TripTime = '{currentTime}'";
|
||||||
|
}
|
||||||
|
|
||||||
|
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
|
||||||
|
.Ado
|
||||||
|
.QueryAsync<AmmeterAutoValveControlSetting>(sql);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 电表自动阀控
|
||||||
|
/// </summary>
|
||||||
|
/// <returns></returns>
|
||||||
|
public override async Task AmmeterScheduledAutoValveControl()
|
||||||
|
{
|
||||||
|
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
|
||||||
|
|
||||||
|
var currentTime = DateTime.Now;
|
||||||
|
string currentTimeStr = $"{currentTime:HH:mm}";
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var settingInfos = await GetAmmeterAutoValveControlSetting(currentTimeStr);
|
||||||
|
|
||||||
|
if (settingInfos == null || settingInfos.Count <= 0)
|
||||||
|
{
|
||||||
|
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表自动阀控时,阀控数据为空, -101");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
//批量获取对应的缓存电表信息
|
||||||
|
var ammeterInfos = new List<AmmeterInfo>();
|
||||||
|
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
|
||||||
|
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
|
||||||
|
|
||||||
|
foreach (var settingInfo in settingInfos)
|
||||||
|
{
|
||||||
|
bool isGenerate = false;
|
||||||
|
switch (settingInfo.LoopType)
|
||||||
|
{
|
||||||
|
case "Once":
|
||||||
|
isGenerate = CommonHelper.JudgeIsGenerate_Once(settingInfo.OnceWithDate.Value, currentTime);
|
||||||
|
break;
|
||||||
|
case "EachDay":
|
||||||
|
isGenerate = CommonHelper.JudgeIsGenerate_Day(settingInfo.EachDayWithout, currentTime);
|
||||||
|
break;
|
||||||
|
case "EachWeek":
|
||||||
|
isGenerate = CommonHelper.JudgeIsGenerate_Week(settingInfo.EachWeekWith, currentTime);
|
||||||
|
break;
|
||||||
|
case "EachMonth":
|
||||||
|
isGenerate = CommonHelper.JudgeIsGenerate_Month(settingInfo.EachMonthWith, currentTime);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!isGenerate)//不生成,跳入下一循环
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
//获取对应的缓存电表信息
|
||||||
|
var ammeterInfo = ammeterInfos.First();
|
||||||
|
bool tripStateResult = false;
|
||||||
|
if (settingInfo.TripType.Equals("on"))
|
||||||
|
{
|
||||||
|
ammeterInfo.TripState = 0;
|
||||||
|
tripStateResult = true;
|
||||||
|
}
|
||||||
|
else if (settingInfo.TripType.Equals("off"))
|
||||||
|
{
|
||||||
|
ammeterInfo.TripState = 1;
|
||||||
|
tripStateResult = false;
|
||||||
|
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
_logger.LogError($"集中器[{settingInfo.FocusAddress}],[{settingInfo.MeterId}]阀控命令错误:{settingInfo.TripType},-102");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
|
if (protocolPlugin == null)
|
||||||
|
{
|
||||||
|
_logger.LogError("协议不存在!");
|
||||||
|
}
|
||||||
|
|
||||||
|
var temCode = "10_98";
|
||||||
|
var itemCodeArr = temCode.Split('_');
|
||||||
|
var aFNStr = itemCodeArr[0];
|
||||||
|
var aFN = (AFN)(aFNStr.HexToDec());
|
||||||
|
var fn = int.Parse(itemCodeArr[1]);
|
||||||
|
TelemetryPacketResponse builderResponse = null;
|
||||||
|
string methonCode = $"AFN{aFNStr}_Fn_Send";
|
||||||
|
//特殊表暂不处理
|
||||||
|
if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
|
||||||
|
, out var handler))
|
||||||
|
{
|
||||||
|
builderResponse = handler(new TelemetryPacketRequest()
|
||||||
|
{
|
||||||
|
FocusAddress = ammeterInfo.FocusAddress,
|
||||||
|
Fn = fn,
|
||||||
|
Pn = ammeterInfo.MeteringCode,
|
||||||
|
DataUnit = Build645SendData.BuildAmmeterValveControlSendDataUnit(ammeterInfo.AmmerterAddress, "", ammeterInfo.Password, tripStateResult),//生成阀控报文
|
||||||
|
});
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{temCode}无效编码,-103");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (builderResponse == null)
|
||||||
|
{
|
||||||
|
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{temCode}报文构建失败,-104");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
string taskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA);
|
||||||
|
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
|
||||||
|
{
|
||||||
|
SystemName = SystemType,
|
||||||
|
ProjectId = $"{ammeterInfo.ProjectID}",
|
||||||
|
DeviceType = $"{MeterTypeEnum.Ammeter}",
|
||||||
|
DeviceId = $"{ammeterInfo.MeterId}",
|
||||||
|
Timestamps = currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
|
||||||
|
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
|
||||||
|
PendingCopyReadTime = currentTime,
|
||||||
|
CreationTime = currentTime,
|
||||||
|
MeterAddress = ammeterInfo.AmmerterAddress,
|
||||||
|
AFN = (int)aFN,
|
||||||
|
Fn = fn,
|
||||||
|
//Seq = builderResponse.Seq,
|
||||||
|
MSA = builderResponse.MSA,
|
||||||
|
ItemCode = temCode,
|
||||||
|
TaskMark = taskMark,
|
||||||
|
IsSend = false,
|
||||||
|
ManualOrNot = false,
|
||||||
|
Pn = ammeterInfo.MeteringCode,
|
||||||
|
IssuedMessageId = GuidGenerator.Create().ToString(),
|
||||||
|
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
|
||||||
|
IsReceived = false,
|
||||||
|
ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
|
||||||
|
};
|
||||||
|
taskList.Add(meterReadingRecords);
|
||||||
|
}
|
||||||
|
if (taskList == null || taskList.Count <= 0)
|
||||||
|
{
|
||||||
|
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-105");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
//任务记录入库
|
||||||
|
_ = _dbProvider.BatchInsertAsync(metadata, taskList);
|
||||||
|
|
||||||
|
//任务信息推送Kafka
|
||||||
|
_ = DeviceGroupBalanceControl.ProcessWithThrottleAsync<MeterReadingTelemetryPacketInfo>(
|
||||||
|
items: taskList,
|
||||||
|
deviceIdSelector: data => data.DeviceId,
|
||||||
|
processor: (data, groupIndex) =>
|
||||||
|
{
|
||||||
|
_ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, data, groupIndex);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
//todo 阀控记录入库,推送到新的服务
|
||||||
|
}
|
||||||
|
catch (Exception)
|
||||||
|
{
|
||||||
|
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现");
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 获取水表信息
|
/// 获取水表信息
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
@ -1,58 +1,37 @@
|
|||||||
using System;
|
using JiShe.CollectBus.Common.Consts;
|
||||||
using System.Collections.Generic;
|
using JiShe.CollectBus.IoTDB.Interface;
|
||||||
using System.Threading.Tasks;
|
|
||||||
using DeviceDetectorNET.Parser.Device;
|
|
||||||
using JiShe.CollectBus.Common.Consts;
|
|
||||||
using JiShe.CollectBus.Common.Enums;
|
|
||||||
using JiShe.CollectBus.IotSystems.Devices;
|
|
||||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
|
||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||||
using JiShe.CollectBus.Kafka.Attributes;
|
using JiShe.CollectBus.Kafka.Attributes;
|
||||||
using JiShe.CollectBus.Kafka.Internal;
|
using JiShe.CollectBus.Kafka.Internal;
|
||||||
using JiShe.CollectBus.Protocol.Contracts;
|
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
|
||||||
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
|
||||||
using Microsoft.AspNetCore.Mvc;
|
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
|
using System;
|
||||||
|
using System.Threading.Tasks;
|
||||||
using TouchSocket.Sockets;
|
using TouchSocket.Sockets;
|
||||||
using Volo.Abp.Caching;
|
|
||||||
using Volo.Abp.Domain.Repositories;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Subscribers
|
namespace JiShe.CollectBus.Subscribers
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 定时抄读任务消息消费订阅
|
/// 定时抄读任务消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[Route($"/worker/app/subscriber")]
|
|
||||||
public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService, IKafkaSubscribe
|
public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService, IKafkaSubscribe
|
||||||
{
|
{
|
||||||
private readonly ILogger<WorkerSubscriberAppService> _logger;
|
private readonly ILogger<WorkerSubscriberAppService> _logger;
|
||||||
private readonly ITcpService _tcpService;
|
private readonly ITcpService _tcpService;
|
||||||
private readonly IServiceProvider _serviceProvider;
|
private readonly IIoTDbProvider _dbProvider;
|
||||||
private readonly IRepository<Device, Guid> _deviceRepository;
|
|
||||||
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
|
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Initializes a new instance of the <see cref="WorkerSubscriberAppService"/> class.
|
/// Initializes a new instance of the <see cref="WorkerSubscriberAppService"/> class.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="logger">The logger.</param>
|
/// <param name="logger">The logger.</param>
|
||||||
/// <param name="tcpService">The TCP service.</param>
|
/// <param name="tcpService">The TCP service.</param>
|
||||||
/// <param name="deviceRepository">The Device pepository.</param>
|
/// <param name="dbProvider">IoTDB数据驱动</param>
|
||||||
/// <param name="serviceProvider">The service provider.</param>
|
|
||||||
public WorkerSubscriberAppService(ILogger<WorkerSubscriberAppService> logger,
|
public WorkerSubscriberAppService(ILogger<WorkerSubscriberAppService> logger,
|
||||||
ITcpService tcpService,
|
ITcpService tcpService,
|
||||||
IRepository<Device, Guid> deviceRepository,
|
IIoTDbProvider dbProvider)
|
||||||
IMeterReadingRecordRepository meterReadingRecordsRepository,
|
|
||||||
IServiceProvider serviceProvider)
|
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_tcpService = tcpService;
|
_tcpService = tcpService;
|
||||||
_serviceProvider = serviceProvider;
|
_dbProvider = dbProvider;
|
||||||
_deviceRepository = deviceRepository;
|
|
||||||
_meterReadingRecordsRepository = meterReadingRecordsRepository;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -63,57 +42,24 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="receivedMessage"></param>
|
/// <param name="receivedMessage"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
[HttpPost]
|
|
||||||
[Route("ammeter/oneminute/issued-event")]
|
|
||||||
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
|
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
|
||||||
//[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
|
public async Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage)
|
||||||
public async Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
|
||||||
{
|
{
|
||||||
_logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理");
|
_logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理");
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
return await SendMessagesAsync(receivedMessage);
|
||||||
if (protocolPlugin == null)
|
|
||||||
{
|
|
||||||
_logger.LogError("【1分钟采集电表数据下行消息消费队列开始处理】协议不存在!");
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
var device = await _deviceRepository.FirstOrDefaultAsync(a => a.Number == receivedMessage.FocusAddress);
|
|
||||||
if (device != null)
|
|
||||||
{
|
|
||||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 5分钟采集电表数据下行消息消费订阅
|
/// 5分钟采集电表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="receivedMessage"></param>
|
/// <param name="receivedMessage"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
[HttpPost]
|
|
||||||
[Route("ammeter/fiveminute/issued-event")]
|
|
||||||
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
|
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
|
||||||
//[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
|
public async Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage)
|
||||||
public async Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
|
||||||
{
|
{
|
||||||
_logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理");
|
_logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理");
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
return await SendMessagesAsync(receivedMessage);
|
||||||
if (protocolPlugin == null)
|
|
||||||
{
|
|
||||||
_logger.LogError("【5分钟采集电表数据下行消息消费队列开始处理】协议不存在!");
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
var device = await _deviceRepository.FirstOrDefaultAsync(a => a.Number == receivedMessage.FocusAddress);
|
|
||||||
if (device != null)
|
|
||||||
{
|
|
||||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -121,36 +67,23 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="receivedMessage"></param>
|
/// <param name="receivedMessage"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
[HttpPost]
|
|
||||||
[Route("ammeter/fifteenminute/issued-event")]
|
|
||||||
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
|
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
|
||||||
//[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
|
public async Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage)
|
||||||
public async Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
|
||||||
{
|
{
|
||||||
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
|
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
|
||||||
try
|
return await SendMessagesAsync(receivedMessage);
|
||||||
{
|
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
|
||||||
if (protocolPlugin == null)
|
|
||||||
{
|
|
||||||
_logger.LogError("【15分钟采集电表数据下行消息消费队列开始处理】协议不存在!");
|
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
|
||||||
var device = await _deviceRepository.FirstOrDefaultAsync(a => a.Number == receivedMessage.FocusAddress);
|
|
||||||
if (device != null)
|
|
||||||
{
|
|
||||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
|
||||||
|
|
||||||
}
|
/// <summary>
|
||||||
}
|
/// 电表自动阀控下行消息消费订阅
|
||||||
return SubscribeAck.Success();
|
/// </summary>
|
||||||
}
|
/// <param name="receivedMessage"></param>
|
||||||
catch (Exception ex)
|
/// <returns></returns>
|
||||||
|
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName)]
|
||||||
|
public async Task<ISubscribeAck> AmmeterScheduledAutoValveControl(MeterReadingTelemetryPacketInfo receivedMessage)
|
||||||
{
|
{
|
||||||
|
_logger.LogInformation("电表自动阀控下行消息消费队列开始处理");
|
||||||
throw ex;
|
return await SendMessagesAsync(receivedMessage);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
@ -161,28 +94,41 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="receivedMessage"></param>
|
/// <param name="receivedMessage"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
[HttpPost]
|
|
||||||
[Route("watermeter/fifteenminute/issued-event")]
|
|
||||||
[KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
|
[KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
|
||||||
//[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
|
public async Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage)
|
||||||
public async Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
|
||||||
{
|
{
|
||||||
_logger.LogInformation("15分钟采集水表数据下行消息消费队列开始处理");
|
return await SendMessagesAsync(receivedMessage);
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
}
|
||||||
if (protocolPlugin == null)
|
#endregion
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 设备报文发送
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="receivedMessage">消息记录</param>
|
||||||
|
/// <returns></returns>
|
||||||
|
private async Task<ISubscribeAck> SendMessagesAsync(MeterReadingTelemetryPacketInfo receivedMessage)
|
||||||
{
|
{
|
||||||
_logger.LogError("【15分钟采集水表数据下行消息消费队列开始处理】协议不存在!");
|
try
|
||||||
|
{
|
||||||
|
var checkResult = _tcpService.ClientExists(receivedMessage.FocusAddress);
|
||||||
|
if (checkResult)
|
||||||
|
{
|
||||||
|
await _tcpService.SendAsync(receivedMessage.FocusAddress, Convert.FromHexString(receivedMessage.IssuedMessageHexString));
|
||||||
|
|
||||||
|
receivedMessage.IsSend = true;
|
||||||
|
await _dbProvider.InsertAsync(receivedMessage);
|
||||||
|
|
||||||
|
return SubscribeAck.Success();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
|
return SubscribeAck.Fail();
|
||||||
if (device != null)
|
}
|
||||||
|
}
|
||||||
|
catch (Exception)
|
||||||
{
|
{
|
||||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
throw;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
|
||||||
#endregion
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,107 @@
|
|||||||
|
using JiShe.CollectBus.Common.Models;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace JiShe.CollectBus.IotSystems.Ammeters
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 电表自动阀控制设置实体
|
||||||
|
/// </summary>
|
||||||
|
public class AmmeterAutoValveControlSetting: DeviceCacheBasicModel
|
||||||
|
{
|
||||||
|
public int ID { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 定时阀控名称
|
||||||
|
/// </summary>
|
||||||
|
public string Name { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 是否禁止: 1 禁止, 0 启用,
|
||||||
|
/// </summary>
|
||||||
|
public int IsForbid { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 阀控命令:on 合闸(通电), off 断闸(断电)
|
||||||
|
/// </summary>
|
||||||
|
public string TripType { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 执行周期:Once 一次、EachDay 每日、EachWeek 每周、EachMonth 每月、
|
||||||
|
/// </summary>
|
||||||
|
public string LoopType { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// (专属字段)执行一次时,指定yyyy-MM-dd 00:00:00 (只用年月日)
|
||||||
|
/// </summary>
|
||||||
|
public DateTime? OnceWithDate { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// (专属字段)每日执行时排除(多个用英文逗号分隔),如:"周六,周日"
|
||||||
|
/// </summary>
|
||||||
|
public string EachDayWithout { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// (专属字段)每周执行时运行(多个用英文逗号分隔),如:"周一,周二,周日"
|
||||||
|
/// </summary>
|
||||||
|
public string EachWeekWith { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// (专属字段)每月执行时运行(多个用英文逗号分隔),如:"1,2,3,5"
|
||||||
|
/// </summary>
|
||||||
|
public string EachMonthWith { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 阀控执行时分:HH:mm:00
|
||||||
|
/// </summary>
|
||||||
|
public string TripTime { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 项目ID
|
||||||
|
/// </summary>
|
||||||
|
public int ProjectID { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 添加时间
|
||||||
|
/// </summary>
|
||||||
|
public DateTime AddDate { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 删除改成-1,
|
||||||
|
/// </summary>
|
||||||
|
public int State { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 实际执行时间
|
||||||
|
/// </summary>
|
||||||
|
public DateTime? WorkTime { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 创建人ID
|
||||||
|
/// </summary>
|
||||||
|
public int CreateUserID { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 采集编号
|
||||||
|
/// </summary>
|
||||||
|
public string GatherCode { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 集中器地址
|
||||||
|
/// </summary>
|
||||||
|
public string Address { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 集中器区域代码
|
||||||
|
/// </summary>
|
||||||
|
public string AreaCode { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 电表通信地址
|
||||||
|
/// </summary>
|
||||||
|
public string AmmerterAddress { get; set; }
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -6,7 +6,7 @@ using System.Linq;
|
|||||||
using System.Text;
|
using System.Text;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Ammeters
|
namespace JiShe.CollectBus.IotSystems.Ammeters
|
||||||
{
|
{
|
||||||
public class AmmeterInfo: DeviceCacheBasicModel
|
public class AmmeterInfo: DeviceCacheBasicModel
|
||||||
{
|
{
|
||||||
@ -15,7 +15,7 @@ using Volo.Abp.Domain.Entities.Auditing;
|
|||||||
namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
|
namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 抄读任务Redis缓存数据记录
|
/// 抄读任务数据
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[EntityType(EntityTypeEnum.TableModel)]
|
[EntityType(EntityTypeEnum.TableModel)]
|
||||||
public class MeterReadingTelemetryPacketInfo : IoTEntity
|
public class MeterReadingTelemetryPacketInfo : IoTEntity
|
||||||
@ -56,6 +56,13 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
|
|||||||
[FIELDColumn]
|
[FIELDColumn]
|
||||||
public int FocusId { get; set; }
|
public int FocusId { get; set; }
|
||||||
|
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 集中器地址
|
||||||
|
/// </summary>
|
||||||
|
[FIELDColumn]
|
||||||
|
public string FocusAddress { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 表Id
|
/// 表Id
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
@ -25,7 +25,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
|
|||||||
/// <param name="ammeterAddress">电表地址</param>
|
/// <param name="ammeterAddress">电表地址</param>
|
||||||
/// <param name="specialControlCode">特殊控制码</param>
|
/// <param name="specialControlCode">特殊控制码</param>
|
||||||
/// <param name="password">密码</param>
|
/// <param name="password">密码</param>
|
||||||
/// <param name="state">是否为开阀</param>
|
/// <param name="state">是否为开阀,true合闸,false关闸</param>
|
||||||
/// <param name="modelCode">型号码</param>
|
/// <param name="modelCode">型号码</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public static List<string> BuildAmmeterValveControlSendDataUnit(string ammeterAddress, string specialControlCode, string password, bool state, string modelCode = "")
|
public static List<string> BuildAmmeterValveControlSendDataUnit(string ammeterAddress, string specialControlCode, string password, bool state, string modelCode = "")
|
||||||
|
|||||||
@ -775,5 +775,61 @@ namespace JiShe.CollectBus.Common.Helpers
|
|||||||
|
|
||||||
return makstr;// Convert.ToInt32(makstr) << 32 | msa;
|
return makstr;// Convert.ToInt32(makstr) << 32 | msa;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 判断是生成月
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="eachMonthWith"></param>
|
||||||
|
/// <param name="curTime"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public static bool JudgeIsGenerate_Month(string eachMonthWith, DateTime curTime)
|
||||||
|
{
|
||||||
|
var arr = eachMonthWith.Split(',');
|
||||||
|
if (arr.Contains(curTime.Day.ToString()))
|
||||||
|
return true;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 判断是生成一次
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="onceWithDate"></param>
|
||||||
|
/// <param name="curTime"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public static bool JudgeIsGenerate_Once(DateTime onceWithDate, DateTime curTime) => curTime.Date.Equals(onceWithDate);//为当天时发送
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 判断是生成日
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="eachDayWithout"></param>
|
||||||
|
/// <param name="curTime"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public static bool JudgeIsGenerate_Day(string eachDayWithout, DateTime curTime)
|
||||||
|
{
|
||||||
|
var weekName = strWeeks[(int)curTime.DayOfWeek];
|
||||||
|
var arr = eachDayWithout.Split(',');
|
||||||
|
return !arr.Contains(weekName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static readonly List<string> strWeeks = new List<string>() { "周日", "周一", "周二", "周三", "周四", "周五", "周六" };
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 判断是生成周
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="eachWeekWith"></param>
|
||||||
|
/// <param name="curTime"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public static bool JudgeIsGenerate_Week(string eachWeekWith, DateTime curTime)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(eachWeekWith))
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
var weekName = strWeeks[(int)curTime.DayOfWeek];
|
||||||
|
var arr = eachWeekWith.Split(',');
|
||||||
|
return arr.Contains(weekName);
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -4,7 +4,7 @@ using System.Linq;
|
|||||||
using System.Text;
|
using System.Text;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Common.BuildSendDatas
|
namespace JiShe.CollectBus.Common.Models
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 待下发的指令生产任务数据
|
/// 待下发的指令生产任务数据
|
||||||
@ -84,8 +84,8 @@
|
|||||||
"SaslPassword": "lixiao1980",
|
"SaslPassword": "lixiao1980",
|
||||||
"KafkaReplicationFactor": 3,
|
"KafkaReplicationFactor": 3,
|
||||||
"NumPartitions": 30,
|
"NumPartitions": 30,
|
||||||
"ServerTagName": "JiSheCollectBus99",
|
"ServerTagName": "JiSheCollectBus4",
|
||||||
"FirstCollectionTime": "2025-04-22 16:07:00"
|
"FirstCollectionTime": "2025-04-23 11:11:00"
|
||||||
},
|
},
|
||||||
"IoTDBOptions": {
|
"IoTDBOptions": {
|
||||||
"UserName": "root",
|
"UserName": "root",
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user