JiShe.CollectBus/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
2025-05-14 15:20:26 +08:00

538 lines
22 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.DataChannels;
using JiShe.CollectBus.DataMigration.Options;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol.Models;
using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Bson.Serialization.IdGenerators;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Volo.Abp.Guids;
namespace JiShe.CollectBus.ScheduledMeterReading
{
/// <summary>
/// Scheduled meter-reading (data collection) service for the energy-consumption system.
/// </summary>
[AllowAnonymous]
//[Route($"/energy/app/scheduled")]
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
{
/// <summary>Backing value of <see cref="SystemType"/>; empty until the constructor body has run.</summary>
private readonly string _systemType = string.Empty;

/// <summary>Backing value of <see cref="ServerTagName"/>; empty until the constructor body has run.</summary>
private readonly string _serverTagName = string.Empty;

/// <summary>Logger used by the members of this class.</summary>
private readonly ILogger<BasicScheduledMeterReadingService> _logger;

/// <summary>Resolves the protocol plugin that matches a meter's brand type.</summary>
private readonly IProtocolService _protocolService;

/// <summary>GUID generator handed to the packet factory when telemetry packets are created.</summary>
private readonly IGuidGenerator _guidGenerator;

/// <summary>
/// Creates the service. Every dependency is forwarded to the base class; the logger, protocol
/// service and GUID generator are additionally kept locally, and the system type / server tag
/// name are copied out of <paramref name="applicationOptions"/>.
/// </summary>
public EnergySystemScheduledMeterReadingService(
    ILogger<EnergySystemScheduledMeterReadingService> logger,
    IDataChannelManageService dataChannelManage,
    IRedisDataCacheService redisDataCacheService,
    IIoTDbProvider dbProvider,
    IProtocolService protocolService,
    IGuidGenerator guidGenerator,
    IOptions<DataMigrationOptions> dataMigrationOptions,
    IOptions<KafkaOptionConfig> kafkaOptions,
    IOptions<ServerApplicationOptions> applicationOptions)
    : base(
        logger,
        dataChannelManage,
        redisDataCacheService,
        dbProvider,
        protocolService,
        guidGenerator,
        dataMigrationOptions,
        kafkaOptions,
        applicationOptions)
{
    var serverOptions = applicationOptions.Value;
    _serverTagName = serverOptions.ServerTagName;
    _systemType = serverOptions.SystemType;

    _logger = logger;
    _protocolService = protocolService;
    _guidGenerator = guidGenerator;
}

/// <summary>System type configured in the server application options.</summary>
public sealed override string SystemType
{
    get { return _systemType; }
}

/// <summary>Server tag name configured in the server application options.</summary>
public sealed override string ServerTagName
{
    get { return _serverTagName; }
}
/// <summary>
/// Gets the gather (collection) item list from the energy database.
/// </summary>
/// <returns>
/// The <c>DataType</c>/<c>ItemCode</c> pairs of every <c>TB_GatherItem</c> row whose
/// <c>State</c> is 0, or <c>null</c> when the query fails (the failure is logged).
/// </returns>
public override async Task<List<GatherItemInfo>> GetGatherItemByDataTypes()
{
    const string sql = "SELECT DataType,ItemCode FROM TB_GatherItem(NOLOCK) WHERE [State]=0";
    try
    {
        return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
            .Ado
            .QueryAsync<GatherItemInfo>(sql, null);
    }
    catch (Exception ex)
    {
        // Best-effort contract is kept (callers still receive null), but the failure is no
        // longer swallowed without a trace.
        _logger.LogError(ex, "{Method} failed to load the gather items from the energy database", nameof(GetGatherItemByDataTypes));
        return null;
    }
}
/// <summary>
/// Gets the ammeter (electricity meter) information list.
/// </summary>
/// <remarks>
/// DEBUG builds return two fixed test meters and ignore <paramref name="gatherCode"/>; this is
/// the same convention the automatic valve-control method of this class uses for its test data.
/// All other builds query the energy database. Previously the test data was returned
/// unconditionally, which left the query below it unreachable.
/// NOTE(review): confirm that non-DEBUG deployments are expected to read the real meter list.
/// </remarks>
/// <param name="gatherCode">
/// Gather (collection endpoint) code. When null or blank, no gather-code filter is applied.
/// </param>
/// <returns>The matching meters.</returns>
//[HttpGet]
//[Route($"ammeter/list")]
public override async Task<List<DeviceInfo>> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890")
{
#if DEBUG
    // Both test meters collect the same set of data types.
    const string testDataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679";
    var ammeterInfos = new List<DeviceInfo>
    {
        new DeviceInfo
        {
            Baudrate = 2400,
            FocusAddress = "442400040",
            Name = "保利单箱电表1",
            FocusId = 95780,
            DatabaseBusiID = 1,
            MeteringCode = 1,
            MeterAddress = "442405000040",
            MeterId = 127035,
            TypeName = 1,
            DataTypes = testDataTypes,
            TimeDensity = 15,
            BrandType = "DDS1980",
            MeterType = MeterTypeEnum.Ammeter,
            ProjectID = 1,
        },
        new DeviceInfo
        {
            Baudrate = 2400,
            FocusAddress = "442400039",
            Name = "保利单箱电表2",
            FocusId = 69280,
            DatabaseBusiID = 1,
            MeteringCode = 1,
            MeterAddress = "442405000039",
            MeterId = 95594,
            TypeName = 1,
            DataTypes = testDataTypes,
            TimeDensity = 15,
            BrandType = "DDS1980",
            MeterType = MeterTypeEnum.Ammeter,
            ProjectID = 1,
        },
    };
    // Awaiting a completed task keeps this async method free of the "no await" warning.
    return await Task.FromResult(ammeterInfos);
#else
    // TODO: remember to remove the special-meter filter (C.Special = 0).
    string sql = @"SELECT
C.ID as MeterId,
C.Name,
C.FocusID as FocusId,
C.SingleRate,
C.MeteringCode,
C.Code AS BrandType,
C.Baudrate,
C.Password,
C.MeteringPort,
C.[Address] AS MeterAddress,
C.TypeName,
C.Protocol,
C.TripState,
C.[State],
B.[Address],
B.AreaCode,
B.AutomaticReport,
D.DataTypes,
B.TimeDensity,
A.GatherCode,
C.Special,
C.[ProjectID],
B.AbnormalState,
B.LastTime,
1 as MeterType,
CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,
(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
FROM TB_GatherInfo(NOLOCK) AS A
INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0
WHERE 1=1 and C.Special = 0 ";

    var ado = SqlProvider.Instance.Change(DbEnum.EnergyDB).Ado;
    if (string.IsNullOrWhiteSpace(gatherCode))
    {
        return await ado.QueryAsync<DeviceInfo>(sql);
    }

    // The gather code is bound as a parameter instead of being spliced into the SQL text.
    return await ado.QueryAsync<DeviceInfo>($"{sql} AND A.GatherCode = @gatherCode", new { gatherCode });
#endif
}
/// <summary>
/// Gets the ammeter automatic valve-control (trip) task settings from the energy database.
/// </summary>
/// <remarks>
/// Only tasks with <c>IsForbid = 0</c> and <c>State &lt;&gt; -1</c> whose gather code contains
/// "V4" are returned. Query failures propagate to the caller unchanged.
/// </remarks>
/// <param name="currentTime">
/// Valve-control time matched against <c>TripTime</c>. When null or blank, no time filter is applied.
/// </param>
/// <returns>The matching valve-control settings.</returns>
public override async Task<List<AmmeterAutoValveControlSetting>> GetAmmeterAutoValveControlSetting(string currentTime)
{
    string sql = @"SELECT A.*,B.AmmeterID as MeterId,E.GatherCode,D.AreaCode,D.[Address],CONCAT(D.AreaCode, D.[Address]) AS FocusAddress,D.ID as FocusId
FROM TB_AutoTripTask(nolock) AS A
INNER JOIN TB_AutoTripAmmeter(nolock) AS B ON A.ID=B.TripTaskID
INNER JOIN TB_AmmeterInfo(nolock) AS C ON B.AmmeterID=C.ID
INNER JOIN TB_FocusInfo(nolock) AS D ON D.ID=C.FocusID
INNER JOIN TB_GatherInfo(NOLOCK) AS E ON E.ID=D.GatherInfoID
WHERE A.IsForbid=0 and A.State<>-1 and E.GatherCode LIKE '%V4%' ";

    var ado = SqlProvider.Instance.Change(DbEnum.EnergyDB).Ado;
    if (string.IsNullOrWhiteSpace(currentTime))
    {
        return await ado.QueryAsync<AmmeterAutoValveControlSetting>(sql);
    }

    // The time is bound as a parameter instead of being spliced into the SQL text.
    return await ado.QueryAsync<AmmeterAutoValveControlSetting>($"{sql} AND A.TripTime = @currentTime", new { currentTime });
}
/// <summary>
/// Builds the telemetry packets for the ammeter automatic valve-control (trip) tasks that are
/// due at the current time.
/// </summary>
/// <remarks>
/// DEBUG builds use fixed test settings; other builds load the settings whose trip time equals
/// the current "HH:mm". A setting produces a packet only when its loop rule (Once / EachDay /
/// EachWeek / EachMonth) matches now and the meter is found among the cached ammeters with a
/// time density of 15. Settings that fail these checks, or carry an unknown trip type, are
/// skipped.
/// </remarks>
/// <returns>
/// The generated packets, or <c>null</c> when there are no settings, no packet was generated,
/// or no protocol plugin was found for a meter.
/// </returns>
public override async Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledAutoValveControl()
{
    var currentTime = DateTime.Now;
#if DEBUG
    // Fixed test settings; the database is not queried in DEBUG builds.
    var settingInfos = new List<AmmeterAutoValveControlSetting>
    {
        new AmmeterAutoValveControlSetting
        {
            MeterType = MeterTypeEnum.Ammeter,
            AmmerterAddress = "442405000040",
            FocusAddress = "442400040",
            FocusId = 95780,
            ProjectID = 1,
            TripType = "off",
            TripTime = "14:02",
            MeterId = 127035,
            LoopType = "EachDay",
            EachDayWithout = "周六,周日",
            TimeDensity = 15,
        },
        new AmmeterAutoValveControlSetting
        {
            MeterType = MeterTypeEnum.Ammeter,
            AmmerterAddress = "442405000039",
            FocusAddress = "442400039",
            FocusId = 69280,
            ProjectID = 1,
            TripType = "off",
            TripTime = $"{DateTime.Now:HH:mm}",
            MeterId = 95594,
            LoopType = "EachDay",
            EachDayWithout = "周六,周日",
            TimeDensity = 15,
        },
        new AmmeterAutoValveControlSetting
        {
            MeterType = MeterTypeEnum.Ammeter,
            AmmerterAddress = "321410010270",
            FocusAddress = "322011149",
            FocusId = 333333,
            ProjectID = 1,
            TripType = "on",
            TripTime = $"{DateTime.Now:HH:mm}",
            MeterId = 333333,
            LoopType = "EachDay",
            EachDayWithout = "周六,周日",
            TimeDensity = 15,
        },
        new AmmeterAutoValveControlSetting
        {
            MeterType = MeterTypeEnum.Ammeter,
            AmmerterAddress = "112410027787",
            FocusAddress = "322011149",
            FocusId = 222222,
            ProjectID = 1,
            TripType = "on",
            TripTime = $"{DateTime.Now:HH:mm}",
            MeterId = 222222,
            LoopType = "EachDay",
            EachDayWithout = "周六,周日",
            TimeDensity = 15,
        },
    };
#else
    // Load the valve-control settings whose trip time is the current minute.
    string currentTimeStr = $"{currentTime:HH:mm}";
    var settingInfos = await GetAmmeterAutoValveControlSetting(currentTimeStr);
    if (settingInfos == null || settingInfos.Count <= 0)
    {
        _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表自动阀控时,阀控数据为空, -101");
        return null;
    }
#endif

    // Device hash cache key.
    string redisCacheDeviceInfoHashKey = string.Format(RedisConst.CacheDeviceInfoHashKey, SystemType, ServerTagName);
    Dictionary<string, List<DeviceInfo>> cachedDevices = FreeRedisProvider.Instance.HGetAll<List<DeviceInfo>>(redisCacheDeviceInfoHashKey);

    // Keep only the cached ammeters with a time density of 15. A missing cache result or a
    // missing value list is treated as "no meters" rather than failing the run.
    List<DeviceInfo> meterInfos = cachedDevices == null
        ? new List<DeviceInfo>()
        : cachedDevices.Values
            .Where(devices => devices != null)
            .SelectMany(devices => devices)
            .Where(device => device != null && device.MeterType == MeterTypeEnum.Ammeter && device.TimeDensity == 15)
            .ToList();

    List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
    foreach (var settingInfo in settingInfos)
    {
        bool isGenerate = false;
        switch (settingInfo.LoopType)
        {
            case "Once":
                // A "Once" task without a date can never be due; skip it instead of throwing.
                isGenerate = settingInfo.OnceWithDate != null
                    && CommonHelper.JudgeIsGenerate_Once(settingInfo.OnceWithDate.Value, currentTime);
                break;
            case "EachDay":
                isGenerate = CommonHelper.JudgeIsGenerate_Day(settingInfo.EachDayWithout, currentTime);
                break;
            case "EachWeek":
                isGenerate = CommonHelper.JudgeIsGenerate_Week(settingInfo.EachWeekWith, currentTime);
                break;
            case "EachMonth":
                isGenerate = CommonHelper.JudgeIsGenerate_Month(settingInfo.EachMonthWith, currentTime);
                break;
        }
        if (!isGenerate)
        {
            // Not due now: move on to the next setting.
            continue;
        }

        // Look up the cached meter this setting refers to.
        var ammeterInfo = meterInfos.FirstOrDefault(d => d.MeterId == settingInfo.MeterId);
        if (ammeterInfo == null)
        {
            _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表自动阀控时未找到对应电表信息电表Id={settingInfo.MeterId}, -102");
            continue;
        }

        string itemCode = T37612012PacketItemCodeConst.AFN10HFN01H;
        string subItemCode;
        // String == is null-safe, so a missing trip type falls through to the error branch.
        if (settingInfo.TripType == "on")
        {
            ammeterInfo.TripState = 0;
            // Meters whose TypeName is not 1 use a different sub item code for "on".
            subItemCode = ammeterInfo.TypeName != 1
                ? T6452007PacketItemCodeConst.C1C01B
                : T6452007PacketItemCodeConst.C1C01C;
        }
        else if (settingInfo.TripType == "off")
        {
            ammeterInfo.TripState = 1;
            subItemCode = T6452007PacketItemCodeConst.C1C01A;
        }
        else
        {
            _logger.LogError($"集中器[{settingInfo.FocusAddress}],[{settingInfo.MeterId}]阀控命令错误:{settingInfo.TripType},-102");
            continue;
        }

        // Resolve the protocol plugin from the meter's brand type.
        var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
        if (protocolPlugin == null)
        {
            // NOTE(review): this aborts the whole run and drops packets already built for other
            // meters; confirm whether skipping just this meter (continue) is what is wanted.
            _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
            return null;
        }

        ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
        {
            FocusAddress = ammeterInfo.FocusAddress,
            Pn = ammeterInfo.MeteringCode,
            ItemCode = itemCode,
            SubProtocolRequest = new SubProtocolBuildRequest()
            {
                MeterAddress = ammeterInfo.MeterAddress,
                Password = ammeterInfo.Password,
                ItemCode = subItemCode,
            }
        });

        var meterReadingRecords = CreateAmmeterPacketInfo(
            ammeterInfo: ammeterInfo,
            timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
            builderResponse: builderResponse,
            itemCode: itemCode,
            subItemCode: subItemCode,
            pendingCopyReadTime: currentTime,
            creationTime: currentTime,
            packetType: TelemetryPacketTypeEnum.AmmeterAutoValveControl,
            _guidGenerator);
        taskList.Add(meterReadingRecords);
    }

    if (taskList.Count <= 0)
    {
        _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-106");
        return null;
    }

    // TODO: persist the valve-control records and push them to the new service.
    return taskList;
}
/// <summary>
/// Gets the water meter information list from the energy database.
/// </summary>
/// <remarks>
/// Only meters with <c>0 &lt;= State &lt; 100</c> on concentrators with <c>RemoveState &gt;= 0</c>
/// and <c>State &gt;= 0</c> are returned. Query failures propagate to the caller unchanged.
/// </remarks>
/// <param name="gatherCode">
/// Gather (collection endpoint) code. When null or blank, no gather-code filter is applied.
/// </param>
/// <returns>The matching water meters.</returns>
//[HttpGet]
//[Route($"ammeter/list")]
public override async Task<List<DeviceInfo>> GetWatermeterInfoList(string gatherCode = "V4-Gather-8890")
{
    string sql = @"SELECT
A.ID as MeterId,
A.Name,
A.FocusID as FocusId,
A.MeteringCode,
A.Baudrate,
A.MeteringPort,
A.[Address] AS MeterAddress,
A.[Password],
A.TypeName,
A.Protocol,
A.Code,
A.LinkType,
A.HaveValve,
A.MeterType AS MeterTypeName,
A.MeterBrand,
A.TimesRate,
A.TimeDensity,
A.TripState,
B.[Address],
B.AreaCode,
B.AutomaticReport,
A.[State],
C.GatherCode,
A.[ProjectID],
B.AbnormalState,
B.LastTime,
2 as MeterType,
CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,
(select top 1 DatabaseBusiID from TB_Project where ID = b.ProjectID) AS DatabaseBusiID
FROM [dbo].[TB_WatermeterInfo](NOLOCK) AS A
INNER JOIN [dbo].[TB_FocusInfo](NOLOCK) AS B ON A.FocusID=B.ID AND B.RemoveState >= 0 AND B.State>=0
INNER JOIN [dbo].[TB_GatherInfo](NOLOCK) AS C ON B.GatherInfoID=C.ID
WHERE A.State>=0 AND A.State<100 ";

    var ado = SqlProvider.Instance.Change(DbEnum.EnergyDB).Ado;
    if (string.IsNullOrWhiteSpace(gatherCode))
    {
        return await ado.QueryAsync<DeviceInfo>(sql);
    }

    // The gather code is bound as a parameter instead of being spliced into the SQL text.
    return await ado.QueryAsync<DeviceInfo>($"{sql} AND C.GatherCode= @gatherCode", new { gatherCode });
}
}
}