This commit is contained in:
cli 2025-04-24 00:34:30 +08:00
commit 7adee9f257
11 changed files with 530 additions and 172 deletions

View File

@ -2,6 +2,7 @@
using System.Threading.Tasks;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.Watermeter;
using Volo.Abp.Application.Services;
@ -58,6 +59,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns>
Task AmmeterScheduledMeterFifteenMinuteReading();
/// <summary>
/// 获取电表阀控配置
/// </summary>
/// <param name="currentTime">阀控的时间</param>
/// <returns></returns>
Task<List<AmmeterAutoValveControlSetting>> GetAmmeterAutoValveControlSetting(string currentTime);
/// <summary>
/// 电表自动阀控
/// </summary>
/// <returns></returns>
Task AmmeterScheduledAutoValveControl();
#endregion

View File

@ -20,19 +20,25 @@ namespace JiShe.CollectBus.Subscribers
/// 1分钟采集电表数据下行消息消费订阅
/// </summary>
/// <returns></returns>
Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage);
/// <summary>
/// 5分钟采集电表数据下行消息消费订阅
/// </summary>
/// <returns></returns>
Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage);
/// <summary>
/// 15分钟采集电表数据下行消息消费订阅
/// </summary>
/// <returns></returns>
Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage);
/// <summary>
/// 电表自动阀控下行消息消费订阅
/// </summary>
/// <returns></returns>
Task<ISubscribeAck> AmmeterScheduledAutoValveControl(MeterReadingTelemetryPacketInfo issuedEventMessage);
#endregion
#region
@ -40,7 +46,7 @@ namespace JiShe.CollectBus.Subscribers
/// 1分钟采集水表数据下行消息消费订阅
/// </summary>
/// <returns></returns>
Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage);
#endregion
}

View File

@ -1,10 +1,4 @@
using Amazon.Runtime.Internal.Endpoints.StandardLibrary;
using Confluent.Kafka;
using DeviceDetectorNET.Parser.Device;
using DnsClient.Protocol;
using FreeSql;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl;
@ -13,36 +7,26 @@ using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.EnergySystems.Entities;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.RedisDataCache;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using Mapster;
using Microsoft.Extensions.Configuration;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using static FreeSql.Internal.GlobalFilter;
using static Microsoft.AspNetCore.Razor.Language.TagHelperMetadata;
using static Thrift.Protocol.Utilities.TJSONProtocolConstants;
namespace JiShe.CollectBus.ScheduledMeterReading
{
@ -57,6 +41,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
private readonly IRedisDataCacheService _redisDataCacheService;
private readonly KafkaOptionConfig _kafkaOptions;
private readonly IoTDBRuntimeContext _runtimeContext;
private readonly IServiceProvider _serviceProvider;
int pageSize = 3000;
public BasicScheduledMeterReadingService(
ILogger<BasicScheduledMeterReadingService> logger,
@ -64,6 +51,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
IRedisDataCacheService redisDataCacheService,
IIoTDbProvider dbProvider,
IoTDBRuntimeContext runtimeContext,
IServiceProvider serviceProvider,
IOptions<KafkaOptionConfig> kafkaOptions)
{
_logger = logger;
@ -72,6 +60,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_producerService = producerService;
_redisDataCacheService = redisDataCacheService;
_kafkaOptions = kafkaOptions.Value;
_serviceProvider = serviceProvider;
_runtimeContext.UseTableSessionPool = true;
}
@ -421,11 +410,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
Value = pendingCopyReadTime
});
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(timeDensity, new IoTDBQueryOptions()
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, new IoTDBQueryOptions()
{
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
PageIndex = 1,
PageSize = 3000,
PageSize = pageSize,
Conditions = conditions,
});
@ -458,11 +447,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
Value = pendingCopyReadTime
});
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(timeDensity, new IoTDBQueryOptions()
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, new IoTDBQueryOptions()
{
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
PageIndex = 1,
PageSize = 3000,
PageSize = pageSize,
Conditions = conditions,
});
}
@ -494,11 +483,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
Value = pendingCopyReadTime
});
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(timeDensity, new IoTDBQueryOptions()
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, new IoTDBQueryOptions()
{
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
PageIndex = 1,
PageSize = 3000,
PageSize = pageSize,
Conditions = conditions,
});
}
@ -643,7 +632,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
else
{
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。");
_logger.LogWarning($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。");
continue;
}
}
@ -689,6 +678,25 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return taskList;
}
/// <summary>
/// 获取电表阀控配置
/// </summary>
/// <param name="currentTime">阀控的时间</param>
/// <returns></returns>
public virtual Task<List<AmmeterAutoValveControlSetting>> GetAmmeterAutoValveControlSetting(string currentTime)
{
throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现");
}
/// <summary>
/// 电表自动阀控
/// </summary>
/// <returns></returns>
public virtual Task AmmeterScheduledAutoValveControl()
{
throw new NotImplementedException($"{nameof(AmmeterScheduledAutoValveControl)}请根据不同系统类型进行实现");
}
#endregion
@ -824,11 +832,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
Value = pendingCopyReadTime
});
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(timeDensity, new IoTDBQueryOptions()
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, new IoTDBQueryOptions()
{
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
PageIndex = 1,
PageSize = 3000,
PageSize = pageSize,
Conditions = conditions,
});
@ -904,6 +912,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
tempCodes = new List<string>() { "10_1" };
}
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null)
{
_logger.LogError("协议不存在!");
}
foreach (var tempItem in tempCodes)
{
@ -995,7 +1008,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="nextTaskTime"></param>
/// <param name="timeDensity"></param>
/// <returns></returns>
private bool IsTaskTime(DateTime nextTaskTime, int timeDensity = 0)
protected bool IsTaskTime(DateTime nextTaskTime, int timeDensity = 0)
{
if (DateTime.Now.AddMinutes(timeDensity) >= nextTaskTime)
{
@ -1013,7 +1026,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="meterType">表类型</param>
/// <param name="taskCreateAction">具体的创建任务的委托</param>
/// <returns></returns>
private async Task CreateMeterPublishTask<T>(int timeDensity, DateTime nextTaskTime, MeterTypeEnum meterType, Action<int, T, int, DateTime> taskCreateAction) where T : DeviceCacheBasicModel
protected async Task CreateMeterPublishTask<T>(int timeDensity, DateTime nextTaskTime, MeterTypeEnum meterType, Action<int, T, int, DateTime> taskCreateAction) where T : DeviceCacheBasicModel
{
var timer = Stopwatch.StartNew();
@ -1074,9 +1087,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <summary>
/// 创建Kafka消息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="kafkaTopicName">kafka主题名称</param>
/// <param name="options">任务查询条件</param>
/// <returns></returns>
private async Task CreateMeterKafkaTaskMessage<T>(int timeDensity, IoTDBQueryOptions options) where T : IoTEntity, new()
protected async Task CreateMeterKafkaTaskMessage<T>(string kafkaTopicName, IoTDBQueryOptions options) where T : IoTEntity, new()
{
if (string.IsNullOrWhiteSpace(kafkaTopicName))
{
_logger.LogInformation($"{nameof(CreateMeterKafkaTaskMessage)} Kafka消息推送主题不能为空-101");
return;
}
int pageNumber = 0;
bool hasNext;
var stopwatch = Stopwatch.StartNew();
@ -1103,7 +1124,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} while (hasNext);
stopwatch.Stop();
_logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {options.TableNameOrTreePath} {timeDensity}分钟采集任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
_logger.LogInformation($"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题的任务 {options.TableNameOrTreePath} 路径任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
}
/// <summary>
@ -1113,7 +1134,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="taskRecord">任务记录</param>
/// <param name="partition">对应分区,也就是集中器号所在的分组序号</param>
/// <returns></returns>
private async Task KafkaProducerIssuedMessageAction<T>(string topicName,
protected async Task KafkaProducerIssuedMessageAction<T>(string topicName,
T taskRecord, int partition) where T : class
{
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)

View File

@ -1,30 +1,30 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Common.Encrypt;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Repository;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.Domain.Repositories;
using Volo.Abp.Uow;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace JiShe.CollectBus.ScheduledMeterReading
{
@ -36,6 +36,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
{
string serverTagName = string.Empty;
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
private readonly IIoTDbProvider _dbProvider;
private readonly IServiceProvider _serviceProvider;
public EnergySystemScheduledMeterReadingService(
ILogger<EnergySystemScheduledMeterReadingService> logger,
@ -43,15 +46,20 @@ namespace JiShe.CollectBus.ScheduledMeterReading
IOptions<KafkaOptionConfig> kafkaOptions,
IoTDBRuntimeContext runtimeContext,
IProducerService producerService,
IServiceProvider serviceProvider,
IRedisDataCacheService redisDataCacheService)
: base(logger,
producerService,
redisDataCacheService,
dbProvider,
runtimeContext,
serviceProvider,
kafkaOptions)
{
serverTagName = kafkaOptions.Value.ServerTagName;
_dbProvider = dbProvider;
_logger = logger;
_serviceProvider = serviceProvider;
}
public sealed override string SystemType => SystemTypeConst.Energy;
@ -138,6 +146,199 @@ namespace JiShe.CollectBus.ScheduledMeterReading
.QueryAsync<AmmeterInfo>(sql);
}
/// <summary>
/// 获取电表阀控配置
/// </summary>
/// <param name="currentTime">阀控的时间</param>
/// <returns></returns>
public override async Task<List<AmmeterAutoValveControlSetting>> GetAmmeterAutoValveControlSetting(string currentTime)
{
string sql = $@"SELECT A.*,B.AmmeterID as MeterId,E.GatherCode,D.AreaCode,D.[Address],CONCAT(D.AreaCode, D.[Address]) AS FocusAddress,D.ID as FocusId
FROM TB_AutoTripTask(nolock) AS A
INNER JOIN TB_AutoTripAmmeter(nolock) AS B ON A.ID=B.TripTaskID
INNER JOIN TB_AmmeterInfo(nolock) AS C ON B.AmmeterID=C.ID
INNER JOIN TB_FocusInfo(nolock) AS D ON D.ID=C.FocusID
INNER JOIN TB_GatherInfo(NOLOCK) AS E ON E.ID=D.GatherInfoID
WHERE A.IsForbid=0 and A.State<>-1 and E.GatherCode LIKE '%V4%' and A.TripTime";
if (!string.IsNullOrWhiteSpace(currentTime))
{
sql = $@"{sql} AND A.TripTime = '{currentTime}'";
}
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
.Ado
.QueryAsync<AmmeterAutoValveControlSetting>(sql);
}
/// <summary>
/// 电表自动阀控
/// </summary>
/// <returns></returns>
public override async Task AmmeterScheduledAutoValveControl()
{
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm}";
try
{
var settingInfos = await GetAmmeterAutoValveControlSetting(currentTimeStr);
if (settingInfos == null || settingInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表自动阀控时,阀控数据为空, -101");
return;
}
//批量获取对应的缓存电表信息
var ammeterInfos = new List<AmmeterInfo>();
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
foreach (var settingInfo in settingInfos)
{
bool isGenerate = false;
switch (settingInfo.LoopType)
{
case "Once":
isGenerate = CommonHelper.JudgeIsGenerate_Once(settingInfo.OnceWithDate.Value, currentTime);
break;
case "EachDay":
isGenerate = CommonHelper.JudgeIsGenerate_Day(settingInfo.EachDayWithout, currentTime);
break;
case "EachWeek":
isGenerate = CommonHelper.JudgeIsGenerate_Week(settingInfo.EachWeekWith, currentTime);
break;
case "EachMonth":
isGenerate = CommonHelper.JudgeIsGenerate_Month(settingInfo.EachMonthWith, currentTime);
break;
}
if (!isGenerate)//不生成,跳入下一循环
{
continue;
}
//获取对应的缓存电表信息
var ammeterInfo = ammeterInfos.First();
bool tripStateResult = false;
if (settingInfo.TripType.Equals("on"))
{
ammeterInfo.TripState = 0;
tripStateResult = true;
}
else if (settingInfo.TripType.Equals("off"))
{
ammeterInfo.TripState = 1;
tripStateResult = false;
}
else
{
_logger.LogError($"集中器[{settingInfo.FocusAddress}],[{settingInfo.MeterId}]阀控命令错误:{settingInfo.TripType},-102");
continue;
}
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null)
{
_logger.LogError("协议不存在!");
}
var temCode = "10_98";
var itemCodeArr = temCode.Split('_');
var aFNStr = itemCodeArr[0];
var aFN = (AFN)(aFNStr.HexToDec());
var fn = int.Parse(itemCodeArr[1]);
TelemetryPacketResponse builderResponse = null;
string methonCode = $"AFN{aFNStr}_Fn_Send";
//特殊表暂不处理
if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
, out var handler))
{
builderResponse = handler(new TelemetryPacketRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Fn = fn,
Pn = ammeterInfo.MeteringCode,
DataUnit = Build645SendData.BuildAmmeterValveControlSendDataUnit(ammeterInfo.AmmerterAddress, "", ammeterInfo.Password, tripStateResult),//生成阀控报文
});
}
else
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{temCode}无效编码,-103");
continue;
}
if (builderResponse == null)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{temCode}报文构建失败,-104");
continue;
}
string taskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA);
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
{
SystemName = SystemType,
ProjectId = $"{ammeterInfo.ProjectID}",
DeviceType = $"{MeterTypeEnum.Ammeter}",
DeviceId = $"{ammeterInfo.MeterId}",
Timestamps = currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
PendingCopyReadTime = currentTime,
CreationTime = currentTime,
MeterAddress = ammeterInfo.AmmerterAddress,
AFN = (int)aFN,
Fn = fn,
//Seq = builderResponse.Seq,
MSA = builderResponse.MSA,
ItemCode = temCode,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
Pn = ammeterInfo.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
IsReceived = false,
ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
};
taskList.Add(meterReadingRecords);
}
if (taskList == null || taskList.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-105");
return;
}
//任务记录入库
_ = _dbProvider.BatchInsertAsync(metadata, taskList);
//任务信息推送Kafka
_ = DeviceGroupBalanceControl.ProcessWithThrottleAsync<MeterReadingTelemetryPacketInfo>(
items: taskList,
deviceIdSelector: data => data.DeviceId,
processor: (data, groupIndex) =>
{
_ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, data, groupIndex);
}
);
//todo 阀控记录入库,推送到新的服务
}
catch (Exception)
{
throw;
}
throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现");
}
/// <summary>
/// 获取水表信息
/// </summary>

View File

@ -1,58 +1,37 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using DeviceDetectorNET.Parser.Device;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;
using TouchSocket.Sockets;
using Volo.Abp.Caching;
using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Subscribers
{
/// <summary>
/// 定时抄读任务消息消费订阅
/// </summary>
[Route($"/worker/app/subscriber")]
public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService, IKafkaSubscribe
{
private readonly ILogger<WorkerSubscriberAppService> _logger;
private readonly ITcpService _tcpService;
private readonly IServiceProvider _serviceProvider;
private readonly IRepository<Device, Guid> _deviceRepository;
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
private readonly IIoTDbProvider _dbProvider;
/// <summary>
/// Initializes a new instance of the <see cref="WorkerSubscriberAppService"/> class.
/// </summary>
/// <param name="logger">The logger.</param>
/// <param name="tcpService">The TCP service.</param>
/// <param name="deviceRepository">The Device pepository.</param>
/// <param name="serviceProvider">The service provider.</param>
/// <param name="dbProvider">IoTDB数据驱动</param>
public WorkerSubscriberAppService(ILogger<WorkerSubscriberAppService> logger,
ITcpService tcpService,
IRepository<Device, Guid> deviceRepository,
IMeterReadingRecordRepository meterReadingRecordsRepository,
IServiceProvider serviceProvider)
IIoTDbProvider dbProvider)
{
_logger = logger;
_tcpService = tcpService;
_serviceProvider = serviceProvider;
_deviceRepository = deviceRepository;
_meterReadingRecordsRepository = meterReadingRecordsRepository;
_dbProvider = dbProvider;
}
@ -63,28 +42,12 @@ namespace JiShe.CollectBus.Subscribers
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[HttpPost]
[Route("ammeter/oneminute/issued-event")]
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
//[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
public async Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage)
{
_logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null)
{
_logger.LogError("【1分钟采集电表数据下行消息消费队列开始处理】协议不存在");
}
else
{
var device = await _deviceRepository.FirstOrDefaultAsync(a => a.Number == receivedMessage.FocusAddress);
if (device != null)
{
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
return await SendMessagesAsync(receivedMessage);
}
}
return SubscribeAck.Success();
}
/// <summary>
@ -92,28 +55,11 @@ namespace JiShe.CollectBus.Subscribers
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[HttpPost]
[Route("ammeter/fiveminute/issued-event")]
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
//[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
public async Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
public async Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage)
{
_logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null)
{
_logger.LogError("【5分钟采集电表数据下行消息消费队列开始处理】协议不存在");
}
else
{
var device = await _deviceRepository.FirstOrDefaultAsync(a => a.Number == receivedMessage.FocusAddress);
if (device != null)
{
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
}
}
return SubscribeAck.Success();
return await SendMessagesAsync(receivedMessage);
}
/// <summary>
@ -121,36 +67,23 @@ namespace JiShe.CollectBus.Subscribers
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[HttpPost]
[Route("ammeter/fifteenminute/issued-event")]
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
//[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
public async Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
public async Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage)
{
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
try
{
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null)
{
_logger.LogError("【15分钟采集电表数据下行消息消费队列开始处理】协议不存在");
}
else
{
var device = await _deviceRepository.FirstOrDefaultAsync(a => a.Number == receivedMessage.FocusAddress);
if (device != null)
{
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
return await SendMessagesAsync(receivedMessage);
}
}
}
return SubscribeAck.Success();
}
catch (Exception ex)
{
throw ex;
}
/// <summary>
/// 电表自动阀控下行消息消费订阅
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName)]
public async Task<ISubscribeAck> AmmeterScheduledAutoValveControl(MeterReadingTelemetryPacketInfo receivedMessage)
{
_logger.LogInformation("电表自动阀控下行消息消费队列开始处理");
return await SendMessagesAsync(receivedMessage);
}
#endregion
@ -161,28 +94,41 @@ namespace JiShe.CollectBus.Subscribers
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[HttpPost]
[Route("watermeter/fifteenminute/issued-event")]
[KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
//[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
public async Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
public async Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage)
{
_logger.LogInformation("15分钟采集水表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null)
{
_logger.LogError("【15分钟采集水表数据下行消息消费队列开始处理】协议不存在");
}
else
{
var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress);
if (device != null)
{
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
}
}
return SubscribeAck.Success();
return await SendMessagesAsync(receivedMessage);
}
#endregion
/// <summary>
/// 设备报文发送
/// </summary>
/// <param name="receivedMessage">消息记录</param>
/// <returns></returns>
private async Task<ISubscribeAck> SendMessagesAsync(MeterReadingTelemetryPacketInfo receivedMessage)
{
try
{
var checkResult = _tcpService.ClientExists(receivedMessage.FocusAddress);
if (checkResult)
{
await _tcpService.SendAsync(receivedMessage.FocusAddress, Convert.FromHexString(receivedMessage.IssuedMessageHexString));
receivedMessage.IsSend = true;
await _dbProvider.InsertAsync(receivedMessage);
return SubscribeAck.Success();
}
else
{
return SubscribeAck.Fail();
}
}
catch (Exception)
{
throw;
}
}
}
}

View File

@ -0,0 +1,107 @@
using JiShe.CollectBus.Common.Models;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IotSystems.Ammeters
{
/// <summary>
/// 电表自动阀控制设置实体
/// </summary>
public class AmmeterAutoValveControlSetting: DeviceCacheBasicModel
{
public int ID { get; set; }
/// <summary>
/// 定时阀控名称
/// </summary>
public string Name { get; set; }
/// <summary>
/// 是否禁止: 1 禁止, 0 启用,
/// </summary>
public int IsForbid { get; set; }
/// <summary>
/// 阀控命令on 合闸(通电) off 断闸(断电)
/// </summary>
public string TripType { get; set; }
/// <summary>
/// 执行周期Once 一次、EachDay 每日、EachWeek 每周、EachMonth 每月、
/// </summary>
public string LoopType { get; set; }
/// <summary>
/// 专属字段执行一次时指定yyyy-MM-dd 00:00:00 (只用年月日)
/// </summary>
public DateTime? OnceWithDate { get; set; }
/// <summary>
/// (专属字段)每日执行时排除(多个用英文逗号分隔),如:"周六,周日"
/// </summary>
public string EachDayWithout { get; set; }
/// <summary>
/// (专属字段)每周执行时运行(多个用英文逗号分隔),如:"周一,周二,周日"
/// </summary>
public string EachWeekWith { get; set; }
/// <summary>
/// (专属字段)每月执行时运行(多个用英文逗号分隔),如:"1,2,3,5"
/// </summary>
public string EachMonthWith { get; set; }
/// <summary>
/// 阀控执行时分HH:mm:00
/// </summary>
public string TripTime { get; set; }
/// <summary>
/// 项目ID
/// </summary>
public int ProjectID { get; set; }
/// <summary>
/// 添加时间
/// </summary>
public DateTime AddDate { get; set; }
/// <summary>
/// 删除改成-1
/// </summary>
public int State { get; set; }
/// <summary>
/// 实际执行时间
/// </summary>
public DateTime? WorkTime { get; set; }
/// <summary>
/// 创建人ID
/// </summary>
public int CreateUserID { get; set; }
/// <summary>
/// 采集编号
/// </summary>
public string GatherCode { get; set; }
/// <summary>
/// 集中器地址
/// </summary>
public string Address { get; set; }
/// <summary>
/// 集中器区域代码
/// </summary>
public string AreaCode { get; set; }
/// <summary>
/// 电表通信地址
/// </summary>
public string AmmerterAddress { get; set; }
}
}

View File

@ -6,7 +6,7 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Ammeters
namespace JiShe.CollectBus.IotSystems.Ammeters
{
public class AmmeterInfo: DeviceCacheBasicModel
{

View File

@ -15,7 +15,7 @@ using Volo.Abp.Domain.Entities.Auditing;
namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
{
/// <summary>
/// 抄读任务Redis缓存数据记录
/// 抄读任务数据
/// </summary>
[EntityType(EntityTypeEnum.TableModel)]
public class MeterReadingTelemetryPacketInfo : IoTEntity
@ -56,6 +56,13 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
[FIELDColumn]
public int FocusId { get; set; }
/// <summary>
/// 集中器地址
/// </summary>
[FIELDColumn]
public string FocusAddress { get; set; }
/// <summary>
/// 表Id
/// </summary>

View File

@ -25,7 +25,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
/// <param name="ammeterAddress">电表地址</param>
/// <param name="specialControlCode">特殊控制码</param>
/// <param name="password">密码</param>
/// <param name="state">是否为开阀</param>
/// <param name="state">是否为开阀,true合闸false关闸</param>
/// <param name="modelCode">型号码</param>
/// <returns></returns>
public static List<string> BuildAmmeterValveControlSendDataUnit(string ammeterAddress, string specialControlCode, string password, bool state, string modelCode = "")

View File

@ -775,5 +775,61 @@ namespace JiShe.CollectBus.Common.Helpers
return makstr;// Convert.ToInt32(makstr) << 32 | msa;
}
/// <summary>
/// 判断是生成月
/// </summary>
/// <param name="eachMonthWith"></param>
/// <param name="curTime"></param>
/// <returns></returns>
public static bool JudgeIsGenerate_Month(string eachMonthWith, DateTime curTime)
{
var arr = eachMonthWith.Split(',');
if (arr.Contains(curTime.Day.ToString()))
return true;
return false;
}
/// <summary>
/// 判断是生成一次
/// </summary>
/// <param name="onceWithDate"></param>
/// <param name="curTime"></param>
/// <returns></returns>
public static bool JudgeIsGenerate_Once(DateTime onceWithDate, DateTime curTime) => curTime.Date.Equals(onceWithDate);//为当天时发送
/// <summary>
/// 判断是生成日
/// </summary>
/// <param name="eachDayWithout"></param>
/// <param name="curTime"></param>
/// <returns></returns>
public static bool JudgeIsGenerate_Day(string eachDayWithout, DateTime curTime)
{
var weekName = strWeeks[(int)curTime.DayOfWeek];
var arr = eachDayWithout.Split(',');
return !arr.Contains(weekName);
}
public static readonly List<string> strWeeks = new List<string>() { "周日", "周一", "周二", "周三", "周四", "周五", "周六" };
/// <summary>
/// 判断是生成周
/// </summary>
/// <param name="eachWeekWith"></param>
/// <param name="curTime"></param>
/// <returns></returns>
public static bool JudgeIsGenerate_Week(string eachWeekWith, DateTime curTime)
{
if (string.IsNullOrWhiteSpace(eachWeekWith))
{
return false;
}
var weekName = strWeeks[(int)curTime.DayOfWeek];
var arr = eachWeekWith.Split(',');
return arr.Contains(weekName);
}
}
}

View File

@ -4,7 +4,7 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.BuildSendDatas
namespace JiShe.CollectBus.Common.Models
{
/// <summary>
/// 待下发的指令生产任务数据