Compare commits
No commits in common. "41d71a5fba38451db7c95e316575ea172cbaa7df" and "fff1ba1a7b6f65851e3151437c9d87c6d6c5570b" have entirely different histories.
41d71a5fba
...
fff1ba1a7b
@ -2,7 +2,6 @@
|
|||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using JiShe.CollectBus.Ammeters;
|
using JiShe.CollectBus.Ammeters;
|
||||||
using JiShe.CollectBus.GatherItem;
|
using JiShe.CollectBus.GatherItem;
|
||||||
using JiShe.CollectBus.IotSystems.Ammeters;
|
|
||||||
using JiShe.CollectBus.IotSystems.Watermeter;
|
using JiShe.CollectBus.IotSystems.Watermeter;
|
||||||
using Volo.Abp.Application.Services;
|
using Volo.Abp.Application.Services;
|
||||||
|
|
||||||
@ -59,19 +58,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task AmmeterScheduledMeterFifteenMinuteReading();
|
Task AmmeterScheduledMeterFifteenMinuteReading();
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 获取电表阀控配置
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="currentTime">阀控的时间</param>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task<List<AmmeterAutoValveControlSetting>> GetAmmeterAutoValveControlSetting(string currentTime);
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 电表自动阀控
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task AmmeterScheduledAutoValveControl();
|
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -20,25 +20,19 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// 1分钟采集电表数据下行消息消费订阅
|
/// 1分钟采集电表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage);
|
Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 5分钟采集电表数据下行消息消费订阅
|
/// 5分钟采集电表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage);
|
Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 15分钟采集电表数据下行消息消费订阅
|
/// 15分钟采集电表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage);
|
Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 电表自动阀控下行消息消费订阅
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
Task<ISubscribeAck> AmmeterScheduledAutoValveControl(MeterReadingTelemetryPacketInfo issuedEventMessage);
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
#region 水表消息采集
|
#region 水表消息采集
|
||||||
@ -46,7 +40,7 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// 1分钟采集水表数据下行消息消费订阅
|
/// 1分钟采集水表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage);
|
Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,4 +1,10 @@
|
|||||||
using JiShe.CollectBus.Application.Contracts;
|
using Amazon.Runtime.Internal.Endpoints.StandardLibrary;
|
||||||
|
using Confluent.Kafka;
|
||||||
|
using DeviceDetectorNET.Parser.Device;
|
||||||
|
using DnsClient.Protocol;
|
||||||
|
using FreeSql;
|
||||||
|
using JiShe.CollectBus.Ammeters;
|
||||||
|
using JiShe.CollectBus.Application.Contracts;
|
||||||
using JiShe.CollectBus.Common.BuildSendDatas;
|
using JiShe.CollectBus.Common.BuildSendDatas;
|
||||||
using JiShe.CollectBus.Common.Consts;
|
using JiShe.CollectBus.Common.Consts;
|
||||||
using JiShe.CollectBus.Common.DeviceBalanceControl;
|
using JiShe.CollectBus.Common.DeviceBalanceControl;
|
||||||
@ -7,26 +13,36 @@ using JiShe.CollectBus.Common.Enums;
|
|||||||
using JiShe.CollectBus.Common.Extensions;
|
using JiShe.CollectBus.Common.Extensions;
|
||||||
using JiShe.CollectBus.Common.Helpers;
|
using JiShe.CollectBus.Common.Helpers;
|
||||||
using JiShe.CollectBus.Common.Models;
|
using JiShe.CollectBus.Common.Models;
|
||||||
|
using JiShe.CollectBus.EnergySystems.Entities;
|
||||||
using JiShe.CollectBus.GatherItem;
|
using JiShe.CollectBus.GatherItem;
|
||||||
using JiShe.CollectBus.IoTDB.Context;
|
using JiShe.CollectBus.IoTDB.Context;
|
||||||
using JiShe.CollectBus.IoTDB.Interface;
|
using JiShe.CollectBus.IoTDB.Interface;
|
||||||
using JiShe.CollectBus.IoTDB.Model;
|
using JiShe.CollectBus.IoTDB.Model;
|
||||||
using JiShe.CollectBus.IoTDB.Options;
|
using JiShe.CollectBus.IoTDB.Options;
|
||||||
using JiShe.CollectBus.IoTDB.Provider;
|
using JiShe.CollectBus.IoTDB.Provider;
|
||||||
using JiShe.CollectBus.IotSystems.Ammeters;
|
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||||
using JiShe.CollectBus.IotSystems.Watermeter;
|
using JiShe.CollectBus.IotSystems.Watermeter;
|
||||||
using JiShe.CollectBus.Kafka.Internal;
|
using JiShe.CollectBus.Kafka.Internal;
|
||||||
using JiShe.CollectBus.Kafka.Producer;
|
using JiShe.CollectBus.Kafka.Producer;
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
using JiShe.CollectBus.Protocol.Contracts;
|
||||||
|
using JiShe.CollectBus.RedisDataCache;
|
||||||
|
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
||||||
|
using Mapster;
|
||||||
|
using Microsoft.Extensions.Configuration;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
|
using System.Diagnostics.Metrics;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
using static FreeSql.Internal.GlobalFilter;
|
||||||
|
using static Microsoft.AspNetCore.Razor.Language.TagHelperMetadata;
|
||||||
|
using static Thrift.Protocol.Utilities.TJSONProtocolConstants;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.ScheduledMeterReading
|
namespace JiShe.CollectBus.ScheduledMeterReading
|
||||||
{
|
{
|
||||||
@ -41,9 +57,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
private readonly IRedisDataCacheService _redisDataCacheService;
|
private readonly IRedisDataCacheService _redisDataCacheService;
|
||||||
private readonly KafkaOptionConfig _kafkaOptions;
|
private readonly KafkaOptionConfig _kafkaOptions;
|
||||||
private readonly IoTDBRuntimeContext _runtimeContext;
|
private readonly IoTDBRuntimeContext _runtimeContext;
|
||||||
private readonly IServiceProvider _serviceProvider;
|
|
||||||
|
|
||||||
int pageSize = 3000;
|
|
||||||
|
|
||||||
public BasicScheduledMeterReadingService(
|
public BasicScheduledMeterReadingService(
|
||||||
ILogger<BasicScheduledMeterReadingService> logger,
|
ILogger<BasicScheduledMeterReadingService> logger,
|
||||||
@ -51,7 +64,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
IRedisDataCacheService redisDataCacheService,
|
IRedisDataCacheService redisDataCacheService,
|
||||||
IIoTDbProvider dbProvider,
|
IIoTDbProvider dbProvider,
|
||||||
IoTDBRuntimeContext runtimeContext,
|
IoTDBRuntimeContext runtimeContext,
|
||||||
IServiceProvider serviceProvider,
|
|
||||||
IOptions<KafkaOptionConfig> kafkaOptions)
|
IOptions<KafkaOptionConfig> kafkaOptions)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
@ -60,7 +72,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
_producerService = producerService;
|
_producerService = producerService;
|
||||||
_redisDataCacheService = redisDataCacheService;
|
_redisDataCacheService = redisDataCacheService;
|
||||||
_kafkaOptions = kafkaOptions.Value;
|
_kafkaOptions = kafkaOptions.Value;
|
||||||
_serviceProvider = serviceProvider;
|
|
||||||
|
|
||||||
_runtimeContext.UseTableSessionPool = true;
|
_runtimeContext.UseTableSessionPool = true;
|
||||||
}
|
}
|
||||||
@ -410,11 +421,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
Value = pendingCopyReadTime
|
Value = pendingCopyReadTime
|
||||||
});
|
});
|
||||||
|
|
||||||
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, new IoTDBQueryOptions()
|
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(timeDensity, new IoTDBQueryOptions()
|
||||||
{
|
{
|
||||||
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
|
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
|
||||||
PageIndex = 1,
|
PageIndex = 1,
|
||||||
PageSize = pageSize,
|
PageSize = 3000,
|
||||||
Conditions = conditions,
|
Conditions = conditions,
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -447,11 +458,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
Value = pendingCopyReadTime
|
Value = pendingCopyReadTime
|
||||||
});
|
});
|
||||||
|
|
||||||
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, new IoTDBQueryOptions()
|
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(timeDensity, new IoTDBQueryOptions()
|
||||||
{
|
{
|
||||||
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
|
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
|
||||||
PageIndex = 1,
|
PageIndex = 1,
|
||||||
PageSize = pageSize,
|
PageSize = 3000,
|
||||||
Conditions = conditions,
|
Conditions = conditions,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -483,11 +494,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
Value = pendingCopyReadTime
|
Value = pendingCopyReadTime
|
||||||
});
|
});
|
||||||
|
|
||||||
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, new IoTDBQueryOptions()
|
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(timeDensity, new IoTDBQueryOptions()
|
||||||
{
|
{
|
||||||
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
|
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
|
||||||
PageIndex = 1,
|
PageIndex = 1,
|
||||||
PageSize = pageSize,
|
PageSize = 3000,
|
||||||
Conditions = conditions,
|
Conditions = conditions,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -632,7 +643,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
_logger.LogWarning($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。");
|
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -678,25 +689,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
return taskList;
|
return taskList;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 获取电表阀控配置
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="currentTime">阀控的时间</param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public virtual Task<List<AmmeterAutoValveControlSetting>> GetAmmeterAutoValveControlSetting(string currentTime)
|
|
||||||
{
|
|
||||||
throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现");
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 电表自动阀控
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
public virtual Task AmmeterScheduledAutoValveControl()
|
|
||||||
{
|
|
||||||
throw new NotImplementedException($"{nameof(AmmeterScheduledAutoValveControl)}请根据不同系统类型进行实现");
|
|
||||||
}
|
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
|
|
||||||
@ -832,11 +824,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
Value = pendingCopyReadTime
|
Value = pendingCopyReadTime
|
||||||
});
|
});
|
||||||
|
|
||||||
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, new IoTDBQueryOptions()
|
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(timeDensity, new IoTDBQueryOptions()
|
||||||
{
|
{
|
||||||
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
|
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
|
||||||
PageIndex = 1,
|
PageIndex = 1,
|
||||||
PageSize = pageSize,
|
PageSize = 3000,
|
||||||
Conditions = conditions,
|
Conditions = conditions,
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -912,11 +904,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
tempCodes = new List<string>() { "10_1" };
|
tempCodes = new List<string>() { "10_1" };
|
||||||
}
|
}
|
||||||
|
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
|
||||||
if (protocolPlugin == null)
|
|
||||||
{
|
|
||||||
_logger.LogError("协议不存在!");
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach (var tempItem in tempCodes)
|
foreach (var tempItem in tempCodes)
|
||||||
{
|
{
|
||||||
@ -1008,7 +995,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// <param name="nextTaskTime"></param>
|
/// <param name="nextTaskTime"></param>
|
||||||
/// <param name="timeDensity"></param>
|
/// <param name="timeDensity"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
protected bool IsTaskTime(DateTime nextTaskTime, int timeDensity = 0)
|
private bool IsTaskTime(DateTime nextTaskTime, int timeDensity = 0)
|
||||||
{
|
{
|
||||||
if (DateTime.Now.AddMinutes(timeDensity) >= nextTaskTime)
|
if (DateTime.Now.AddMinutes(timeDensity) >= nextTaskTime)
|
||||||
{
|
{
|
||||||
@ -1026,7 +1013,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// <param name="meterType">表类型</param>
|
/// <param name="meterType">表类型</param>
|
||||||
/// <param name="taskCreateAction">具体的创建任务的委托</param>
|
/// <param name="taskCreateAction">具体的创建任务的委托</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
protected async Task CreateMeterPublishTask<T>(int timeDensity, DateTime nextTaskTime, MeterTypeEnum meterType, Action<int, T, int, DateTime> taskCreateAction) where T : DeviceCacheBasicModel
|
private async Task CreateMeterPublishTask<T>(int timeDensity, DateTime nextTaskTime, MeterTypeEnum meterType, Action<int, T, int, DateTime> taskCreateAction) where T : DeviceCacheBasicModel
|
||||||
{
|
{
|
||||||
var timer = Stopwatch.StartNew();
|
var timer = Stopwatch.StartNew();
|
||||||
|
|
||||||
@ -1087,17 +1074,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// 创建Kafka消息
|
/// 创建Kafka消息
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="T"></typeparam>
|
|
||||||
/// <param name="kafkaTopicName">kafka主题名称</param>
|
|
||||||
/// <param name="options">任务查询条件</param>
|
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
protected async Task CreateMeterKafkaTaskMessage<T>(string kafkaTopicName, IoTDBQueryOptions options) where T : IoTEntity, new()
|
private async Task CreateMeterKafkaTaskMessage<T>(int timeDensity, IoTDBQueryOptions options) where T : IoTEntity, new()
|
||||||
{
|
{
|
||||||
if (string.IsNullOrWhiteSpace(kafkaTopicName))
|
|
||||||
{
|
|
||||||
_logger.LogInformation($"{nameof(CreateMeterKafkaTaskMessage)} Kafka消息推送主题不能为空,-101");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
int pageNumber = 0;
|
int pageNumber = 0;
|
||||||
bool hasNext;
|
bool hasNext;
|
||||||
var stopwatch = Stopwatch.StartNew();
|
var stopwatch = Stopwatch.StartNew();
|
||||||
@ -1124,7 +1103,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
} while (hasNext);
|
} while (hasNext);
|
||||||
|
|
||||||
stopwatch.Stop();
|
stopwatch.Stop();
|
||||||
_logger.LogInformation($"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题的任务 {options.TableNameOrTreePath} 路径任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
|
_logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {options.TableNameOrTreePath} {timeDensity}分钟采集任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -1134,7 +1113,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// <param name="taskRecord">任务记录</param>
|
/// <param name="taskRecord">任务记录</param>
|
||||||
/// <param name="partition">对应分区,也就是集中器号所在的分组序号</param>
|
/// <param name="partition">对应分区,也就是集中器号所在的分组序号</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
protected async Task KafkaProducerIssuedMessageAction<T>(string topicName,
|
private async Task KafkaProducerIssuedMessageAction<T>(string topicName,
|
||||||
T taskRecord, int partition) where T : class
|
T taskRecord, int partition) where T : class
|
||||||
{
|
{
|
||||||
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
|
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
|
||||||
|
|||||||
@ -1,30 +1,30 @@
|
|||||||
using JiShe.CollectBus.Application.Contracts;
|
using System;
|
||||||
using JiShe.CollectBus.Common.BuildSendDatas;
|
using System.Collections.Generic;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using JiShe.CollectBus.Ammeters;
|
||||||
|
using JiShe.CollectBus.Application.Contracts;
|
||||||
using JiShe.CollectBus.Common.Consts;
|
using JiShe.CollectBus.Common.Consts;
|
||||||
using JiShe.CollectBus.Common.DeviceBalanceControl;
|
using JiShe.CollectBus.Common.DeviceBalanceControl;
|
||||||
using JiShe.CollectBus.Common.Encrypt;
|
|
||||||
using JiShe.CollectBus.Common.Enums;
|
|
||||||
using JiShe.CollectBus.Common.Extensions;
|
|
||||||
using JiShe.CollectBus.Common.Helpers;
|
using JiShe.CollectBus.Common.Helpers;
|
||||||
using JiShe.CollectBus.FreeSql;
|
using JiShe.CollectBus.FreeSql;
|
||||||
using JiShe.CollectBus.GatherItem;
|
using JiShe.CollectBus.GatherItem;
|
||||||
using JiShe.CollectBus.IoTDB.Context;
|
using JiShe.CollectBus.IoTDB.Context;
|
||||||
using JiShe.CollectBus.IoTDB.Interface;
|
using JiShe.CollectBus.IoTDB.Interface;
|
||||||
using JiShe.CollectBus.IotSystems.Ammeters;
|
using JiShe.CollectBus.IotSystems.Devices;
|
||||||
|
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||||
using JiShe.CollectBus.IotSystems.Watermeter;
|
using JiShe.CollectBus.IotSystems.Watermeter;
|
||||||
using JiShe.CollectBus.Kafka.Internal;
|
using JiShe.CollectBus.Kafka.Internal;
|
||||||
using JiShe.CollectBus.Kafka.Producer;
|
using JiShe.CollectBus.Kafka.Producer;
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
using JiShe.CollectBus.Repository;
|
||||||
|
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
||||||
using Microsoft.AspNetCore.Authorization;
|
using Microsoft.AspNetCore.Authorization;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.AspNetCore.Mvc;
|
||||||
|
using Microsoft.Extensions.Configuration;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
using System;
|
using Volo.Abp.Domain.Repositories;
|
||||||
using System.Collections.Generic;
|
using Volo.Abp.Uow;
|
||||||
using System.Linq;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.ScheduledMeterReading
|
namespace JiShe.CollectBus.ScheduledMeterReading
|
||||||
{
|
{
|
||||||
@ -36,9 +36,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
|
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
|
||||||
{
|
{
|
||||||
string serverTagName = string.Empty;
|
string serverTagName = string.Empty;
|
||||||
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
|
|
||||||
private readonly IIoTDbProvider _dbProvider;
|
|
||||||
private readonly IServiceProvider _serviceProvider;
|
|
||||||
|
|
||||||
public EnergySystemScheduledMeterReadingService(
|
public EnergySystemScheduledMeterReadingService(
|
||||||
ILogger<EnergySystemScheduledMeterReadingService> logger,
|
ILogger<EnergySystemScheduledMeterReadingService> logger,
|
||||||
@ -46,20 +43,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
IOptions<KafkaOptionConfig> kafkaOptions,
|
IOptions<KafkaOptionConfig> kafkaOptions,
|
||||||
IoTDBRuntimeContext runtimeContext,
|
IoTDBRuntimeContext runtimeContext,
|
||||||
IProducerService producerService,
|
IProducerService producerService,
|
||||||
IServiceProvider serviceProvider,
|
|
||||||
IRedisDataCacheService redisDataCacheService)
|
IRedisDataCacheService redisDataCacheService)
|
||||||
: base(logger,
|
: base(logger,
|
||||||
producerService,
|
producerService,
|
||||||
redisDataCacheService,
|
redisDataCacheService,
|
||||||
dbProvider,
|
dbProvider,
|
||||||
runtimeContext,
|
runtimeContext,
|
||||||
serviceProvider,
|
|
||||||
kafkaOptions)
|
kafkaOptions)
|
||||||
{
|
{
|
||||||
serverTagName = kafkaOptions.Value.ServerTagName;
|
serverTagName = kafkaOptions.Value.ServerTagName;
|
||||||
_dbProvider = dbProvider;
|
|
||||||
_logger = logger;
|
|
||||||
_serviceProvider = serviceProvider;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public sealed override string SystemType => SystemTypeConst.Energy;
|
public sealed override string SystemType => SystemTypeConst.Energy;
|
||||||
@ -146,199 +138,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
.QueryAsync<AmmeterInfo>(sql);
|
.QueryAsync<AmmeterInfo>(sql);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 获取电表阀控配置
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="currentTime">阀控的时间</param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public override async Task<List<AmmeterAutoValveControlSetting>> GetAmmeterAutoValveControlSetting(string currentTime)
|
|
||||||
{
|
|
||||||
string sql = $@"SELECT A.*,B.AmmeterID as MeterId,E.GatherCode,D.AreaCode,D.[Address],CONCAT(D.AreaCode, D.[Address]) AS FocusAddress,D.ID as FocusId
|
|
||||||
FROM TB_AutoTripTask(nolock) AS A
|
|
||||||
INNER JOIN TB_AutoTripAmmeter(nolock) AS B ON A.ID=B.TripTaskID
|
|
||||||
INNER JOIN TB_AmmeterInfo(nolock) AS C ON B.AmmeterID=C.ID
|
|
||||||
INNER JOIN TB_FocusInfo(nolock) AS D ON D.ID=C.FocusID
|
|
||||||
INNER JOIN TB_GatherInfo(NOLOCK) AS E ON E.ID=D.GatherInfoID
|
|
||||||
WHERE A.IsForbid=0 and A.State<>-1 and E.GatherCode LIKE '%V4%' and A.TripTime";
|
|
||||||
|
|
||||||
if (!string.IsNullOrWhiteSpace(currentTime))
|
|
||||||
{
|
|
||||||
sql = $@"{sql} AND A.TripTime = '{currentTime}'";
|
|
||||||
}
|
|
||||||
|
|
||||||
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
|
|
||||||
.Ado
|
|
||||||
.QueryAsync<AmmeterAutoValveControlSetting>(sql);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 电表自动阀控
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
public override async Task AmmeterScheduledAutoValveControl()
|
|
||||||
{
|
|
||||||
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
|
|
||||||
|
|
||||||
var currentTime = DateTime.Now;
|
|
||||||
string currentTimeStr = $"{currentTime:HH:mm}";
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var settingInfos = await GetAmmeterAutoValveControlSetting(currentTimeStr);
|
|
||||||
|
|
||||||
if (settingInfos == null || settingInfos.Count <= 0)
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表自动阀控时,阀控数据为空, -101");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
//批量获取对应的缓存电表信息
|
|
||||||
var ammeterInfos = new List<AmmeterInfo>();
|
|
||||||
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
|
|
||||||
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
|
|
||||||
|
|
||||||
foreach (var settingInfo in settingInfos)
|
|
||||||
{
|
|
||||||
bool isGenerate = false;
|
|
||||||
switch (settingInfo.LoopType)
|
|
||||||
{
|
|
||||||
case "Once":
|
|
||||||
isGenerate = CommonHelper.JudgeIsGenerate_Once(settingInfo.OnceWithDate.Value, currentTime);
|
|
||||||
break;
|
|
||||||
case "EachDay":
|
|
||||||
isGenerate = CommonHelper.JudgeIsGenerate_Day(settingInfo.EachDayWithout, currentTime);
|
|
||||||
break;
|
|
||||||
case "EachWeek":
|
|
||||||
isGenerate = CommonHelper.JudgeIsGenerate_Week(settingInfo.EachWeekWith, currentTime);
|
|
||||||
break;
|
|
||||||
case "EachMonth":
|
|
||||||
isGenerate = CommonHelper.JudgeIsGenerate_Month(settingInfo.EachMonthWith, currentTime);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!isGenerate)//不生成,跳入下一循环
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
//获取对应的缓存电表信息
|
|
||||||
var ammeterInfo = ammeterInfos.First();
|
|
||||||
bool tripStateResult = false;
|
|
||||||
if (settingInfo.TripType.Equals("on"))
|
|
||||||
{
|
|
||||||
ammeterInfo.TripState = 0;
|
|
||||||
tripStateResult = true;
|
|
||||||
}
|
|
||||||
else if (settingInfo.TripType.Equals("off"))
|
|
||||||
{
|
|
||||||
ammeterInfo.TripState = 1;
|
|
||||||
tripStateResult = false;
|
|
||||||
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
_logger.LogError($"集中器[{settingInfo.FocusAddress}],[{settingInfo.MeterId}]阀控命令错误:{settingInfo.TripType},-102");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
|
||||||
if (protocolPlugin == null)
|
|
||||||
{
|
|
||||||
_logger.LogError("协议不存在!");
|
|
||||||
}
|
|
||||||
|
|
||||||
var temCode = "10_98";
|
|
||||||
var itemCodeArr = temCode.Split('_');
|
|
||||||
var aFNStr = itemCodeArr[0];
|
|
||||||
var aFN = (AFN)(aFNStr.HexToDec());
|
|
||||||
var fn = int.Parse(itemCodeArr[1]);
|
|
||||||
TelemetryPacketResponse builderResponse = null;
|
|
||||||
string methonCode = $"AFN{aFNStr}_Fn_Send";
|
|
||||||
//特殊表暂不处理
|
|
||||||
if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
|
|
||||||
, out var handler))
|
|
||||||
{
|
|
||||||
builderResponse = handler(new TelemetryPacketRequest()
|
|
||||||
{
|
|
||||||
FocusAddress = ammeterInfo.FocusAddress,
|
|
||||||
Fn = fn,
|
|
||||||
Pn = ammeterInfo.MeteringCode,
|
|
||||||
DataUnit = Build645SendData.BuildAmmeterValveControlSendDataUnit(ammeterInfo.AmmerterAddress, "", ammeterInfo.Password, tripStateResult),//生成阀控报文
|
|
||||||
});
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{temCode}无效编码,-103");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (builderResponse == null)
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{temCode}报文构建失败,-104");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
string taskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA);
|
|
||||||
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
|
|
||||||
{
|
|
||||||
SystemName = SystemType,
|
|
||||||
ProjectId = $"{ammeterInfo.ProjectID}",
|
|
||||||
DeviceType = $"{MeterTypeEnum.Ammeter}",
|
|
||||||
DeviceId = $"{ammeterInfo.MeterId}",
|
|
||||||
Timestamps = currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
|
|
||||||
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
|
|
||||||
PendingCopyReadTime = currentTime,
|
|
||||||
CreationTime = currentTime,
|
|
||||||
MeterAddress = ammeterInfo.AmmerterAddress,
|
|
||||||
AFN = (int)aFN,
|
|
||||||
Fn = fn,
|
|
||||||
//Seq = builderResponse.Seq,
|
|
||||||
MSA = builderResponse.MSA,
|
|
||||||
ItemCode = temCode,
|
|
||||||
TaskMark = taskMark,
|
|
||||||
IsSend = false,
|
|
||||||
ManualOrNot = false,
|
|
||||||
Pn = ammeterInfo.MeteringCode,
|
|
||||||
IssuedMessageId = GuidGenerator.Create().ToString(),
|
|
||||||
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
|
|
||||||
IsReceived = false,
|
|
||||||
ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
|
|
||||||
};
|
|
||||||
taskList.Add(meterReadingRecords);
|
|
||||||
}
|
|
||||||
if (taskList == null || taskList.Count <= 0)
|
|
||||||
{
|
|
||||||
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-105");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
//任务记录入库
|
|
||||||
_ = _dbProvider.BatchInsertAsync(metadata, taskList);
|
|
||||||
|
|
||||||
//任务信息推送Kafka
|
|
||||||
_ = DeviceGroupBalanceControl.ProcessWithThrottleAsync<MeterReadingTelemetryPacketInfo>(
|
|
||||||
items: taskList,
|
|
||||||
deviceIdSelector: data => data.DeviceId,
|
|
||||||
processor: (data, groupIndex) =>
|
|
||||||
{
|
|
||||||
_ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, data, groupIndex);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
//todo 阀控记录入库,推送到新的服务
|
|
||||||
}
|
|
||||||
catch (Exception)
|
|
||||||
{
|
|
||||||
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现");
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 获取水表信息
|
/// 获取水表信息
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
@ -1,37 +1,58 @@
|
|||||||
using JiShe.CollectBus.Common.Consts;
|
using System;
|
||||||
using JiShe.CollectBus.IoTDB.Interface;
|
using System.Collections.Generic;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using DeviceDetectorNET.Parser.Device;
|
||||||
|
using JiShe.CollectBus.Common.Consts;
|
||||||
|
using JiShe.CollectBus.Common.Enums;
|
||||||
|
using JiShe.CollectBus.IotSystems.Devices;
|
||||||
|
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||||
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||||
using JiShe.CollectBus.Kafka.Attributes;
|
using JiShe.CollectBus.Kafka.Attributes;
|
||||||
using JiShe.CollectBus.Kafka.Internal;
|
using JiShe.CollectBus.Kafka.Internal;
|
||||||
|
using JiShe.CollectBus.Protocol.Contracts;
|
||||||
|
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
||||||
|
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
||||||
|
using Microsoft.AspNetCore.Mvc;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using System;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using TouchSocket.Sockets;
|
using TouchSocket.Sockets;
|
||||||
|
using Volo.Abp.Caching;
|
||||||
|
using Volo.Abp.Domain.Repositories;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Subscribers
|
namespace JiShe.CollectBus.Subscribers
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 定时抄读任务消息消费订阅
|
/// 定时抄读任务消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
[Route($"/worker/app/subscriber")]
|
||||||
public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService, IKafkaSubscribe
|
public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService, IKafkaSubscribe
|
||||||
{
|
{
|
||||||
private readonly ILogger<WorkerSubscriberAppService> _logger;
|
private readonly ILogger<WorkerSubscriberAppService> _logger;
|
||||||
private readonly ITcpService _tcpService;
|
private readonly ITcpService _tcpService;
|
||||||
private readonly IIoTDbProvider _dbProvider;
|
private readonly IServiceProvider _serviceProvider;
|
||||||
|
private readonly IRepository<Device, Guid> _deviceRepository;
|
||||||
|
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Initializes a new instance of the <see cref="WorkerSubscriberAppService"/> class.
|
/// Initializes a new instance of the <see cref="WorkerSubscriberAppService"/> class.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="logger">The logger.</param>
|
/// <param name="logger">The logger.</param>
|
||||||
/// <param name="tcpService">The TCP service.</param>
|
/// <param name="tcpService">The TCP service.</param>
|
||||||
/// <param name="dbProvider">IoTDB数据驱动</param>
|
/// <param name="deviceRepository">The Device pepository.</param>
|
||||||
|
/// <param name="serviceProvider">The service provider.</param>
|
||||||
public WorkerSubscriberAppService(ILogger<WorkerSubscriberAppService> logger,
|
public WorkerSubscriberAppService(ILogger<WorkerSubscriberAppService> logger,
|
||||||
ITcpService tcpService,
|
ITcpService tcpService,
|
||||||
IIoTDbProvider dbProvider)
|
IRepository<Device, Guid> deviceRepository,
|
||||||
|
IMeterReadingRecordRepository meterReadingRecordsRepository,
|
||||||
|
IServiceProvider serviceProvider)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_tcpService = tcpService;
|
_tcpService = tcpService;
|
||||||
_dbProvider = dbProvider;
|
_serviceProvider = serviceProvider;
|
||||||
|
_deviceRepository = deviceRepository;
|
||||||
|
_meterReadingRecordsRepository = meterReadingRecordsRepository;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -42,12 +63,28 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="receivedMessage"></param>
|
/// <param name="receivedMessage"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
|
[HttpPost]
|
||||||
|
[Route("ammeter/oneminute/issued-event")]
|
||||||
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
|
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
|
||||||
public async Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage)
|
//[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
|
||||||
|
public async Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理");
|
_logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理");
|
||||||
return await SendMessagesAsync(receivedMessage);
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
|
if (protocolPlugin == null)
|
||||||
|
{
|
||||||
|
_logger.LogError("【1分钟采集电表数据下行消息消费队列开始处理】协议不存在!");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
var device = await _deviceRepository.FirstOrDefaultAsync(a => a.Number == receivedMessage.FocusAddress);
|
||||||
|
if (device != null)
|
||||||
|
{
|
||||||
|
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return SubscribeAck.Success();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -55,11 +92,28 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="receivedMessage"></param>
|
/// <param name="receivedMessage"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
|
[HttpPost]
|
||||||
|
[Route("ammeter/fiveminute/issued-event")]
|
||||||
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
|
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
|
||||||
public async Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage)
|
//[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
|
||||||
|
public async Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理");
|
_logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理");
|
||||||
return await SendMessagesAsync(receivedMessage);
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
|
if (protocolPlugin == null)
|
||||||
|
{
|
||||||
|
_logger.LogError("【5分钟采集电表数据下行消息消费队列开始处理】协议不存在!");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
var device = await _deviceRepository.FirstOrDefaultAsync(a => a.Number == receivedMessage.FocusAddress);
|
||||||
|
if (device != null)
|
||||||
|
{
|
||||||
|
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return SubscribeAck.Success();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -67,23 +121,36 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="receivedMessage"></param>
|
/// <param name="receivedMessage"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
|
[HttpPost]
|
||||||
|
[Route("ammeter/fifteenminute/issued-event")]
|
||||||
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
|
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
|
||||||
public async Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage)
|
//[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
|
||||||
|
public async Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
|
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
|
||||||
return await SendMessagesAsync(receivedMessage);
|
try
|
||||||
}
|
{
|
||||||
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
|
if (protocolPlugin == null)
|
||||||
|
{
|
||||||
|
_logger.LogError("【15分钟采集电表数据下行消息消费队列开始处理】协议不存在!");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
var device = await _deviceRepository.FirstOrDefaultAsync(a => a.Number == receivedMessage.FocusAddress);
|
||||||
|
if (device != null)
|
||||||
|
{
|
||||||
|
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
||||||
|
|
||||||
/// <summary>
|
}
|
||||||
/// 电表自动阀控下行消息消费订阅
|
}
|
||||||
/// </summary>
|
return SubscribeAck.Success();
|
||||||
/// <param name="receivedMessage"></param>
|
}
|
||||||
/// <returns></returns>
|
catch (Exception ex)
|
||||||
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName)]
|
{
|
||||||
public async Task<ISubscribeAck> AmmeterScheduledAutoValveControl(MeterReadingTelemetryPacketInfo receivedMessage)
|
|
||||||
{
|
throw ex;
|
||||||
_logger.LogInformation("电表自动阀控下行消息消费队列开始处理");
|
}
|
||||||
return await SendMessagesAsync(receivedMessage);
|
|
||||||
}
|
}
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
@ -94,41 +161,28 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="receivedMessage"></param>
|
/// <param name="receivedMessage"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
|
[HttpPost]
|
||||||
|
[Route("watermeter/fifteenminute/issued-event")]
|
||||||
[KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
|
[KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
|
||||||
public async Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage)
|
//[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
|
||||||
|
public async Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
||||||
{
|
{
|
||||||
return await SendMessagesAsync(receivedMessage);
|
_logger.LogInformation("15分钟采集水表数据下行消息消费队列开始处理");
|
||||||
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
|
if (protocolPlugin == null)
|
||||||
|
{
|
||||||
|
_logger.LogError("【15分钟采集水表数据下行消息消费队列开始处理】协议不存在!");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
|
||||||
|
if (device != null)
|
||||||
|
{
|
||||||
|
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return SubscribeAck.Success();
|
||||||
}
|
}
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 设备报文发送
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="receivedMessage">消息记录</param>
|
|
||||||
/// <returns></returns>
|
|
||||||
private async Task<ISubscribeAck> SendMessagesAsync(MeterReadingTelemetryPacketInfo receivedMessage)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var checkResult = _tcpService.ClientExists(receivedMessage.FocusAddress);
|
|
||||||
if (checkResult)
|
|
||||||
{
|
|
||||||
await _tcpService.SendAsync(receivedMessage.FocusAddress, Convert.FromHexString(receivedMessage.IssuedMessageHexString));
|
|
||||||
|
|
||||||
receivedMessage.IsSend = true;
|
|
||||||
await _dbProvider.InsertAsync(receivedMessage);
|
|
||||||
|
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
return SubscribeAck.Fail();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (Exception)
|
|
||||||
{
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -6,7 +6,7 @@ using System.Linq;
|
|||||||
using System.Text;
|
using System.Text;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.IotSystems.Ammeters
|
namespace JiShe.CollectBus.Ammeters
|
||||||
{
|
{
|
||||||
public class AmmeterInfo: DeviceCacheBasicModel
|
public class AmmeterInfo: DeviceCacheBasicModel
|
||||||
{
|
{
|
||||||
@ -1,107 +0,0 @@
|
|||||||
using JiShe.CollectBus.Common.Models;
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.IotSystems.Ammeters
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 电表自动阀控制设置实体
|
|
||||||
/// </summary>
|
|
||||||
public class AmmeterAutoValveControlSetting: DeviceCacheBasicModel
|
|
||||||
{
|
|
||||||
public int ID { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 定时阀控名称
|
|
||||||
/// </summary>
|
|
||||||
public string Name { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 是否禁止: 1 禁止, 0 启用,
|
|
||||||
/// </summary>
|
|
||||||
public int IsForbid { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 阀控命令:on 合闸(通电), off 断闸(断电)
|
|
||||||
/// </summary>
|
|
||||||
public string TripType { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 执行周期:Once 一次、EachDay 每日、EachWeek 每周、EachMonth 每月、
|
|
||||||
/// </summary>
|
|
||||||
public string LoopType { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// (专属字段)执行一次时,指定yyyy-MM-dd 00:00:00 (只用年月日)
|
|
||||||
/// </summary>
|
|
||||||
public DateTime? OnceWithDate { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// (专属字段)每日执行时排除(多个用英文逗号分隔),如:"周六,周日"
|
|
||||||
/// </summary>
|
|
||||||
public string EachDayWithout { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// (专属字段)每周执行时运行(多个用英文逗号分隔),如:"周一,周二,周日"
|
|
||||||
/// </summary>
|
|
||||||
public string EachWeekWith { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// (专属字段)每月执行时运行(多个用英文逗号分隔),如:"1,2,3,5"
|
|
||||||
/// </summary>
|
|
||||||
public string EachMonthWith { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 阀控执行时分:HH:mm:00
|
|
||||||
/// </summary>
|
|
||||||
public string TripTime { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 项目ID
|
|
||||||
/// </summary>
|
|
||||||
public int ProjectID { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 添加时间
|
|
||||||
/// </summary>
|
|
||||||
public DateTime AddDate { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 删除改成-1,
|
|
||||||
/// </summary>
|
|
||||||
public int State { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 实际执行时间
|
|
||||||
/// </summary>
|
|
||||||
public DateTime? WorkTime { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 创建人ID
|
|
||||||
/// </summary>
|
|
||||||
public int CreateUserID { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 采集编号
|
|
||||||
/// </summary>
|
|
||||||
public string GatherCode { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 集中器地址
|
|
||||||
/// </summary>
|
|
||||||
public string Address { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 集中器区域代码
|
|
||||||
/// </summary>
|
|
||||||
public string AreaCode { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 电表通信地址
|
|
||||||
/// </summary>
|
|
||||||
public string AmmerterAddress { get; set; }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -15,7 +15,7 @@ using Volo.Abp.Domain.Entities.Auditing;
|
|||||||
namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
|
namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 抄读任务数据
|
/// 抄读任务Redis缓存数据记录
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[EntityType(EntityTypeEnum.TableModel)]
|
[EntityType(EntityTypeEnum.TableModel)]
|
||||||
public class MeterReadingTelemetryPacketInfo : IoTEntity
|
public class MeterReadingTelemetryPacketInfo : IoTEntity
|
||||||
@ -56,13 +56,6 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
|
|||||||
[FIELDColumn]
|
[FIELDColumn]
|
||||||
public int FocusId { get; set; }
|
public int FocusId { get; set; }
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 集中器地址
|
|
||||||
/// </summary>
|
|
||||||
[FIELDColumn]
|
|
||||||
public string FocusAddress { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 表Id
|
/// 表Id
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
@ -25,7 +25,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
|
|||||||
/// <param name="ammeterAddress">电表地址</param>
|
/// <param name="ammeterAddress">电表地址</param>
|
||||||
/// <param name="specialControlCode">特殊控制码</param>
|
/// <param name="specialControlCode">特殊控制码</param>
|
||||||
/// <param name="password">密码</param>
|
/// <param name="password">密码</param>
|
||||||
/// <param name="state">是否为开阀,true合闸,false关闸</param>
|
/// <param name="state">是否为开阀</param>
|
||||||
/// <param name="modelCode">型号码</param>
|
/// <param name="modelCode">型号码</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public static List<string> BuildAmmeterValveControlSendDataUnit(string ammeterAddress, string specialControlCode, string password, bool state, string modelCode = "")
|
public static List<string> BuildAmmeterValveControlSendDataUnit(string ammeterAddress, string specialControlCode, string password, bool state, string modelCode = "")
|
||||||
|
|||||||
@ -4,7 +4,7 @@ using System.Linq;
|
|||||||
using System.Text;
|
using System.Text;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Common.Models
|
namespace JiShe.CollectBus.Common.BuildSendDatas
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 待下发的指令生产任务数据
|
/// 待下发的指令生产任务数据
|
||||||
@ -775,61 +775,5 @@ namespace JiShe.CollectBus.Common.Helpers
|
|||||||
|
|
||||||
return makstr;// Convert.ToInt32(makstr) << 32 | msa;
|
return makstr;// Convert.ToInt32(makstr) << 32 | msa;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 判断是生成月
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="eachMonthWith"></param>
|
|
||||||
/// <param name="curTime"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public static bool JudgeIsGenerate_Month(string eachMonthWith, DateTime curTime)
|
|
||||||
{
|
|
||||||
var arr = eachMonthWith.Split(',');
|
|
||||||
if (arr.Contains(curTime.Day.ToString()))
|
|
||||||
return true;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 判断是生成一次
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="onceWithDate"></param>
|
|
||||||
/// <param name="curTime"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public static bool JudgeIsGenerate_Once(DateTime onceWithDate, DateTime curTime) => curTime.Date.Equals(onceWithDate);//为当天时发送
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 判断是生成日
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="eachDayWithout"></param>
|
|
||||||
/// <param name="curTime"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public static bool JudgeIsGenerate_Day(string eachDayWithout, DateTime curTime)
|
|
||||||
{
|
|
||||||
var weekName = strWeeks[(int)curTime.DayOfWeek];
|
|
||||||
var arr = eachDayWithout.Split(',');
|
|
||||||
return !arr.Contains(weekName);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static readonly List<string> strWeeks = new List<string>() { "周日", "周一", "周二", "周三", "周四", "周五", "周六" };
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 判断是生成周
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="eachWeekWith"></param>
|
|
||||||
/// <param name="curTime"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public static bool JudgeIsGenerate_Week(string eachWeekWith, DateTime curTime)
|
|
||||||
{
|
|
||||||
if (string.IsNullOrWhiteSpace(eachWeekWith))
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
var weekName = strWeeks[(int)curTime.DayOfWeek];
|
|
||||||
var arr = eachWeekWith.Split(',');
|
|
||||||
return arr.Contains(weekName);
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -80,7 +80,7 @@
|
|||||||
"KafkaReplicationFactor": 3,
|
"KafkaReplicationFactor": 3,
|
||||||
"NumPartitions": 30,
|
"NumPartitions": 30,
|
||||||
"ServerTagName": "JiSheCollectBus30",
|
"ServerTagName": "JiSheCollectBus30",
|
||||||
"FirstCollectionTime": "2025-04-23 11:11:00"
|
"FirstCollectionTime": "2025-04-22 16:07:00"
|
||||||
},
|
},
|
||||||
"IoTDBOptions": {
|
"IoTDBOptions": {
|
||||||
"UserName": "root",
|
"UserName": "root",
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user