JiShe.CollectBus/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
2025-05-15 11:23:08 +08:00

541 lines
22 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.DataChannels;
using JiShe.CollectBus.DataMigration.Options;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol.Models;
using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Bson.Serialization.IdGenerators;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Volo.Abp.Guids;
namespace JiShe.CollectBus.ScheduledMeterReading
{
/// <summary>
/// 能耗系统定时采集服务
/// </summary>
[AllowAnonymous]
//[Route($"/energy/app/scheduled")]
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
{
string systemType = string.Empty;
string serverTagName = string.Empty;
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
private readonly IProtocolService _protocolService;
private readonly IGuidGenerator _guidGenerator;
public EnergySystemScheduledMeterReadingService(
ILogger<EnergySystemScheduledMeterReadingService> logger,
IDataChannelManageService dataChannelManage,
IRedisDataCacheService redisDataCacheService,
IIoTDbProvider dbProvider,
IProtocolService protocolService,
IGuidGenerator guidGenerator,
IOptions<DataMigrationOptions> dataMigrationOptions,
IOptions<KafkaOptionConfig> kafkaOptions,
IOptions<ServerApplicationOptions> applicationOptions)
: base(logger,
dataChannelManage,
redisDataCacheService,
dbProvider,
protocolService,
guidGenerator,
dataMigrationOptions,
kafkaOptions,
applicationOptions)
{
serverTagName = applicationOptions.Value.ServerTagName;
systemType = applicationOptions.Value.SystemType;
_logger = logger;
_protocolService = protocolService;
_guidGenerator = guidGenerator;
}
public sealed override string SystemType => systemType;
public sealed override string ServerTagName => serverTagName;
/// <summary>
/// 获取采集项列表
/// </summary>
/// <returns></returns>
public override async Task<List<GatherItemInfo>> GetGatherItemByDataTypes()
{
try
{
string sql = $"SELECT DataType,ItemCode FROM TB_GatherItem(NOLOCK) WHERE [State]=0";
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
.Ado
.QueryAsync<GatherItemInfo>(sql, null);
}
catch
{
return null;
}
}
/// <summary>
/// 获取电表信息
/// </summary>
/// <param name="gatherCode">采集端Code</param>
/// <returns></returns>
//[HttpGet]
//[Route($"ammeter/list")]
public override async Task<List<DeviceInfo>> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890")
{
#if DEBUG
var redisCacheDeviceInfoHashKeyTemp = $"CollectBus:Energy:JiSheCollectBus109:DeviceInfo";
List<DeviceInfo> ammeterInfos = FreeRedisProvider.Instance.Get<List<DeviceInfo>>(redisCacheDeviceInfoHashKeyTemp);
if (ammeterInfos == null || ammeterInfos.Count <= 0)
{
ammeterInfos = new List<DeviceInfo>();
ammeterInfos.Add(new DeviceInfo()
{
Baudrate = 2400,
FocusAddress = "442400040",
Name = "保利单箱电表1",
FocusId = 95780,
DatabaseBusiID = 1,
MeteringCode = 0,
MeterAddress = "442405000040",
MeterId = 127035,
TypeName = 1,
DataTypes = "581,589,592,597,601",
TimeDensity = 15,
BrandType = "DTS1980",
MeterType = MeterTypeEnum.Ammeter,
ProjectID = 1,
MeteringPort = MeteringPortConst.MeteringPortTwo,
Password = "000000",
});
ammeterInfos.Add(new DeviceInfo()
{
Baudrate = 2400,
FocusAddress = "442400039",
Name = "保利单箱电表2",
FocusId = 69280,
DatabaseBusiID = 1,
MeteringCode = 0,
MeterAddress = "442405000039",
MeterId = 95594,
TypeName = 1,
DataTypes = "581,589,592,597,601",
TimeDensity = 15,
BrandType = "DTS1980",
MeterType = MeterTypeEnum.Ammeter,
ProjectID = 1,
MeteringPort = MeteringPortConst.MeteringPortTwo,
Password = "000000",
});
ammeterInfos.Add(new DeviceInfo()
{
Baudrate = 2400,
FocusAddress = "402440506",
Name = "中环半导体9#冷却泵-220KW(三相电表)",
FocusId = 106857,
DatabaseBusiID = 1,
MeteringCode = 0,
MeterAddress = "402410040506",
MeterId = 139059,
TypeName = 3,
DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
TimeDensity = 15,
BrandType = "DTS1980",
Password = "000000",
ProjectID = 1,
MeterType = MeterTypeEnum.Ammeter,
MeteringPort = MeteringPortConst.MeteringPortTwo,
});
FreeRedisProvider.Instance.Set(redisCacheDeviceInfoHashKeyTemp,ammeterInfos);
}
return ammeterInfos;
#else
try
{
string sql = $@"SELECT
C.ID as MeterId,
C.Name,
C.FocusID as FocusId,
C.SingleRate,
C.MeteringCode,
C.Code AS BrandType,
C.Baudrate,
C.Password,
C.MeteringPort,
C.[Address] AS MeterAddress,
C.TypeName,
C.Protocol,
C.TripState,
C.[State],
B.[Address],
B.AreaCode,
B.AutomaticReport,
D.DataTypes,
B.TimeDensity,
A.GatherCode,
C.Special,
C.[ProjectID],
B.AbnormalState,
B.LastTime,
1 as MeterType,
CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,
(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
FROM TB_GatherInfo(NOLOCK) AS A
INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0
WHERE 1=1 and C.Special = 0 ";
//TODO 记得移除特殊表过滤
if (!string.IsNullOrWhiteSpace(gatherCode))
{
sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
}
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
.Ado
.QueryAsync<DeviceInfo>(sql);
}
catch (Exception ex)
{
throw ex;
}
#endif
}
/// <summary>
/// 获取电表阀控配置
/// </summary>
/// <param name="currentTime">阀控的时间</param>
/// <returns></returns>
public override async Task<List<AmmeterAutoValveControlSetting>> GetAmmeterAutoValveControlSetting(string currentTime)
{
try
{
string sql = $@"SELECT A.*,B.AmmeterID as MeterId,E.GatherCode,D.AreaCode,D.[Address],CONCAT(D.AreaCode, D.[Address]) AS FocusAddress,D.ID as FocusId
FROM TB_AutoTripTask(nolock) AS A
INNER JOIN TB_AutoTripAmmeter(nolock) AS B ON A.ID=B.TripTaskID
INNER JOIN TB_AmmeterInfo(nolock) AS C ON B.AmmeterID=C.ID
INNER JOIN TB_FocusInfo(nolock) AS D ON D.ID=C.FocusID
INNER JOIN TB_GatherInfo(NOLOCK) AS E ON E.ID=D.GatherInfoID
WHERE A.IsForbid=0 and A.State<>-1 and E.GatherCode LIKE '%V4%' ";
if (!string.IsNullOrWhiteSpace(currentTime))
{
sql = $@"{sql} AND A.TripTime = '{currentTime}'";
}
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
.Ado
.QueryAsync<AmmeterAutoValveControlSetting>(sql);
}
catch (Exception)
{
throw;
}
}
/// <summary>
/// 电表自动阀控
/// </summary>
/// <returns></returns>
public override async Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledAutoValveControl()
{
var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm}";
try
{
#if DEBUG
//电表自动阀控缓存
string redisCacheDeviceSettingInfoHashKey = $"redisCacheDeviceSettingInfoHashKey_{SystemType}_{ServerTagName}";
var settingInfos = FreeRedisProvider.Instance.Get< List<AmmeterAutoValveControlSetting>> (redisCacheDeviceSettingInfoHashKey);
if (settingInfos == null || settingInfos.Count <= 0)
{
settingInfos = new List<AmmeterAutoValveControlSetting>();
//settingInfos.Add(new AmmeterAutoValveControlSetting()
//{
// MeterType = MeterTypeEnum.Ammeter,
// AmmerterAddress = "442405000040",
// FocusAddress = "442400040",
// FocusId = 95780,
// ProjectID = 1,
// TripType = "on",
// TripTime = $"{DateTime.Now:HH:mm}",
// MeterId = 127035,
// LoopType = "EachDay",
// EachDayWithout = "周六,周日",
// TimeDensity = 15,
//});
//settingInfos.Add(new AmmeterAutoValveControlSetting()
//{
// MeterType = MeterTypeEnum.Ammeter,
// AmmerterAddress = "442405000039",
// FocusAddress = "442400039",
// FocusId = 69280,
// ProjectID = 1,
// TripType = "on",
// TripTime = $"{DateTime.Now:HH:mm}",
// MeterId = 95594,
// LoopType = "EachDay",
// EachDayWithout = "周六,周日",
// TimeDensity = 15,
//});
settingInfos.Add(new AmmeterAutoValveControlSetting()
{
MeterType = MeterTypeEnum.Ammeter,
AmmerterAddress = "402410040506",
FocusAddress = "402440506",
FocusId = 106857,
ProjectID = 1,
TripType = "on",
TripTime = $"{DateTime.Now:HH:mm}",
MeterId = 139059,
LoopType = "EachDay",
EachDayWithout = "周六,周日",
TimeDensity = 15,
});
FreeRedisProvider.Instance.Set(redisCacheDeviceSettingInfoHashKey, settingInfos);
}
List<DeviceInfo> meterInfos =await GetAmmeterInfoList();
#else
//获取电表阀控配置
var settingInfos = await GetAmmeterAutoValveControlSetting(currentTimeStr);
if (settingInfos == null || settingInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表自动阀控时,阀控数据为空, -101");
return null;
}
//设备hash缓存key
string redisCacheDeviceInfoHashKey = $"{string.Format(RedisConst.CacheDeviceInfoHashKey, SystemType, ServerTagName)}";
Dictionary<string, List<DeviceInfo>> keyValuePairsTemps = FreeRedisProvider.Instance.HGetAll<List<DeviceInfo>>(redisCacheDeviceInfoHashKey);
List<DeviceInfo> meterInfos = new List<DeviceInfo>();
List<string> focusAddressDataLista = new List<string>();
foreach (var item in keyValuePairsTemps)
{
foreach (var subItem in item.Value)
{
if (subItem.MeterType == MeterTypeEnum.Ammeter && subItem.TimeDensity == 15)
{
meterInfos.Add(subItem);
focusAddressDataLista.Add(subItem.MeterId.ToString());
}
}
}
#endif
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
foreach (var settingInfo in settingInfos)
{
bool isGenerate = false;
switch (settingInfo.LoopType)
{
case "Once":
isGenerate = CommonHelper.JudgeIsGenerate_Once(settingInfo.OnceWithDate.Value, currentTime);
break;
case "EachDay":
isGenerate = CommonHelper.JudgeIsGenerate_Day(settingInfo.EachDayWithout, currentTime);
break;
case "EachWeek":
isGenerate = CommonHelper.JudgeIsGenerate_Week(settingInfo.EachWeekWith, currentTime);
break;
case "EachMonth":
isGenerate = CommonHelper.JudgeIsGenerate_Month(settingInfo.EachMonthWith, currentTime);
break;
}
if (!isGenerate)//不生成,跳入下一循环
{
continue;
}
//获取对应的缓存电表信息
var ammeterInfo = meterInfos.Where(d=>d.MeterId == settingInfo.MeterId).FirstOrDefault();
if (ammeterInfo == null)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表自动阀控时未找到对应电表信息电表Id={settingInfo.MeterId}, -102");
continue;
}
bool tripStateResult = false;
string itemCode = T37612012PacketItemCodeConst.AFN10HFN01H;
string subItemCode = string.Empty;
if (settingInfo.TripType.Equals("on"))//当前电表断闸,需要合闸
{
ammeterInfo.TripState = 0;
tripStateResult = true;
subItemCode = T6452007PacketItemCodeConst.C1C011C;
if (ammeterInfo.TypeName > 3)
{
subItemCode = T6452007PacketItemCodeConst.C1C011B;
}
}
else if (settingInfo.TripType.Equals("off"))//当前电表合闸,需要断闸
{
ammeterInfo.TripState = 1;
tripStateResult = false;
subItemCode = T6452007PacketItemCodeConst.C1C011A;
}
else
{
_logger.LogError($"集中器[{settingInfo.FocusAddress}],[{settingInfo.MeterId}]阀控命令错误:{settingInfo.TripType},-102");
continue;
}
//根据电表型号获取协议插件
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
return null;
}
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = ammeterInfo.MeteringCode,
ItemCode = itemCode,
SubProtocolRequest = new SubProtocolBuildRequest()
{
MeterAddress = ammeterInfo.MeterAddress,
Password = ammeterInfo.Password,
ItemCode = subItemCode,
Baudrate = ammeterInfo.Baudrate,
MeteringPort = ammeterInfo.MeteringPort
}
});
var meterReadingRecords = CreateAmmeterPacketInfo(
ammeterInfo: ammeterInfo,
timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
builderResponse: builderResponse,
itemCode: itemCode,
subItemCode: subItemCode,
pendingCopyReadTime: currentTime,
creationTime: currentTime,
packetType: TelemetryPacketTypeEnum.AmmeterAutoValveControl,
_guidGenerator);
taskList.Add(meterReadingRecords);
}
if (taskList == null || taskList.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-106");
return null;
}
return taskList;
//todo 阀控记录入库,推送到新的服务
}
catch (Exception)
{
throw;
}
}
/// <summary>
/// 获取水表信息
/// </summary>
/// <param name="gatherCode">采集端Code</param>
/// <returns></returns>
//[HttpGet]
//[Route($"ammeter/list")]
public override async Task<List<DeviceInfo>> GetWatermeterInfoList(string gatherCode = "V4-Gather-8890")
{
try
{
string sql = $@"SELECT
A.ID as MeterId,
A.Name,
A.FocusID as FocusId,
A.MeteringCode,
A.Baudrate,
A.MeteringPort,
A.[Address] AS MeterAddress,
A.[Password],
A.TypeName,
A.Protocol,
A.Code,
A.LinkType,
A.HaveValve,
A.MeterType AS MeterTypeName,
A.MeterBrand,
A.TimesRate,
A.TimeDensity,
A.TripState,
B.[Address],
B.AreaCode,
B.AutomaticReport,
A.[State],
C.GatherCode,
A.[ProjectID],
B.AbnormalState,
B.LastTime,
2 as MeterType,
CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,
(select top 1 DatabaseBusiID from TB_Project where ID = b.ProjectID) AS DatabaseBusiID
FROM [dbo].[TB_WatermeterInfo](NOLOCK) AS A
INNER JOIN [dbo].[TB_FocusInfo](NOLOCK) AS B ON A.FocusID=B.ID AND B.RemoveState >= 0 AND B.State>=0
INNER JOIN [dbo].[TB_GatherInfo](NOLOCK) AS C ON B.GatherInfoID=C.ID
WHERE A.State>=0 AND A.State<100 ";
if (!string.IsNullOrWhiteSpace(gatherCode))
{
sql = $@"{sql} AND C.GatherCode= '{gatherCode}'";
}
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
.Ado
.QueryAsync<DeviceInfo>(sql);
}
catch (Exception)
{
throw;
}
}
}
}