Compare commits

..

No commits in common. "5ba1325204d1728347d08c733c6e7d695f732e60" and "a97421a00ecefcdb485dc0705fe6773ddade92c0" have entirely different histories.

10 changed files with 174 additions and 280 deletions

View File

@ -1,10 +1,7 @@
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using System;
using Microsoft.AspNetCore.Mvc;
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace JiShe.CollectBus.DataChannels namespace JiShe.CollectBus.DataChannels
@ -14,19 +11,5 @@ namespace JiShe.CollectBus.DataChannels
/// </summary> /// </summary>
public interface IDataChannelManageService public interface IDataChannelManageService
{ {
#region
/// <summary>
/// 定时任务数据通道写入
/// </summary>
/// <returns></returns>
Task ScheduledMeterTaskWriter(ChannelWriter<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter, Tuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems);
/// <summary>
/// 定时任务数据入库和Kafka推送通道
/// </summary>
/// <returns></returns>
Task ScheduledMeterTaskReadding(ChannelReader<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoReader);
#endregion
} }
} }

View File

@ -19,7 +19,7 @@ namespace JiShe.CollectBus.DataMigration.Options
/// <summary> /// <summary>
/// 批量处理通道容量 /// 批量处理通道容量
/// </summary> /// </summary>
public int ChannelCapacity { get; set; } = 100_00; public int ChannelCapacity { get; set; } = 100;
/// <summary> /// <summary>
/// 数据库 每批处理量 /// 数据库 每批处理量

View File

@ -4,7 +4,6 @@ using System.Threading.Tasks;
using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IotSystems.Ammeters; using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.IotSystems.Watermeter;
using Volo.Abp.Application.Services; using Volo.Abp.Application.Services;
@ -72,7 +71,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 电表自动阀控 /// 电表自动阀控
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledAutoValveControl(); Task AmmeterScheduledAutoValveControl();
/// <summary> /// <summary>
/// 电表自动校时 /// 电表自动校时
@ -82,7 +81,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledAutomaticVerificationTime(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps); Task AmmeterScheduledAutomaticVerificationTime(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps);
/// <summary> /// <summary>
/// 日冻结抄读 /// 日冻结抄读
@ -92,7 +91,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledGetAutomaticDayFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps); Task AmmeterScheduledGetAutomaticDayFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps);
/// <summary> /// <summary>
/// 月冻结数据抄读 /// 月冻结数据抄读
@ -102,7 +101,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledGetAutomaticMonthFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps); Task AmmeterScheduledGetAutomaticMonthFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps);
#endregion #endregion
@ -139,7 +138,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
Task<List<MeterReadingTelemetryPacketInfo>> ConcentratorScheduledAutomaticGetTerminalVersion(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps); Task ConcentratorScheduledAutomaticGetTerminalVersion(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps);
/// <summary> /// <summary>
/// 自动获取远程通信模块(SIM)版本信息 /// 自动获取远程通信模块(SIM)版本信息
@ -149,7 +148,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
Task<List<MeterReadingTelemetryPacketInfo>> ConcentratorScheduledAutomaticGetTelematicsModule(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps); Task ConcentratorScheduledAutomaticGetTelematicsModule(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps);
#endregion #endregion

View File

@ -1,18 +1,12 @@
using System; using System.Linq;
using System.Collections.Generic;
using System.Linq;
using System.Reflection; using System.Reflection;
using System.Threading.Channels;
using System.Threading.Tasks; using System.Threading.Tasks;
using Cassandra.Mapping; using Cassandra.Mapping;
using JiShe.CollectBus.Cassandra; using JiShe.CollectBus.Cassandra;
using JiShe.CollectBus.DataChannels;
using JiShe.CollectBus.DataMigration.Options;
using JiShe.CollectBus.FreeRedis; using JiShe.CollectBus.FreeRedis;
using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.Interceptors; using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IoTDB; using JiShe.CollectBus.IoTDB;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Mappers; using JiShe.CollectBus.Mappers;
using JiShe.CollectBus.Protocol; using JiShe.CollectBus.Protocol;
@ -80,8 +74,6 @@ public class CollectBusApplicationModule : AbpModule
// //await dbContext.InitWatermeterCacheData(); // //await dbContext.InitWatermeterCacheData();
//}).ConfigureAwait(false); //}).ConfigureAwait(false);
//下发任务通道构建
DataChannelManage.TaskDataChannel = Channel.CreateUnbounded<Tuple<string, List<MeterReadingTelemetryPacketInfo>>>();
//默认初始化表计信息 //默认初始化表计信息
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>(); var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();

View File

@ -1,18 +0,0 @@
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace JiShe.CollectBus.DataChannels
{
public static class DataChannelManage
{
/// <summary>
/// 下发任务通道
/// </summary>
public static Channel<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> TaskDataChannel;
}
}

View File

@ -1,37 +1,27 @@
using DnsClient.Protocol; using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common; using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Interfaces; using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol.Services; using JiShe.CollectBus.Protocol.Services;
using JiShe.CollectBus.RedisDataCache; using JiShe.CollectBus.RedisDataCache;
using JiShe.CollectBus.ScheduledMeterReading; using JiShe.CollectBus.ScheduledMeterReading;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Mvc.RazorPages;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Timing;
namespace JiShe.CollectBus.DataChannels namespace JiShe.CollectBus.DataChannels
{ {
/// <summary> /// <summary>
/// 数据通道管理服务 /// 数据通道管理服务
/// </summary> /// </summary>
public class DataChannelManageService : IDataChannelManageService, ITransientDependency public class DataChannelManageService: CollectBusAppService, IDataChannelManageService
{ {
private readonly ILogger<DataChannelManageService> _logger; private readonly ILogger<DataChannelManageService> _logger;
private readonly IIoTDbProvider _dbProvider; private readonly IIoTDbProvider _dbProvider;
@ -56,87 +46,5 @@ namespace JiShe.CollectBus.DataChannels
_applicationOptions = applicationOptions.Value; _applicationOptions = applicationOptions.Value;
_runtimeContext.UseTableSessionPool = true; _runtimeContext.UseTableSessionPool = true;
} }
/// <summary>
/// 定时任务数据通道写入
/// </summary>
/// <returns></returns>
public async Task ScheduledMeterTaskWriter(ChannelWriter<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter, Tuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems)
{
await _telemetryPacketInfoWriter.WriteAsync(dataItems);
}
/// <summary>
/// 定时任务数据入库和Kafka推送通道
/// </summary>
/// <returns></returns>
public async Task ScheduledMeterTaskReadding(ChannelReader<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoReader)
{
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
var batchSize = 10000;
var timeout = TimeSpan.FromSeconds(5); // 默认超时时间为5秒
List<MeterReadingTelemetryPacketInfo> taskInfoList = new List<MeterReadingTelemetryPacketInfo>();
var startTime = DateTime.Now;
var timer = new Stopwatch();
while (true)
{
var canRead = await _telemetryPacketInfoReader.WaitToReadAsync();
if (!canRead)
{
continue;
}
while (taskInfoList != null && taskInfoList.Count < batchSize && (DateTime.Now - startTime) < timeout)
{
if (_telemetryPacketInfoReader.TryRead(out var dataItem))
{
taskInfoList.AddRange(dataItem.Item2);
}
else
{
//无消息时短暂等待
await Task.Delay(5);
}
}
if (taskInfoList != null && taskInfoList.Count > 0)
{
await _dbProvider.BatchInsertAsync(metadata, taskInfoList);
await DeviceGroupBalanceControl.ProcessWithThrottleAsync<MeterReadingTelemetryPacketInfo>(
items: taskInfoList.ToList(),
deviceIdSelector: data => data.DeviceId,
processor: (data, groupIndex) =>
{
// _ = KafkaProducerIssuedMessageAction(dateItem.Item1, data, groupIndex);
}
);
taskInfoList.Clear();
}
startTime = DateTime.Now;
}
}
/// <summary>
/// Kafka 推送消息
/// </summary>
/// <param name="topicName">主题名称</param>
/// <param name="taskRecord">任务记录</param>
/// <param name="partition">对应分区,也就是集中器号所在的分组序号</param>
/// <returns></returns>
protected async Task KafkaProducerIssuedMessageAction<T>(string topicName,
T taskRecord, int partition) where T : class
{
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
{
throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101");
}
await _producerService.ProduceAsync<T>(topicName, taskRecord, partition);
}
} }
} }

View File

@ -28,10 +28,6 @@ using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using JiShe.CollectBus.Protocol.Models; using JiShe.CollectBus.Protocol.Models;
using System.Threading.Channels; using System.Threading.Channels;
using static IdentityModel.ClaimComparer;
using JiShe.CollectBus.DataChannels;
using JiShe.CollectBus.DataMigration.Options;
using static System.Runtime.InteropServices.JavaScript.JSType;
namespace JiShe.CollectBus.ScheduledMeterReading namespace JiShe.CollectBus.ScheduledMeterReading
{ {
@ -42,35 +38,35 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{ {
private readonly ILogger<BasicScheduledMeterReadingService> _logger; private readonly ILogger<BasicScheduledMeterReadingService> _logger;
private readonly IIoTDbProvider _dbProvider; private readonly IIoTDbProvider _dbProvider;
private readonly IDataChannelManageService _dataChannelManage; private readonly IProducerService _producerService;
private readonly IRedisDataCacheService _redisDataCacheService; private readonly IRedisDataCacheService _redisDataCacheService;
private readonly IProtocolService _protocolService;
private readonly DataMigrationOptions _dataMigrationOptions;
private readonly KafkaOptionConfig _kafkaOptions; private readonly KafkaOptionConfig _kafkaOptions;
private readonly ServerApplicationOptions _applicationOptions; private readonly ServerApplicationOptions _applicationOptions;
private readonly IoTDBRuntimeContext _runtimeContext;
private readonly IProtocolService _protocolService;
int pageSize = 10000; int pageSize = 10000;
public BasicScheduledMeterReadingService( public BasicScheduledMeterReadingService(
ILogger<BasicScheduledMeterReadingService> logger, ILogger<BasicScheduledMeterReadingService> logger,
IDataChannelManageService dataChannelManage, IProducerService producerService,
IRedisDataCacheService redisDataCacheService, IRedisDataCacheService redisDataCacheService,
IIoTDbProvider dbProvider, IIoTDbProvider dbProvider,
IoTDBRuntimeContext runtimeContext,
IProtocolService protocolService, IProtocolService protocolService,
IOptions<DataMigrationOptions> dataMigrationOptions,
IOptions<KafkaOptionConfig> kafkaOptions, IOptions<KafkaOptionConfig> kafkaOptions,
IOptions<ServerApplicationOptions> applicationOptions) IOptions<ServerApplicationOptions> applicationOptions)
{ {
_logger = logger; _logger = logger;
_dbProvider = dbProvider; _dbProvider = dbProvider;
_dataChannelManage = dataChannelManage; _runtimeContext = runtimeContext;
_producerService = producerService;
_redisDataCacheService = redisDataCacheService; _redisDataCacheService = redisDataCacheService;
_kafkaOptions = kafkaOptions.Value;
_applicationOptions = applicationOptions.Value;
_protocolService = protocolService; _protocolService = protocolService;
_dataMigrationOptions = dataMigrationOptions.Value; _runtimeContext.UseTableSessionPool = true;
_kafkaOptions = kafkaOptions.Value;
_applicationOptions = applicationOptions.Value;
} }
/// <summary> /// <summary>
@ -115,8 +111,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时没有缓存数据,-101"); _logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时没有缓存数据,-101");
return; return;
} }
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
//定时抄读 //定时抄读
foreach (var item in taskInfos) foreach (var item in taskInfos)
@ -151,14 +147,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
meterType: MeterTypeEnum.Ammeter, meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) => taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{ {
var tempTask = await AmmeterScheduledAutomaticVerificationTime(timeDensity, data, groupIndex, timestamps); await AmmeterScheduledAutomaticVerificationTime(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"电表自动校时 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
}); });
} }
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取 else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取
@ -169,13 +158,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
meterType: MeterTypeEnum.Ammeter, meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) => taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{ {
var tempTask = await ConcentratorScheduledAutomaticGetTerminalVersion(timeDensity, data, groupIndex, timestamps); await ConcentratorScheduledAutomaticGetTerminalVersion(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
}); });
} }
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取 else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取
@ -186,13 +169,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
meterType: MeterTypeEnum.Ammeter, meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) => taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{ {
var tempTask = await ConcentratorScheduledAutomaticGetTelematicsModule(timeDensity, data, groupIndex, timestamps); await ConcentratorScheduledAutomaticGetTelematicsModule(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
}); });
} }
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//月冻结 else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//月冻结
@ -203,13 +180,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
meterType: MeterTypeEnum.Ammeter, meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) => taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{ {
var tempTask = await AmmeterScheduledGetAutomaticDayFreezeData(timeDensity, data, groupIndex, timestamps); await AmmeterScheduledGetAutomaticDayFreezeData(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
}); });
} }
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结 else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结
@ -220,13 +191,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
meterType: MeterTypeEnum.Ammeter, meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) => taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{ {
var tempTask = await AmmeterScheduledGetAutomaticMonthFreezeData(timeDensity, data, groupIndex, timestamps); await AmmeterScheduledGetAutomaticMonthFreezeData(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
}); });
} }
else else
@ -246,6 +211,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//tasksToBeIssueModel.NextTaskTime; //tasksToBeIssueModel.NextTaskTime;
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
if (meteryType == MeterTypeEnum.Ammeter.ToString()) if (meteryType == MeterTypeEnum.Ammeter.ToString())
{ {
_ = CreateMeterPublishTask<AmmeterInfo>( _ = CreateMeterPublishTask<AmmeterInfo>(
@ -260,7 +226,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}"); _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return; return;
} }
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, tempTask)); _ = _dbProvider.BatchInsertAsync(metadata, tempTask);
}); });
} }
else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
@ -279,7 +245,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"水表 {data.Name} 任务数据构建失败:{data.Serialize()}"); _logger.LogWarning($"水表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return; return;
} }
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempTask)); _ = _dbProvider.BatchInsertAsync(metadata, tempTask);
}); });
} }
else else
@ -298,14 +264,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
//电表定时阀控任务处理。 //电表定时阀控任务处理。
var autoValveControlTask = await AmmeterScheduledAutoValveControl(); _ = AmmeterScheduledAutoValveControl();
if (autoValveControlTask == null || autoValveControlTask.Count <= 0)
{
_logger.LogWarning($"{nameof(AmmeterScheduledAutoValveControl)}电表定时阀控没有可操作的任务");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask));
} }
#region #region
@ -327,8 +287,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns> /// <returns></returns>
public virtual async Task InitAmmeterCacheData(string gatherCode = "") public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{ {
_ = _dataChannelManage.ScheduledMeterTaskReadding(DataChannelManage.TaskDataChannel.Reader);
//此处代码不要删除 //此处代码不要删除
#if DEBUG #if DEBUG
var timeDensity = "15"; var timeDensity = "15";
@ -369,7 +327,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
timer1.Stop(); timer1.Stop();
_logger.LogError($"电表初始化读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒"); _logger.LogError($"读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒");
DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
return; return;
#else #else
@ -625,26 +583,26 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType); var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null) if (protocolPlugin == null)
{ {
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 创建电表待发送的任务数据{currentTime}没有找到对应的协议组件,-105"); _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 创建电表待发送的任务数据{currentTime}没有找到对应的协议组件,-105");
return null; return null;
} }
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{ {
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
return null; return null;
} }
//载波的不处理 //载波的不处理
if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave) if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
{ {
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102"); _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102");
return null; return null;
} }
if (ammeterInfo.State.Equals(2)) if (ammeterInfo.State.Equals(2))
{ {
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTaskAction)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理"); _logger.LogWarning($"{nameof(AmmerterCreatePublishTaskAction)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理");
return null; return null;
} }
@ -657,22 +615,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode)) if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode))
{ {
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信区号为空"); _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信区号为空");
return null; return null;
} }
if (string.IsNullOrWhiteSpace(ammeterInfo.Address)) if (string.IsNullOrWhiteSpace(ammeterInfo.Address))
{ {
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信地址为空"); _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信地址为空");
return null; return null;
} }
if (Convert.ToInt32(ammeterInfo.Address) > 65535) if (Convert.ToInt32(ammeterInfo.Address) > 65535)
{ {
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信地址无效,确保大于65535"); _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信地址无效,确保大于65535");
return null; return null;
} }
if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33) if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33)
{ {
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},非有效测量点号({ammeterInfo.MeteringCode})"); _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},非有效测量点号({ammeterInfo.MeteringCode})");
return null; return null;
} }
@ -770,7 +728,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 电表自动阀控 /// 电表自动阀控
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
public virtual Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledAutoValveControl() public virtual Task AmmeterScheduledAutoValveControl()
{ {
throw new NotImplementedException($"{nameof(AmmeterScheduledAutoValveControl)}请根据不同系统类型进行实现"); throw new NotImplementedException($"{nameof(AmmeterScheduledAutoValveControl)}请根据不同系统类型进行实现");
} }
@ -783,7 +741,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledAutomaticVerificationTime(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) public virtual async Task AmmeterScheduledAutomaticVerificationTime(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
{ {
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}"; string currentTimeStr = $"{currentTime:HH:mm:00}";
@ -794,10 +752,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase)) if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))
{ {
_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间"); _logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
return null; return;
} }
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>(); List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
var itemCode = T37612012PacketItemCodeConst.AFN10HFN01H; var itemCode = T37612012PacketItemCodeConst.AFN10HFN01H;
var subItemCode = T6452007PacketItemCodeConst.C08; var subItemCode = T6452007PacketItemCodeConst.C08;
@ -807,7 +766,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (protocolPlugin == null) if (protocolPlugin == null)
{ {
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105"); _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
return null; return;
} }
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest() ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
@ -837,10 +796,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (taskList == null || taskList.Count <= 0) if (taskList == null || taskList.Count <= 0)
{ {
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-106"); _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-106");
return null; return;
} }
return null; //任务记录入库
await _dbProvider.BatchInsertAsync(metadata, taskList);
//任务信息推送Kafka
_ = DeviceGroupBalanceControl.ProcessWithThrottleAsync<MeterReadingTelemetryPacketInfo>(
items: taskList,
deviceIdSelector: data => data.DeviceId,
processor: (data, groupIndex) =>
{
_ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, data, groupIndex);
}
);
//todo 阀控记录入库,推送到新的服务 //todo 阀控记录入库,推送到新的服务
} }
@ -859,7 +829,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledGetAutomaticDayFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) public virtual async Task AmmeterScheduledGetAutomaticDayFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
{ {
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}"; string currentTimeStr = $"{currentTime:HH:mm:00}";
@ -870,17 +840,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase)) if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))
{ {
_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间"); _logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
return null; return;
} }
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>(); List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
//根据电表型号获取协议插件 //根据电表型号获取协议插件
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType); var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null) if (protocolPlugin == null)
{ {
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105"); _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
return null; return;
} }
foreach (var item in DayFreezeCodes) foreach (var item in DayFreezeCodes)
@ -908,10 +880,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (taskList == null || taskList.Count <= 0) if (taskList == null || taskList.Count <= 0)
{ {
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 日冻结抄读时间{currentTime}没有任务生成,-106"); _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 日冻结抄读时间{currentTime}没有任务生成,-106");
return null; return;
} }
return taskList; //任务记录入库
await _dbProvider.BatchInsertAsync(metadata, taskList);
//任务信息推送Kafka
_ = DeviceGroupBalanceControl.ProcessWithThrottleAsync<MeterReadingTelemetryPacketInfo>(
items: taskList,
deviceIdSelector: data => data.DeviceId,
processor: (data, groupIndex) =>
{
_ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, data, groupIndex);
}
);
} }
catch (Exception) catch (Exception)
@ -929,7 +912,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledGetAutomaticMonthFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) public virtual async Task AmmeterScheduledGetAutomaticMonthFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
{ {
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}"; string currentTimeStr = $"{currentTime:HH:mm:00}";
@ -940,17 +923,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase)) if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))
{ {
_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间"); _logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
return null; return;
} }
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>(); List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
//根据电表型号获取协议插件 //根据电表型号获取协议插件
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType); var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null) if (protocolPlugin == null)
{ {
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105"); _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
return null; return;
} }
foreach (var item in DayFreezeCodes) foreach (var item in DayFreezeCodes)
@ -978,10 +963,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (taskList == null || taskList.Count <= 0) if (taskList == null || taskList.Count <= 0)
{ {
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 日冻结抄读时间{currentTime}没有任务生成,-106"); _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 日冻结抄读时间{currentTime}没有任务生成,-106");
return null; return;
} }
return taskList; //任务记录入库
await _dbProvider.BatchInsertAsync(metadata, taskList);
//任务信息推送Kafka
_ = DeviceGroupBalanceControl.ProcessWithThrottleAsync<MeterReadingTelemetryPacketInfo>(
items: taskList,
deviceIdSelector: data => data.DeviceId,
processor: (data, groupIndex) =>
{
_ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, data, groupIndex);
}
);
} }
catch (Exception) catch (Exception)
@ -1038,13 +1034,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
IsNumber = false, IsNumber = false,
Value = false Value = false
}); });
//await CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerRetryEventName, new IoTDBQueryOptions() await CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerRetryEventName, new IoTDBQueryOptions()
//{ {
// TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(), TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
// PageIndex = 1, PageIndex = 1,
// PageSize = pageSize, PageSize = pageSize,
// Conditions = conditions, Conditions = conditions,
//}); });
// 释放锁 // 释放锁
tryLock.Unlock(); tryLock.Unlock();
} }
@ -1109,7 +1105,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
TasksToBeIssueModel nextTask = new TasksToBeIssueModel() TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{ {
LastTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key), LastTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),
TimeDensity = item.Key, TimeDensity = item.Key,
}; };
nextTask.NextTaskTime = nextTask.LastTaskTime.CalculateNextCollectionTime(item.Key);//使用首次采集时间作为下一次采集时间 nextTask.NextTaskTime = nextTask.LastTaskTime.CalculateNextCollectionTime(item.Key);//使用首次采集时间作为下一次采集时间
@ -1195,13 +1191,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
Value = pendingCopyReadTime Value = pendingCopyReadTime
}); });
//_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, new IoTDBQueryOptions() _ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, new IoTDBQueryOptions()
//{ {
// TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(), TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
// PageIndex = 1, PageIndex = 1,
// PageSize = pageSize, PageSize = pageSize,
// Conditions = conditions, Conditions = conditions,
//}); });
_logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReadding)} {timeDensity}分钟采集水表数据处理完成"); _logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReadding)} {timeDensity}分钟采集水表数据处理完成");
} }
@ -1325,7 +1321,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> ConcentratorScheduledAutomaticGetTerminalVersion(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) public virtual async Task ConcentratorScheduledAutomaticGetTerminalVersion(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
{ {
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}"; string currentTimeStr = $"{currentTime:HH:mm:00}";
@ -1336,7 +1332,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase)) if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))
{ {
_logger.LogInformation($"{nameof(ConcentratorScheduledAutomaticGetTerminalVersion)} 集中器自动获取版本号,非自动处理时间"); _logger.LogInformation($"{nameof(ConcentratorScheduledAutomaticGetTerminalVersion)} 集中器自动获取版本号,非自动处理时间");
return null; return;
} }
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>(); List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
@ -1350,7 +1346,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (protocolPlugin == null) if (protocolPlugin == null)
{ {
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 集中器自动获取版本号{currentTime}没有找到对应的协议组件,-105"); _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 集中器自动获取版本号{currentTime}没有找到对应的协议组件,-105");
return null; return;
} }
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest() ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
@ -1380,10 +1376,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (taskList == null || taskList.Count <= 0) if (taskList == null || taskList.Count <= 0)
{ {
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-106"); _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-106");
return null; return;
} }
return taskList; //任务记录入库
await _dbProvider.BatchInsertAsync(metadata, taskList);
//任务信息推送Kafka
_ = DeviceGroupBalanceControl.ProcessWithThrottleAsync<MeterReadingTelemetryPacketInfo>(
items: taskList,
deviceIdSelector: data => data.DeviceId,
processor: (data, groupIndex) =>
{
_ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, data, groupIndex);
}
);
} }
catch (Exception) catch (Exception)
{ {
@ -1400,7 +1407,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param> /// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param> /// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns> /// <returns></returns>
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> ConcentratorScheduledAutomaticGetTelematicsModule(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) public virtual async Task ConcentratorScheduledAutomaticGetTelematicsModule(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
{ {
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}"; string currentTimeStr = $"{currentTime:HH:mm:00}";
@ -1411,10 +1418,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase)) if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))
{ {
_logger.LogInformation($"{nameof(ConcentratorScheduledAutomaticGetTelematicsModule)} 自动获取远程通信模块(SIM)版本信息,非自动处理时间"); _logger.LogInformation($"{nameof(ConcentratorScheduledAutomaticGetTelematicsModule)} 自动获取远程通信模块(SIM)版本信息,非自动处理时间");
return null; return;
} }
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>(); List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
var itemCode = T37612012PacketItemCodeConst.AFN09HFN09H; var itemCode = T37612012PacketItemCodeConst.AFN09HFN09H;
@ -1423,7 +1431,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (protocolPlugin == null) if (protocolPlugin == null)
{ {
_logger.LogError($"{nameof(ConcentratorScheduledAutomaticGetTelematicsModule)} 自动获取远程通信模块(SIM)版本信息{currentTime}没有找到对应的协议组件,-105"); _logger.LogError($"{nameof(ConcentratorScheduledAutomaticGetTelematicsModule)} 自动获取远程通信模块(SIM)版本信息{currentTime}没有找到对应的协议组件,-105");
return null; return;
} }
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest() ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
@ -1447,10 +1455,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (taskList == null || taskList.Count <= 0) if (taskList == null || taskList.Count <= 0)
{ {
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-106"); _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-106");
return null; return;
} }
return taskList; //任务记录入库
await _dbProvider.BatchInsertAsync(metadata, taskList);
//任务信息推送Kafka
_ = DeviceGroupBalanceControl.ProcessWithThrottleAsync<MeterReadingTelemetryPacketInfo>(
items: taskList,
deviceIdSelector: data => data.DeviceId,
processor: (data, groupIndex) =>
{
_ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, data, groupIndex);
}
);
} }
catch (Exception) catch (Exception)
{ {
@ -1530,7 +1549,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); _logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
return; return;
} }
_logger.LogError($"{nameof(CreateMeterPublishTask)} 构建采集待下发任务,缓存获取信息共花费{timer.ElapsedMilliseconds}毫秒"); _logger.LogError($"{nameof(CreateMeterPublishTask)} 采集待下发任务,缓存获取信息共花费{timer.ElapsedMilliseconds}毫秒");
await DeviceGroupBalanceControl.ProcessWithThrottleAsync( await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos, items: meterInfos,
@ -1605,7 +1624,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101"); throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101");
} }
// await _dataChannelManage.ProduceAsync<T>(topicName, taskRecord, partition); await _producerService.ProduceAsync<T>(topicName, taskRecord, partition);
} }
/// <summary> /// <summary>

View File

@ -5,8 +5,6 @@ using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.DataChannels;
using JiShe.CollectBus.DataMigration.Options;
using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Context;
@ -44,19 +42,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public EnergySystemScheduledMeterReadingService( public EnergySystemScheduledMeterReadingService(
ILogger<EnergySystemScheduledMeterReadingService> logger, ILogger<EnergySystemScheduledMeterReadingService> logger,
IDataChannelManageService dataChannelManage,
IRedisDataCacheService redisDataCacheService,
IIoTDbProvider dbProvider, IIoTDbProvider dbProvider,
IProtocolService protocolService,
IOptions<DataMigrationOptions> dataMigrationOptions,
IOptions<KafkaOptionConfig> kafkaOptions, IOptions<KafkaOptionConfig> kafkaOptions,
IOptions<ServerApplicationOptions> applicationOptions) IOptions<ServerApplicationOptions> applicationOptions,
IoTDBRuntimeContext runtimeContext,
IProducerService producerService,
IProtocolService protocolService,
IRedisDataCacheService redisDataCacheService)
: base(logger, : base(logger,
dataChannelManage, producerService,
redisDataCacheService, redisDataCacheService,
dbProvider, dbProvider,
runtimeContext,
protocolService, protocolService,
dataMigrationOptions,
kafkaOptions, kafkaOptions,
applicationOptions) applicationOptions)
{ {
@ -183,7 +181,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 电表自动阀控 /// 电表自动阀控
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
public override async Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledAutoValveControl() public override async Task AmmeterScheduledAutoValveControl()
{ {
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm}"; string currentTimeStr = $"{currentTime:HH:mm}";
@ -196,12 +194,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (settingInfos == null || settingInfos.Count <= 0) if (settingInfos == null || settingInfos.Count <= 0)
{ {
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表自动阀控时,阀控数据为空, -101"); _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表自动阀控时,阀控数据为空, -101");
return null; return;
} }
//批量获取对应的缓存电表信息 //批量获取对应的缓存电表信息
var ammeterInfos = new List<AmmeterInfo>(); var ammeterInfos = new List<AmmeterInfo>();
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>(); List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
foreach (var settingInfo in settingInfos) foreach (var settingInfo in settingInfos)
{ {
@ -260,7 +259,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (protocolPlugin == null) if (protocolPlugin == null)
{ {
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105"); _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
return null; return;
} }
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest() ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
@ -291,10 +290,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (taskList == null || taskList.Count <= 0) if (taskList == null || taskList.Count <= 0)
{ {
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-106"); _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-106");
return null; return;
} }
return taskList; //任务记录入库
await _dbProvider.BatchInsertAsync(metadata, taskList);
//任务信息推送Kafka
_ = DeviceGroupBalanceControl.ProcessWithThrottleAsync<MeterReadingTelemetryPacketInfo>(
items: taskList,
deviceIdSelector: data => data.DeviceId,
processor: (data, groupIndex) =>
{
_ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, data, groupIndex);
}
);
//todo 阀控记录入库,推送到新的服务 //todo 阀控记录入库,推送到新的服务
} }
catch (Exception) catch (Exception)

View File

@ -193,7 +193,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
int? maxConcurrency = null) int? maxConcurrency = null)
{ {
var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化"); var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化");
//var timer = Stopwatch.StartNew(); var timer = Stopwatch.StartNew();
// 自动计算最佳并发度 // 自动计算最佳并发度
int recommendedThreads = CalculateOptimalThreadCount(); int recommendedThreads = CalculateOptimalThreadCount();
@ -227,8 +227,8 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
}); });
await Task.WhenAll(tasks); await Task.WhenAll(tasks);
//timer.Stop(); timer.Stop();
//Console.WriteLine($"任务处理完成,耗时:{timer.ElapsedMilliseconds}ms"); Console.WriteLine($"任务处理完成,耗时:{timer.ElapsedMilliseconds}ms");
} }
/// <summary> /// <summary>

View File

@ -5,7 +5,7 @@
@{ @{
Layout = null; Layout = null;
} }
<!DOCTYPE html> <!DOCTYPE html>