diff --git a/services/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs index 0f04005..6dec9db 100644 --- a/services/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs @@ -2,6 +2,7 @@ using System.Threading.Tasks; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.GatherItem; +using JiShe.CollectBus.IotSystems.Ammeters; using JiShe.CollectBus.IotSystems.Watermeter; using Volo.Abp.Application.Services; @@ -58,6 +59,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// Task AmmeterScheduledMeterFifteenMinuteReading(); + /// + /// 获取电表阀控配置 + /// + /// 阀控的时间 + /// + Task> GetAmmeterAutoValveControlSetting(string currentTime); + + /// + /// 电表自动阀控 + /// + /// + Task AmmeterScheduledAutoValveControl(); + #endregion diff --git a/services/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs b/services/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs index f0a79cf..950f4d9 100644 --- a/services/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs @@ -20,19 +20,25 @@ namespace JiShe.CollectBus.Subscribers /// 1分钟采集电表数据下行消息消费订阅 /// /// - Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); + Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage); /// /// 5分钟采集电表数据下行消息消费订阅 /// /// - Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); + Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage); /// /// 15分钟采集电表数据下行消息消费订阅 /// /// - Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); + Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage); + + /// + /// 电表自动阀控下行消息消费订阅 + /// + /// + Task AmmeterScheduledAutoValveControl(MeterReadingTelemetryPacketInfo issuedEventMessage); #endregion #region 水表消息采集 @@ -40,7 +46,7 @@ namespace JiShe.CollectBus.Subscribers /// 1分钟采集水表数据下行消息消费订阅 /// /// - Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); + Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage); #endregion } diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 49770ef..f50d121 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -1,10 +1,4 @@ -using Amazon.Runtime.Internal.Endpoints.StandardLibrary; -using Confluent.Kafka; -using DeviceDetectorNET.Parser.Device; -using DnsClient.Protocol; -using FreeSql; -using JiShe.CollectBus.Ammeters; -using JiShe.CollectBus.Application.Contracts; +using JiShe.CollectBus.Application.Contracts; using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.DeviceBalanceControl; @@ -13,36 +7,26 @@ using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; -using JiShe.CollectBus.EnergySystems.Entities; using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Model; using JiShe.CollectBus.IoTDB.Options; using JiShe.CollectBus.IoTDB.Provider; -using JiShe.CollectBus.IotSystems.MessageIssueds; +using JiShe.CollectBus.IotSystems.Ammeters; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Producer; -using JiShe.CollectBus.Protocol.Contracts; -using JiShe.CollectBus.RedisDataCache; -using JiShe.CollectBus.Repository.MeterReadingRecord; -using Mapster; -using Microsoft.Extensions.Configuration; +using JiShe.CollectBus.Protocol.Contracts.Interfaces; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System; using System.Collections.Generic; using System.Diagnostics; -using System.Diagnostics.Metrics; using System.Linq; -using System.Threading; using System.Threading.Tasks; -using static FreeSql.Internal.GlobalFilter; -using static Microsoft.AspNetCore.Razor.Language.TagHelperMetadata; -using static Thrift.Protocol.Utilities.TJSONProtocolConstants; namespace JiShe.CollectBus.ScheduledMeterReading { @@ -57,6 +41,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading private readonly IRedisDataCacheService _redisDataCacheService; private readonly KafkaOptionConfig _kafkaOptions; private readonly IoTDBRuntimeContext _runtimeContext; + private readonly IServiceProvider _serviceProvider; + + int pageSize = 3000; public BasicScheduledMeterReadingService( ILogger logger, @@ -64,6 +51,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading IRedisDataCacheService redisDataCacheService, IIoTDbProvider dbProvider, IoTDBRuntimeContext runtimeContext, + IServiceProvider serviceProvider, IOptions kafkaOptions) { _logger = logger; @@ -72,6 +60,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _producerService = producerService; _redisDataCacheService = redisDataCacheService; _kafkaOptions = kafkaOptions.Value; + _serviceProvider = serviceProvider; _runtimeContext.UseTableSessionPool = true; } @@ -321,7 +310,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { continue; } - + foreach (var ammeter in item) { deviceIds.Add(ammeter.MeterId.ToString()); @@ -421,11 +410,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading Value = pendingCopyReadTime }); - _ = CreateMeterKafkaTaskMessage(timeDensity, new IoTDBQueryOptions() + _ = CreateMeterKafkaTaskMessage(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName(), PageIndex = 1, - PageSize = 3000, + PageSize = pageSize, Conditions = conditions, }); @@ -458,11 +447,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading Value = pendingCopyReadTime }); - _ = CreateMeterKafkaTaskMessage(timeDensity, new IoTDBQueryOptions() + _ = CreateMeterKafkaTaskMessage(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName(), PageIndex = 1, - PageSize = 3000, + PageSize = pageSize, Conditions = conditions, }); } @@ -494,11 +483,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading Value = pendingCopyReadTime }); - _ = CreateMeterKafkaTaskMessage(timeDensity, new IoTDBQueryOptions() + _ = CreateMeterKafkaTaskMessage(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName(), PageIndex = 1, - PageSize = 3000, + PageSize = pageSize, Conditions = conditions, }); } @@ -643,7 +632,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } else { - //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。"); + _logger.LogWarning($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。"); continue; } } @@ -689,6 +678,25 @@ namespace JiShe.CollectBus.ScheduledMeterReading return taskList; } + /// + /// 获取电表阀控配置 + /// + /// 阀控的时间 + /// + public virtual Task> GetAmmeterAutoValveControlSetting(string currentTime) + { + throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现"); + } + + /// + /// 电表自动阀控 + /// + /// + public virtual Task AmmeterScheduledAutoValveControl() + { + throw new NotImplementedException($"{nameof(AmmeterScheduledAutoValveControl)}请根据不同系统类型进行实现"); + } + #endregion @@ -824,11 +832,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading Value = pendingCopyReadTime }); - _ = CreateMeterKafkaTaskMessage(timeDensity, new IoTDBQueryOptions() + _ = CreateMeterKafkaTaskMessage(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName(), PageIndex = 1, - PageSize = 3000, + PageSize = pageSize, Conditions = conditions, }); @@ -904,6 +912,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading tempCodes = new List() { "10_1" }; } + var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); + if (protocolPlugin == null) + { + _logger.LogError("协议不存在!"); + } foreach (var tempItem in tempCodes) { @@ -995,7 +1008,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// /// /// - private bool IsTaskTime(DateTime nextTaskTime, int timeDensity = 0) + protected bool IsTaskTime(DateTime nextTaskTime, int timeDensity = 0) { if (DateTime.Now.AddMinutes(timeDensity) >= nextTaskTime) { @@ -1013,7 +1026,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 表类型 /// 具体的创建任务的委托 /// - private async Task CreateMeterPublishTask(int timeDensity, DateTime nextTaskTime, MeterTypeEnum meterType, Action taskCreateAction) where T : DeviceCacheBasicModel + protected async Task CreateMeterPublishTask(int timeDensity, DateTime nextTaskTime, MeterTypeEnum meterType, Action taskCreateAction) where T : DeviceCacheBasicModel { var timer = Stopwatch.StartNew(); @@ -1074,9 +1087,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// /// 创建Kafka消息 /// + /// + /// kafka主题名称 + /// 任务查询条件 /// - private async Task CreateMeterKafkaTaskMessage(int timeDensity, IoTDBQueryOptions options) where T : IoTEntity, new() + protected async Task CreateMeterKafkaTaskMessage(string kafkaTopicName, IoTDBQueryOptions options) where T : IoTEntity, new() { + if (string.IsNullOrWhiteSpace(kafkaTopicName)) + { + _logger.LogInformation($"{nameof(CreateMeterKafkaTaskMessage)} Kafka消息推送主题不能为空,-101"); + return; + } int pageNumber = 0; bool hasNext; var stopwatch = Stopwatch.StartNew(); @@ -1103,7 +1124,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } while (hasNext); stopwatch.Stop(); - _logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {options.TableNameOrTreePath} {timeDensity}分钟采集任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); + _logger.LogInformation($"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题的任务 {options.TableNameOrTreePath} 路径任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); } /// @@ -1113,7 +1134,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 任务记录 /// 对应分区,也就是集中器号所在的分组序号 /// - private async Task KafkaProducerIssuedMessageAction(string topicName, + protected async Task KafkaProducerIssuedMessageAction(string topicName, T taskRecord, int partition) where T : class { if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null) diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 90266ce..82e9588 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -1,30 +1,30 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using JiShe.CollectBus.Ammeters; -using JiShe.CollectBus.Application.Contracts; +using JiShe.CollectBus.Application.Contracts; +using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.DeviceBalanceControl; +using JiShe.CollectBus.Common.Encrypt; +using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; -using JiShe.CollectBus.IotSystems.Devices; -using JiShe.CollectBus.IotSystems.MessageIssueds; +using JiShe.CollectBus.IotSystems.Ammeters; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Producer; -using JiShe.CollectBus.Repository; -using JiShe.CollectBus.Repository.MeterReadingRecord; +using JiShe.CollectBus.Protocol.Contracts.Interfaces; using Microsoft.AspNetCore.Authorization; -using Microsoft.AspNetCore.Mvc; -using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using Volo.Abp.Domain.Repositories; -using Volo.Abp.Uow; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + namespace JiShe.CollectBus.ScheduledMeterReading { @@ -36,22 +36,30 @@ namespace JiShe.CollectBus.ScheduledMeterReading public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService { string serverTagName = string.Empty; + private readonly ILogger _logger; + private readonly IIoTDbProvider _dbProvider; + private readonly IServiceProvider _serviceProvider; public EnergySystemScheduledMeterReadingService( ILogger logger, IIoTDbProvider dbProvider, IOptions kafkaOptions, IoTDBRuntimeContext runtimeContext, - IProducerService producerService, - IRedisDataCacheService redisDataCacheService) + IProducerService producerService, + IServiceProvider serviceProvider, + IRedisDataCacheService redisDataCacheService) : base(logger, producerService, - redisDataCacheService, + redisDataCacheService, dbProvider, runtimeContext, + serviceProvider, kafkaOptions) { serverTagName = kafkaOptions.Value.ServerTagName; + _dbProvider = dbProvider; + _logger = logger; + _serviceProvider = serviceProvider; } public sealed override string SystemType => SystemTypeConst.Energy; @@ -138,6 +146,199 @@ namespace JiShe.CollectBus.ScheduledMeterReading .QueryAsync(sql); } + + /// + /// 获取电表阀控配置 + /// + /// 阀控的时间 + /// + public override async Task> GetAmmeterAutoValveControlSetting(string currentTime) + { + string sql = $@"SELECT A.*,B.AmmeterID as MeterId,E.GatherCode,D.AreaCode,D.[Address],CONCAT(D.AreaCode, D.[Address]) AS FocusAddress,D.ID as FocusId + FROM TB_AutoTripTask(nolock) AS A + INNER JOIN TB_AutoTripAmmeter(nolock) AS B ON A.ID=B.TripTaskID + INNER JOIN TB_AmmeterInfo(nolock) AS C ON B.AmmeterID=C.ID + INNER JOIN TB_FocusInfo(nolock) AS D ON D.ID=C.FocusID + INNER JOIN TB_GatherInfo(NOLOCK) AS E ON E.ID=D.GatherInfoID + WHERE A.IsForbid=0 and A.State<>-1 and E.GatherCode LIKE '%V4%' and A.TripTime"; + + if (!string.IsNullOrWhiteSpace(currentTime)) + { + sql = $@"{sql} AND A.TripTime = '{currentTime}'"; + } + + return await SqlProvider.Instance.Change(DbEnum.EnergyDB) + .Ado + .QueryAsync(sql); + } + + + /// + /// 电表自动阀控 + /// + /// + public override async Task AmmeterScheduledAutoValveControl() + { + var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; + + var currentTime = DateTime.Now; + string currentTimeStr = $"{currentTime:HH:mm}"; + + try + { + var settingInfos = await GetAmmeterAutoValveControlSetting(currentTimeStr); + + if (settingInfos == null || settingInfos.Count <= 0) + { + _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表自动阀控时,阀控数据为空, -101"); + return; + } + + //批量获取对应的缓存电表信息 + var ammeterInfos = new List(); + List taskList = new List(); + var metadata = await _dbProvider.GetMetadata(); + + foreach (var settingInfo in settingInfos) + { + bool isGenerate = false; + switch (settingInfo.LoopType) + { + case "Once": + isGenerate = CommonHelper.JudgeIsGenerate_Once(settingInfo.OnceWithDate.Value, currentTime); + break; + case "EachDay": + isGenerate = CommonHelper.JudgeIsGenerate_Day(settingInfo.EachDayWithout, currentTime); + break; + case "EachWeek": + isGenerate = CommonHelper.JudgeIsGenerate_Week(settingInfo.EachWeekWith, currentTime); + break; + case "EachMonth": + isGenerate = CommonHelper.JudgeIsGenerate_Month(settingInfo.EachMonthWith, currentTime); + break; + } + + if (!isGenerate)//不生成,跳入下一循环 + { + continue; + } + + //获取对应的缓存电表信息 + var ammeterInfo = ammeterInfos.First(); + bool tripStateResult = false; + if (settingInfo.TripType.Equals("on")) + { + ammeterInfo.TripState = 0; + tripStateResult = true; + } + else if (settingInfo.TripType.Equals("off")) + { + ammeterInfo.TripState = 1; + tripStateResult = false; + + } + else + { + _logger.LogError($"集中器[{settingInfo.FocusAddress}],[{settingInfo.MeterId}]阀控命令错误:{settingInfo.TripType},-102"); + continue; + } + + var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); + if (protocolPlugin == null) + { + _logger.LogError("协议不存在!"); + } + + var temCode = "10_98"; + var itemCodeArr = temCode.Split('_'); + var aFNStr = itemCodeArr[0]; + var aFN = (AFN)(aFNStr.HexToDec()); + var fn = int.Parse(itemCodeArr[1]); + TelemetryPacketResponse builderResponse = null; + string methonCode = $"AFN{aFNStr}_Fn_Send"; + //特殊表暂不处理 + if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode + , out var handler)) + { + builderResponse = handler(new TelemetryPacketRequest() + { + FocusAddress = ammeterInfo.FocusAddress, + Fn = fn, + Pn = ammeterInfo.MeteringCode, + DataUnit = Build645SendData.BuildAmmeterValveControlSendDataUnit(ammeterInfo.AmmerterAddress, "", ammeterInfo.Password, tripStateResult),//生成阀控报文 + }); + } + else + { + _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{temCode}无效编码,-103"); + continue; + } + + if (builderResponse == null) + { + _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{temCode}报文构建失败,-104"); + continue; + } + + string taskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA); + var meterReadingRecords = new MeterReadingTelemetryPacketInfo() + { + SystemName = SystemType, + ProjectId = $"{ammeterInfo.ProjectID}", + DeviceType = $"{MeterTypeEnum.Ammeter}", + DeviceId = $"{ammeterInfo.MeterId}", + Timestamps = currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(), + DatabaseBusiID = ammeterInfo.DatabaseBusiID, + PendingCopyReadTime = currentTime, + CreationTime = currentTime, + MeterAddress = ammeterInfo.AmmerterAddress, + AFN = (int)aFN, + Fn = fn, + //Seq = builderResponse.Seq, + MSA = builderResponse.MSA, + ItemCode = temCode, + TaskMark = taskMark, + IsSend = false, + ManualOrNot = false, + Pn = ammeterInfo.MeteringCode, + IssuedMessageId = GuidGenerator.Create().ToString(), + IssuedMessageHexString = Convert.ToHexString(builderResponse.Data), + IsReceived = false, + ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(), + }; + taskList.Add(meterReadingRecords); + } + if (taskList == null || taskList.Count <= 0) + { + _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-105"); + return; + } + + //任务记录入库 + _ = _dbProvider.BatchInsertAsync(metadata, taskList); + + //任务信息推送Kafka + _ = DeviceGroupBalanceControl.ProcessWithThrottleAsync( + items: taskList, + deviceIdSelector: data => data.DeviceId, + processor: (data, groupIndex) => + { + _ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, data, groupIndex); + } + ); + + //todo 阀控记录入库,推送到新的服务 + } + catch (Exception) + { + + throw; + } + + + throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现"); + } + /// /// 获取水表信息 /// @@ -188,6 +389,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading return await SqlProvider.Instance.Change(DbEnum.EnergyDB) .Ado .QueryAsync(sql); - } + } } } \ No newline at end of file diff --git a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs index fa55fe7..fa90fed 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs @@ -1,58 +1,37 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using DeviceDetectorNET.Parser.Device; -using JiShe.CollectBus.Common.Consts; -using JiShe.CollectBus.Common.Enums; -using JiShe.CollectBus.IotSystems.Devices; -using JiShe.CollectBus.IotSystems.MessageIssueds; -using JiShe.CollectBus.IotSystems.MessageReceiveds; +using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Internal; -using JiShe.CollectBus.Protocol.Contracts; -using JiShe.CollectBus.Protocol.Contracts.Interfaces; -using JiShe.CollectBus.Repository.MeterReadingRecord; -using Microsoft.AspNetCore.Mvc; -using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using System; +using System.Threading.Tasks; using TouchSocket.Sockets; -using Volo.Abp.Caching; -using Volo.Abp.Domain.Repositories; namespace JiShe.CollectBus.Subscribers { /// /// 定时抄读任务消息消费订阅 /// - [Route($"/worker/app/subscriber")] public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService, IKafkaSubscribe { private readonly ILogger _logger; private readonly ITcpService _tcpService; - private readonly IServiceProvider _serviceProvider; - private readonly IRepository _deviceRepository; - private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository; - + private readonly IIoTDbProvider _dbProvider; /// /// Initializes a new instance of the class. /// /// The logger. /// The TCP service. - /// The Device pepository. - /// The service provider. - public WorkerSubscriberAppService(ILogger logger, + /// IoTDB数据驱动 + public WorkerSubscriberAppService(ILogger logger, ITcpService tcpService, - IRepository deviceRepository, - IMeterReadingRecordRepository meterReadingRecordsRepository, - IServiceProvider serviceProvider) + IIoTDbProvider dbProvider) { _logger = logger; _tcpService = tcpService; - _serviceProvider = serviceProvider; - _deviceRepository = deviceRepository; - _meterReadingRecordsRepository = meterReadingRecordsRepository; + _dbProvider = dbProvider; } @@ -63,28 +42,12 @@ namespace JiShe.CollectBus.Subscribers /// /// /// - [HttpPost] - [Route("ammeter/oneminute/issued-event")] [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)] - //[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)] - public async Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) + public async Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage) { _logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理"); - var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - if (protocolPlugin == null) - { - _logger.LogError("【1分钟采集电表数据下行消息消费队列开始处理】协议不存在!"); - } - else - { - var device = await _deviceRepository.FirstOrDefaultAsync(a => a.Number == receivedMessage.FocusAddress); - if (device != null) - { - await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); + return await SendMessagesAsync(receivedMessage); - } - } - return SubscribeAck.Success(); } /// @@ -92,28 +55,11 @@ namespace JiShe.CollectBus.Subscribers /// /// /// - [HttpPost] - [Route("ammeter/fiveminute/issued-event")] [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)] - //[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)] - public async Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) + public async Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage) { _logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理"); - var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - if (protocolPlugin == null) - { - _logger.LogError("【5分钟采集电表数据下行消息消费队列开始处理】协议不存在!"); - } - else - { - var device = await _deviceRepository.FirstOrDefaultAsync(a => a.Number == receivedMessage.FocusAddress); - if (device != null) - { - await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); - - } - } - return SubscribeAck.Success(); + return await SendMessagesAsync(receivedMessage); } /// @@ -121,68 +67,68 @@ namespace JiShe.CollectBus.Subscribers /// /// /// - [HttpPost] - [Route("ammeter/fifteenminute/issued-event")] [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)] - //[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)] - public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) + public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage) { _logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理"); - try - { - var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - if (protocolPlugin == null) - { - _logger.LogError("【15分钟采集电表数据下行消息消费队列开始处理】协议不存在!"); - } - else - { - var device = await _deviceRepository.FirstOrDefaultAsync(a => a.Number == receivedMessage.FocusAddress); - if (device != null) - { - await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); + return await SendMessagesAsync(receivedMessage); + } - } - } - return SubscribeAck.Success(); - } - catch (Exception ex) - { - - throw ex; - } + /// + /// 电表自动阀控下行消息消费订阅 + /// + /// + /// + [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName)] + public async Task AmmeterScheduledAutoValveControl(MeterReadingTelemetryPacketInfo receivedMessage) + { + _logger.LogInformation("电表自动阀控下行消息消费队列开始处理"); + return await SendMessagesAsync(receivedMessage); } #endregion #region 水表消息采集 - + /// /// 水表数据下行消息消费订阅 /// /// /// - [HttpPost] - [Route("watermeter/fifteenminute/issued-event")] [KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)] - //[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)] - public async Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) + public async Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage) { - _logger.LogInformation("15分钟采集水表数据下行消息消费队列开始处理"); - var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - if (protocolPlugin == null) - { - _logger.LogError("【15分钟采集水表数据下行消息消费队列开始处理】协议不存在!"); - } - else - { - var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress); - if (device != null) - { - await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); - } - } - return SubscribeAck.Success(); + return await SendMessagesAsync(receivedMessage); } #endregion + + /// + /// 设备报文发送 + /// + /// 消息记录 + /// + private async Task SendMessagesAsync(MeterReadingTelemetryPacketInfo receivedMessage) + { + try + { + var checkResult = _tcpService.ClientExists(receivedMessage.FocusAddress); + if (checkResult) + { + await _tcpService.SendAsync(receivedMessage.FocusAddress, Convert.FromHexString(receivedMessage.IssuedMessageHexString)); + + receivedMessage.IsSend = true; + await _dbProvider.InsertAsync(receivedMessage); + + return SubscribeAck.Success(); + } + else + { + return SubscribeAck.Fail(); + } + } + catch (Exception) + { + throw; + } + } } } diff --git a/services/JiShe.CollectBus.Domain/IotSystems/Ammeters/AmmeterAutoValveControlSetting.cs b/services/JiShe.CollectBus.Domain/IotSystems/Ammeters/AmmeterAutoValveControlSetting.cs new file mode 100644 index 0000000..0116def --- /dev/null +++ b/services/JiShe.CollectBus.Domain/IotSystems/Ammeters/AmmeterAutoValveControlSetting.cs @@ -0,0 +1,107 @@ +using JiShe.CollectBus.Common.Models; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IotSystems.Ammeters +{ + /// + /// 电表自动阀控制设置实体 + /// + public class AmmeterAutoValveControlSetting: DeviceCacheBasicModel + { + public int ID { get; set; } + + /// + /// 定时阀控名称 + /// + public string Name { get; set; } + + /// + /// 是否禁止: 1 禁止, 0 启用, + /// + public int IsForbid { get; set; } + + /// + /// 阀控命令:on 合闸(通电), off 断闸(断电) + /// + public string TripType { get; set; } + + /// + /// 执行周期:Once 一次、EachDay 每日、EachWeek 每周、EachMonth 每月、 + /// + public string LoopType { get; set; } + + /// + /// (专属字段)执行一次时,指定yyyy-MM-dd 00:00:00 (只用年月日) + /// + public DateTime? OnceWithDate { get; set; } + + /// + /// (专属字段)每日执行时排除(多个用英文逗号分隔),如:"周六,周日" + /// + public string EachDayWithout { get; set; } + + /// + /// (专属字段)每周执行时运行(多个用英文逗号分隔),如:"周一,周二,周日" + /// + public string EachWeekWith { get; set; } + + /// + /// (专属字段)每月执行时运行(多个用英文逗号分隔),如:"1,2,3,5" + /// + public string EachMonthWith { get; set; } + + /// + /// 阀控执行时分:HH:mm:00 + /// + public string TripTime { get; set; } + + /// + /// 项目ID + /// + public int ProjectID { get; set; } + + /// + /// 添加时间 + /// + public DateTime AddDate { get; set; } + + /// + /// 删除改成-1, + /// + public int State { get; set; } + + /// + /// 实际执行时间 + /// + public DateTime? WorkTime { get; set; } + + /// + /// 创建人ID + /// + public int CreateUserID { get; set; } + + /// + /// 采集编号 + /// + public string GatherCode { get; set; } + + /// + /// 集中器地址 + /// + public string Address { get; set; } + + /// + /// 集中器区域代码 + /// + public string AreaCode { get; set; } + + /// + /// 电表通信地址 + /// + public string AmmerterAddress { get; set; } + } +} diff --git a/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/Ammeters/AmmeterInfo.cs similarity index 99% rename from services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs rename to services/JiShe.CollectBus.Domain/IotSystems/Ammeters/AmmeterInfo.cs index add3131..60d6c91 100644 --- a/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs +++ b/services/JiShe.CollectBus.Domain/IotSystems/Ammeters/AmmeterInfo.cs @@ -6,7 +6,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; -namespace JiShe.CollectBus.Ammeters +namespace JiShe.CollectBus.IotSystems.Ammeters { public class AmmeterInfo: DeviceCacheBasicModel { diff --git a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs index 2033035..07b5369 100644 --- a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs +++ b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs @@ -15,7 +15,7 @@ using Volo.Abp.Domain.Entities.Auditing; namespace JiShe.CollectBus.IotSystems.MeterReadingRecords { /// - /// 抄读任务Redis缓存数据记录 + /// 抄读任务数据 /// [EntityType(EntityTypeEnum.TableModel)] public class MeterReadingTelemetryPacketInfo : IoTEntity @@ -56,6 +56,13 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords [FIELDColumn] public int FocusId { get; set; } + + /// + /// 集中器地址 + /// + [FIELDColumn] + public string FocusAddress { get; set; } + /// /// 表Id /// diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/Build645SendData.cs b/shared/JiShe.CollectBus.Common/BuildSendDatas/Build645SendData.cs index d51810c..6c4e960 100644 --- a/shared/JiShe.CollectBus.Common/BuildSendDatas/Build645SendData.cs +++ b/shared/JiShe.CollectBus.Common/BuildSendDatas/Build645SendData.cs @@ -25,7 +25,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas /// 电表地址 /// 特殊控制码 /// 密码 - /// 是否为开阀 + /// 是否为开阀,true合闸,false关闸 /// 型号码 /// public static List BuildAmmeterValveControlSendDataUnit(string ammeterAddress, string specialControlCode, string password, bool state, string modelCode = "") diff --git a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs index 34cf37f..11ba0a6 100644 --- a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs +++ b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs @@ -775,5 +775,61 @@ namespace JiShe.CollectBus.Common.Helpers return makstr;// Convert.ToInt32(makstr) << 32 | msa; } + + /// + /// 判断是生成月 + /// + /// + /// + /// + public static bool JudgeIsGenerate_Month(string eachMonthWith, DateTime curTime) + { + var arr = eachMonthWith.Split(','); + if (arr.Contains(curTime.Day.ToString())) + return true; + return false; + } + + /// + /// 判断是生成一次 + /// + /// + /// + /// + public static bool JudgeIsGenerate_Once(DateTime onceWithDate, DateTime curTime) => curTime.Date.Equals(onceWithDate);//为当天时发送 + + /// + /// 判断是生成日 + /// + /// + /// + /// + public static bool JudgeIsGenerate_Day(string eachDayWithout, DateTime curTime) + { + var weekName = strWeeks[(int)curTime.DayOfWeek]; + var arr = eachDayWithout.Split(','); + return !arr.Contains(weekName); + } + + public static readonly List strWeeks = new List() { "周日", "周一", "周二", "周三", "周四", "周五", "周六" }; + + /// + /// 判断是生成周 + /// + /// + /// + /// + public static bool JudgeIsGenerate_Week(string eachWeekWith, DateTime curTime) + { + if (string.IsNullOrWhiteSpace(eachWeekWith)) + { + return false; + } + + var weekName = strWeeks[(int)curTime.DayOfWeek]; + var arr = eachWeekWith.Split(','); + return arr.Contains(weekName); + + } } } diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/TasksToBeIssueModel.cs b/shared/JiShe.CollectBus.Common/Models/TasksToBeIssueModel.cs similarity index 93% rename from shared/JiShe.CollectBus.Common/BuildSendDatas/TasksToBeIssueModel.cs rename to shared/JiShe.CollectBus.Common/Models/TasksToBeIssueModel.cs index 6420a23..1484ab8 100644 --- a/shared/JiShe.CollectBus.Common/BuildSendDatas/TasksToBeIssueModel.cs +++ b/shared/JiShe.CollectBus.Common/Models/TasksToBeIssueModel.cs @@ -4,7 +4,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; -namespace JiShe.CollectBus.Common.BuildSendDatas +namespace JiShe.CollectBus.Common.Models { /// /// 待下发的指令生产任务数据