371 lines
16 KiB
C#
371 lines
16 KiB
C#
using JiShe.CollectBus.Application.Contracts;
|
|
using JiShe.CollectBus.Common.BuildSendDatas;
|
|
using JiShe.CollectBus.Common.Consts;
|
|
using JiShe.CollectBus.Common.DeviceBalanceControl;
|
|
using JiShe.CollectBus.Common.Encrypt;
|
|
using JiShe.CollectBus.Common.Enums;
|
|
using JiShe.CollectBus.Common.Extensions;
|
|
using JiShe.CollectBus.Common.Helpers;
|
|
using JiShe.CollectBus.FreeSql;
|
|
using JiShe.CollectBus.GatherItem;
|
|
using JiShe.CollectBus.IoTDB.Context;
|
|
using JiShe.CollectBus.IoTDB.Interface;
|
|
using JiShe.CollectBus.IotSystems.Ammeters;
|
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
|
using JiShe.CollectBus.IotSystems.Watermeter;
|
|
using JiShe.CollectBus.Kafka.Internal;
|
|
using JiShe.CollectBus.Kafka.Producer;
|
|
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
|
using JiShe.CollectBus.Protocol.Contracts.SendData;
|
|
using Microsoft.AspNetCore.Authorization;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Threading.Tasks;
|
|
|
|
|
|
namespace JiShe.CollectBus.ScheduledMeterReading
|
|
{
|
|
/// <summary>
|
|
/// 能耗系统定时采集服务
|
|
/// </summary>
|
|
[AllowAnonymous]
|
|
//[Route($"/energy/app/scheduled")]
|
|
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
|
|
{
|
|
string serverTagName = string.Empty;
|
|
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
|
|
private readonly IIoTDbProvider _dbProvider;
|
|
private readonly IProtocolService _protocolService;
|
|
|
|
public EnergySystemScheduledMeterReadingService(
|
|
ILogger<EnergySystemScheduledMeterReadingService> logger,
|
|
IIoTDbProvider dbProvider,
|
|
IOptions<KafkaOptionConfig> kafkaOptions,
|
|
IoTDBRuntimeContext runtimeContext,
|
|
IProducerService producerService,
|
|
IProtocolService protocolService,
|
|
IRedisDataCacheService redisDataCacheService)
|
|
: base(logger,
|
|
producerService,
|
|
redisDataCacheService,
|
|
dbProvider,
|
|
runtimeContext,
|
|
protocolService,
|
|
kafkaOptions)
|
|
{
|
|
serverTagName = kafkaOptions.Value.ServerTagName;
|
|
_dbProvider = dbProvider;
|
|
_logger = logger;
|
|
_protocolService = protocolService;
|
|
}
|
|
|
|
public sealed override string SystemType => SystemTypeConst.Energy;
|
|
|
|
public sealed override string ServerTagName => serverTagName;
|
|
|
|
/// <summary>
|
|
/// 获取采集项列表
|
|
/// </summary>
|
|
/// <returns></returns>
|
|
public override async Task<List<GatherItemInfo>> GetGatherItemByDataTypes()
|
|
{
|
|
try
|
|
{
|
|
string sql = $"SELECT DataType,ItemCode FROM TB_GatherItem(NOLOCK) WHERE [State]=0";
|
|
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
|
|
.Ado
|
|
.QueryAsync<GatherItemInfo>(sql, null);
|
|
}
|
|
catch
|
|
{
|
|
return null;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 获取电表信息
|
|
/// </summary>
|
|
/// <param name="gatherCode">采集端Code</param>
|
|
/// <returns></returns>
|
|
//[HttpGet]
|
|
//[Route($"ammeter/list")]
|
|
public override async Task<List<AmmeterInfo>> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890")
|
|
{
|
|
|
|
//List<AmmeterInfo> ammeterInfos = new List<AmmeterInfo>();
|
|
//ammeterInfos.Add(new AmmeterInfo()
|
|
//{
|
|
// Baudrate = 2400,
|
|
// FocusAddress = "402440506",
|
|
// Name = "张家祠工务(三相电表)",
|
|
// FocusId = 95780,
|
|
// DatabaseBusiID = 1,
|
|
// MeteringCode = 1,
|
|
// AmmerterAddress = "402410040506",
|
|
// MeterId = 127035,
|
|
// TypeName = 3,
|
|
// DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
|
|
// TimeDensity = 15,
|
|
// BrandType = "",
|
|
//});
|
|
//ammeterInfos.Add(new AmmeterInfo()
|
|
//{
|
|
// Baudrate = 2400,
|
|
// FocusAddress = "542400504",
|
|
// Name = "五号配(长芦二所四排)(单相电表)",
|
|
// FocusId = 69280,
|
|
// DatabaseBusiID = 1,
|
|
// MeteringCode = 2,
|
|
// AmmerterAddress = "542410000504",
|
|
// MeterId = 95594,
|
|
// TypeName = 1,
|
|
// DataTypes = "581,589,592,597,601",
|
|
// TimeDensity = 15,
|
|
// BrandType = "",
|
|
//});
|
|
|
|
//return ammeterInfos;
|
|
|
|
string sql = $@"SELECT C.ID as MeterId,C.Name,C.FocusID as FocusId,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
|
|
FROM TB_GatherInfo(NOLOCK) AS A
|
|
INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
|
|
INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
|
|
INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0
|
|
WHERE 1=1 and C.Special = 0 ";
|
|
//TODO 记得移除特殊表过滤
|
|
|
|
if (!string.IsNullOrWhiteSpace(gatherCode))
|
|
{
|
|
sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
|
|
}
|
|
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
|
|
.Ado
|
|
.QueryAsync<AmmeterInfo>(sql);
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
/// 获取电表阀控配置
|
|
/// </summary>
|
|
/// <param name="currentTime">阀控的时间</param>
|
|
/// <returns></returns>
|
|
public override async Task<List<AmmeterAutoValveControlSetting>> GetAmmeterAutoValveControlSetting(string currentTime)
|
|
{
|
|
string sql = $@"SELECT A.*,B.AmmeterID as MeterId,E.GatherCode,D.AreaCode,D.[Address],CONCAT(D.AreaCode, D.[Address]) AS FocusAddress,D.ID as FocusId
|
|
FROM TB_AutoTripTask(nolock) AS A
|
|
INNER JOIN TB_AutoTripAmmeter(nolock) AS B ON A.ID=B.TripTaskID
|
|
INNER JOIN TB_AmmeterInfo(nolock) AS C ON B.AmmeterID=C.ID
|
|
INNER JOIN TB_FocusInfo(nolock) AS D ON D.ID=C.FocusID
|
|
INNER JOIN TB_GatherInfo(NOLOCK) AS E ON E.ID=D.GatherInfoID
|
|
WHERE A.IsForbid=0 and A.State<>-1 and E.GatherCode LIKE '%V4%' and A.TripTime";
|
|
|
|
if (!string.IsNullOrWhiteSpace(currentTime))
|
|
{
|
|
sql = $@"{sql} AND A.TripTime = '{currentTime}'";
|
|
}
|
|
|
|
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
|
|
.Ado
|
|
.QueryAsync<AmmeterAutoValveControlSetting>(sql);
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
/// 电表自动阀控
|
|
/// </summary>
|
|
/// <returns></returns>
|
|
public override async Task AmmeterScheduledAutoValveControl()
|
|
{
|
|
var currentTime = DateTime.Now;
|
|
string currentTimeStr = $"{currentTime:HH:mm}";
|
|
|
|
try
|
|
{
|
|
//获取电表阀控配置
|
|
var settingInfos = await GetAmmeterAutoValveControlSetting(currentTimeStr);
|
|
|
|
if (settingInfos == null || settingInfos.Count <= 0)
|
|
{
|
|
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表自动阀控时,阀控数据为空, -101");
|
|
return;
|
|
}
|
|
|
|
//批量获取对应的缓存电表信息
|
|
var ammeterInfos = new List<AmmeterInfo>();
|
|
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
|
|
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
|
|
|
|
foreach (var settingInfo in settingInfos)
|
|
{
|
|
bool isGenerate = false;
|
|
switch (settingInfo.LoopType)
|
|
{
|
|
case "Once":
|
|
isGenerate = CommonHelper.JudgeIsGenerate_Once(settingInfo.OnceWithDate.Value, currentTime);
|
|
break;
|
|
case "EachDay":
|
|
isGenerate = CommonHelper.JudgeIsGenerate_Day(settingInfo.EachDayWithout, currentTime);
|
|
break;
|
|
case "EachWeek":
|
|
isGenerate = CommonHelper.JudgeIsGenerate_Week(settingInfo.EachWeekWith, currentTime);
|
|
break;
|
|
case "EachMonth":
|
|
isGenerate = CommonHelper.JudgeIsGenerate_Month(settingInfo.EachMonthWith, currentTime);
|
|
break;
|
|
}
|
|
|
|
if (!isGenerate)//不生成,跳入下一循环
|
|
{
|
|
continue;
|
|
}
|
|
|
|
//获取对应的缓存电表信息
|
|
var ammeterInfo = ammeterInfos.First();
|
|
bool tripStateResult = false;
|
|
if (settingInfo.TripType.Equals("on"))
|
|
{
|
|
ammeterInfo.TripState = 0;
|
|
tripStateResult = true;
|
|
}
|
|
else if (settingInfo.TripType.Equals("off"))
|
|
{
|
|
ammeterInfo.TripState = 1;
|
|
tripStateResult = false;
|
|
|
|
}
|
|
else
|
|
{
|
|
_logger.LogError($"集中器[{settingInfo.FocusAddress}],[{settingInfo.MeterId}]阀控命令错误:{settingInfo.TripType},-102");
|
|
continue;
|
|
}
|
|
|
|
var temCode = "10_01_";
|
|
|
|
//根据电表型号获取协议插件
|
|
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
|
|
if (protocolPlugin == null)
|
|
{
|
|
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
|
|
return;
|
|
}
|
|
|
|
BuildResponse builderResponse = await protocolPlugin.BuildAsync(new BuildRequest()
|
|
{
|
|
FocusAddress = ammeterInfo.FocusAddress,
|
|
Pn = ammeterInfo.MeteringCode,
|
|
ItemCode = temCode,
|
|
});
|
|
|
|
string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA,builderResponse.Seq);
|
|
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
|
|
{
|
|
SystemName = SystemType,
|
|
ProjectId = $"{ammeterInfo.ProjectID}",
|
|
DeviceType = $"{MeterTypeEnum.Ammeter}",
|
|
DeviceId = $"{ammeterInfo.MeterId}",
|
|
Timestamps = currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
|
|
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
|
|
PendingCopyReadTime = currentTime,
|
|
CreationTime = currentTime,
|
|
MeterAddress = ammeterInfo.AmmerterAddress,
|
|
AFN = builderResponse.AFn,
|
|
Fn = builderResponse.Fn,
|
|
Seq = builderResponse.Seq,
|
|
MSA = builderResponse.MSA,
|
|
ItemCode = temCode,
|
|
TaskMark = taskMark,
|
|
IsSend = false,
|
|
ManualOrNot = false,
|
|
Pn = ammeterInfo.MeteringCode,
|
|
IssuedMessageId = GuidGenerator.Create().ToString(),
|
|
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
|
|
IsReceived = false,
|
|
ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
|
|
};
|
|
taskList.Add(meterReadingRecords);
|
|
}
|
|
if (taskList == null || taskList.Count <= 0)
|
|
{
|
|
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-106");
|
|
return;
|
|
}
|
|
|
|
//任务记录入库
|
|
_ = _dbProvider.BatchInsertAsync(metadata, taskList);
|
|
|
|
//任务信息推送Kafka
|
|
_ = DeviceGroupBalanceControl.ProcessWithThrottleAsync<MeterReadingTelemetryPacketInfo>(
|
|
items: taskList,
|
|
deviceIdSelector: data => data.DeviceId,
|
|
processor: (data, groupIndex) =>
|
|
{
|
|
_ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, data, groupIndex);
|
|
}
|
|
);
|
|
|
|
//todo 阀控记录入库,推送到新的服务
|
|
}
|
|
catch (Exception)
|
|
{
|
|
|
|
throw;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 获取水表信息
|
|
/// </summary>
|
|
/// <param name="gatherCode">采集端Code</param>
|
|
/// <returns></returns>
|
|
//[HttpGet]
|
|
//[Route($"ammeter/list")]
|
|
public override async Task<List<WatermeterInfo>> GetWatermeterInfoList(string gatherCode = "V4-Gather-8890")
|
|
{
|
|
string sql = $@"SELECT
|
|
A.ID as MeterId,
|
|
A.Name,
|
|
A.FocusID as FocusId,
|
|
A.MeteringCode,
|
|
A.Baudrate,
|
|
A.MeteringPort,
|
|
A.[Address] AS MeterAddress,
|
|
A.[Password],
|
|
A.TypeName,
|
|
A.Protocol,
|
|
A.Code,
|
|
A.LinkType,
|
|
A.HaveValve,
|
|
A.MeterType AS MeterTypeName,
|
|
A.MeterBrand,
|
|
A.TimesRate,
|
|
A.TimeDensity,
|
|
A.TripState,
|
|
B.[Address],
|
|
B.AreaCode,
|
|
B.AutomaticReport,
|
|
A.[State],
|
|
C.GatherCode,
|
|
A.[ProjectID],
|
|
B.AbnormalState,
|
|
B.LastTime,
|
|
CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,
|
|
(select top 1 DatabaseBusiID from TB_Project where ID = b.ProjectID) AS DatabaseBusiID
|
|
FROM [dbo].[TB_WatermeterInfo](NOLOCK) AS A
|
|
INNER JOIN [dbo].[TB_FocusInfo](NOLOCK) AS B ON A.FocusID=B.ID AND B.RemoveState >= 0 AND B.State>=0
|
|
INNER JOIN [dbo].[TB_GatherInfo](NOLOCK) AS C ON B.GatherInfoID=C.ID
|
|
WHERE A.State>=0 AND A.State<100 ";
|
|
|
|
if (!string.IsNullOrWhiteSpace(gatherCode))
|
|
{
|
|
sql = $@"{sql} AND C.GatherCode= '{gatherCode}'";
|
|
}
|
|
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
|
|
.Ado
|
|
.QueryAsync<WatermeterInfo>(sql);
|
|
}
|
|
}
|
|
} |