This commit is contained in:
cli 2025-03-17 08:35:22 +08:00
commit 032c075f2f
5 changed files with 275 additions and 89 deletions

View File

@ -15,19 +15,19 @@ namespace JiShe.CollectBus.Subscribers
/// 1分钟采集电表数据下行消息消费订阅
/// </summary>
/// <returns></returns>
Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
/// <summary>
/// 5分钟采集电表数据下行消息消费订阅
/// </summary>
/// <returns></returns>
Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
/// <summary>
/// 15分钟采集电表数据下行消息消费订阅
/// </summary>
/// <returns></returns>
Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
#endregion
#region
@ -35,19 +35,19 @@ namespace JiShe.CollectBus.Subscribers
/// 1分钟采集水表数据下行消息消费订阅
/// </summary>
/// <returns></returns>
Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
/// <summary>
/// 5分钟采集水表数据下行消息消费订阅
/// </summary>
/// <returns></returns>
Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
/// <summary>
/// 15分钟采集水表数据下行消息消费订阅
/// </summary>
/// <returns></returns>
Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(IssuedEventMessage issuedEventMessage);
Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
#endregion
}
}

View File

@ -1,18 +1,24 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Workers;
using MassTransit;
using MassTransit.Internals.GraphValidation;
using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.ScheduledMeterReading
@ -511,76 +517,84 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
foreach (var ammeterInfo in focusInfo.Value)
{
var meter = ammeterInfo.Value;
var ammeter = ammeterInfo.Value;
if (string.IsNullOrWhiteSpace(meter.ItemCodes))
if (string.IsNullOrWhiteSpace(ammeter.ItemCodes))
{
_logger.LogError($"{nameof(CreatePublishTask)} 集中器{meter.FocusAddress}的电表{meter.Name}数据采集指令生成失败,采集项为空,-101");
_logger.LogError($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}数据采集指令生成失败,采集项为空,-101");
continue;
}
//载波的不处理
if (meter.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
if (ammeter.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
{
_logger.LogError($"{nameof(CreatePublishTask)} 集中器{meter.FocusAddress}的电表{meter.Name}数据采集指令生成失败,载波不处理,-102");
_logger.LogError($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}数据采集指令生成失败,载波不处理,-102");
continue;
}
if (meter.State.Equals(2))
if (ammeter.State.Equals(2))
{
_logger.LogWarning($"{nameof(CreatePublishTask)} {meter.Name} 集中器{meter.FocusAddress}的电表{meter.Name}状态为禁用,不处理");
_logger.LogWarning($"{nameof(CreatePublishTask)} {ammeter.Name} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}状态为禁用,不处理");
continue;
}
//排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器
if (!IsGennerateCmd(meter.LastTime, -1))
if (!IsGennerateCmd(ammeter.LastTime, -1))
{
_logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{meter.FocusAddress}的电表{meter.Name},采集时间:{meter.LastTime}已超过1天未在线不生成指令");
_logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name},采集时间:{ammeter.LastTime}已超过1天未在线不生成指令");
continue;
}
if (string.IsNullOrWhiteSpace(meter.AreaCode))
if (string.IsNullOrWhiteSpace(ammeter.AreaCode))
{
_logger.LogError($"{nameof(CreatePublishTask)} 表ID:{meter.ID},集中器通信区号为空");
_logger.LogError($"{nameof(CreatePublishTask)} 表ID:{ammeter.ID},集中器通信区号为空");
continue;
}
if (string.IsNullOrWhiteSpace(meter.Address))
if (string.IsNullOrWhiteSpace(ammeter.Address))
{
_logger.LogError($"{nameof(CreatePublishTask)} 表ID:{meter.ID},集中器通信地址为空");
_logger.LogError($"{nameof(CreatePublishTask)} 表ID:{ammeter.ID},集中器通信地址为空");
continue;
}
if (Convert.ToInt32(meter.Address) > 65535)
if (Convert.ToInt32(ammeter.Address) > 65535)
{
_logger.LogError($"{nameof(CreatePublishTask)} 表ID:{meter.ID},集中器通信地址无效,确保大于65535");
_logger.LogError($"{nameof(CreatePublishTask)} 表ID:{ammeter.ID},集中器通信地址无效,确保大于65535");
continue;
}
if (meter.MeteringCode <= 0 || meter.MeteringCode > 2033)
if (ammeter.MeteringCode <= 0 || ammeter.MeteringCode > 2033)
{
_logger.LogError($"{nameof(CreatePublishTask)} 表ID:{meter.ID},非有效测量点号({meter.MeteringCode})");
_logger.LogError($"{nameof(CreatePublishTask)} 表ID:{ammeter.ID},非有效测量点号({ammeter.MeteringCode})");
continue;
}
List<string> tempCodes = meter.ItemCodes.Deserialize<List<string>>()!;
List<string> tempCodes = ammeter.ItemCodes.Deserialize<List<string>>()!;
//TODO:自动上报数据只主动采集1类数据。
if (meter.AutomaticReport.Equals(1))
if (ammeter.AutomaticReport.Equals(1))
{
var tempSubCodes = new List<string>();
var tempItemCodes = string.Empty;
if (meter.ItemCodes.Contains("0C_49"))
tempItemCodes += "0C_49,";
if (meter.ItemCodes.Contains("0C_149"))
tempItemCodes += "0C_149,";
if (meter.ItemCodes.Contains("10_97"))
tempItemCodes += "10_97";
if (string.IsNullOrWhiteSpace(tempItemCodes))
if (tempCodes.Contains("0C_49"))
{
tempSubCodes.Add("0C_49");
}
if (tempSubCodes.Contains("0C_149"))
{
tempSubCodes.Add("0C_149");
}
if (ammeter.ItemCodes.Contains("10_97"))
{
tempSubCodes.Add("10_97");
}
if (tempSubCodes == null || tempSubCodes.Count <= 0)
{
_logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}自动上报数据主动采集1类数据时数据类型为空");
continue;
}
else
{
meter.ItemCodes = tempItemCodes;
tempCodes = tempSubCodes;
}
}
@ -597,54 +611,47 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue;
}
var itemCodeArr = tempItem.Split('_');
var aFN = (AFN)itemCodeArr[0].HexToDec();
var fn = int.Parse(itemCodeArr[1]);
byte[] dataInfos = null;
if(ammeter.AutomaticReport.Equals(1) && aFN == AFN.)
{
//实时数据
dataInfos = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(ammeter.FocusAddress, ammeter.MeteringCode, (ATypeOfDataItems)fn);
}
else
{
if (TelemetryPacketBuilder.AFNHandlers.TryGetValue(tempItem, out var handler))
{
dataInfos = handler(ammeter.FocusAddress, fn, ammeter.MeteringCode);
}
else
{
throw new InvalidOperationException($"无效编码: {tempItem}");
}
}
//排除已发送日冻结和月冻结采集项配置
//if (!isSendDayFreeze)
meter.ItemCodes = meter.ItemCodes.Replace("0D_3", "").Replace("0D_4", "")
.Replace("0D_161", "").Replace("0D_162", "").Replace("0D_163", "").Replace("0D_164", "")
.Replace("0D_165", "").Replace("0D_166", "").Replace("0D_167", "").Replace("0D_168", "").Replace("0C_149", "");
//if (!isSendMonthFreeze)
meter.ItemCodes = meter.ItemCodes.Replace("0D_177", "").Replace("0D_178", "").Replace("0D_179", "").Replace("0D_180", "")
.Replace("0D_181", "").Replace("0D_181", "").Replace("0D_182", "").Replace("0D_183", "").Replace("0D_184", "")
.Replace("0D_193", "").Replace("0D_195", "");
//TODO:特殊表
//var itemCodeArr = itemCode.Split('_');
//var aFN = (AFN)itemCodeArr[0].HexToDec();
//var fn = int.Parse(itemCodeArr[1]);
//if (aFN == AFN.请求实时数据)
//{
// var bytes = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(address, ammeter.MeterCode.Value, (ATypeOfDataItems)fn);
// bytesList.Add(bytes);
//}
//else if (aFN == AFN.请求历史数据)
//{
// var density = (FreezeDensity)input.Density;
// var bytes = Build3761SendData.BuildAmmeterReadingIIdataTypeItemsSendCmd(address, ammeter.MeterCode.Value, (IIdataTypeItems)fn, density, 0);
// bytesList.Add(bytes);
//}
}
}
string deviceNo = "";
string messageHexString = "";
var messageReceivedHeartbeatEvent = new MessageReceivedHeartbeat
if (dataInfos == null || dataInfos.Length <= 0)
{
//ClientId = client.Id,
//ClientIp = client.IP,
//ClientPort = client.Port,
MessageHexString = messageHexString,
DeviceNo = deviceNo,
_logger.LogWarning($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}采集项{tempItem}未能正确获取报文。");
continue;
}
var evenMessageInfo = new ScheduledMeterReadingIssuedEventMessage
{
Message = dataInfos!,
DeviceNo = ammeter.FocusAddress,
MessageId = NewId.NextGuid().ToString()
};
await _capBus.PublishAsync(eventName, messageReceivedHeartbeatEvent);
await _capBus.PublishAsync(eventName, evenMessageInfo);
}
}
}
}
/// <summary>

View File

@ -55,7 +55,7 @@ namespace JiShe.CollectBus.Subscribers
[HttpPost]
[Route("ammeter/oneminute/issued-event")]
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
public async Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
_logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
@ -82,7 +82,7 @@ namespace JiShe.CollectBus.Subscribers
[HttpPost]
[Route("ammeter/fiveminute/issued-event")]
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
public async Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
_logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
@ -109,7 +109,7 @@ namespace JiShe.CollectBus.Subscribers
[HttpPost]
[Route("ammeter/fifteenminute/issued-event")]
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
@ -138,7 +138,7 @@ namespace JiShe.CollectBus.Subscribers
[HttpPost]
[Route("watermeter/oneminute/issued-event")]
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
public async Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
_logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
@ -165,7 +165,7 @@ namespace JiShe.CollectBus.Subscribers
[HttpPost]
[Route("watermeter/fiveminute/issued-event")]
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
public async Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
_logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
@ -192,7 +192,7 @@ namespace JiShe.CollectBus.Subscribers
[HttpPost]
[Route("watermeter/fifteenminute/issued-event")]
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(IssuedEventMessage receivedMessage)
public async Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");

View File

@ -0,0 +1,144 @@
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models;
using System;
using System.Collections.Generic;
using System.Data.SqlTypes;
using System.Linq;
using System.Net;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.BuildSendDatas
{
/// <summary>
/// 构建下发报文
/// </summary>
public static class TelemetryPacketBuilder
{
/// <summary>
/// 构建报文的委托
/// </summary>
/// <param name="address"></param>
/// <param name="fn"></param>
/// <param name="pn"></param>
public delegate byte[] AFNDelegate(string address, int fn, int pn = 0);
/// <summary>
/// 编码与方法的映射表
/// </summary>
public static readonly Dictionary<string, AFNDelegate> AFNHandlers = new();
static TelemetryPacketBuilder()
{
// 初始化时自动注册所有符合命名规则的方法
var methods = typeof(TelemetryPacketBuilder).GetMethods(BindingFlags.Static | BindingFlags.Public);
foreach (var method in methods)
{
if (method.Name.StartsWith("AFN") && method.Name.EndsWith("_Send"))
{
// 提取编码部分(例如 "AFN0D_F184_Send" -> "0D_184"
string code = method.Name[3..^5].Replace("F", "_"); // 移除前缀和后缀替换F为_
var delegateInstance = (AFNDelegate)Delegate.CreateDelegate(typeof(AFNDelegate), method);
AFNHandlers[code] = delegateInstance;
}
}
}
#region AFN_00H
public static byte[] AFN00_F1_Send(string address,int fn,int pn = 0)
{
var reqParameter = new ReqParameter2()
{
AFN = AFN.,
FunCode = (int)CMasterStationFunCode.2,
A = address,
Seq = new Seq()
{
TpV = TpV.,
FIRFIN = FIRFIN.,
CON = CON.,
PRSEQ = 0,
},
MSA = Build3761SendData.GetMSA(address),
Pn = pn,
Fn = fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes;
}
public static byte[] AFN00_F3_Send(string address, int fn, int pn = 0)
{
var reqParameter = new ReqParameter2()
{
AFN = AFN.,
FunCode = (int)CMasterStationFunCode.2,
A = address,
Seq = new Seq()
{
TpV = TpV.,
FIRFIN = FIRFIN.,
CON = CON.,
PRSEQ = 0,
},
MSA = Build3761SendData.GetMSA(address),
Pn = pn,
Fn = fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes;
}
#endregion
#region AFN_01H
public static byte[] AFN01_F1_Send(string address, int fn, int pn = 0)
{
var reqParameter = new ReqParameter2()
{
AFN = AFN.,
FunCode = (int)CMasterStationFunCode.2,
A = address,
Seq = new Seq()
{
TpV = TpV.,
FIRFIN = FIRFIN.,
CON = CON.,
PRSEQ = 0,
},
MSA = Build3761SendData.GetMSA(address),
Pn = pn,
Fn = fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes;
}
#endregion
#region AFN_02H
public static byte[] AFN02_F2_Send(string address, int fn, int pn = 0)
{
var reqParameter = new ReqParameter2()
{
AFN = AFN.,
FunCode = (int)CMasterStationFunCode.2,
A = address,
Seq = new Seq()
{
TpV = TpV.,
FIRFIN = FIRFIN.,
CON = CON.,
PRSEQ = 0,
},
MSA = Build3761SendData.GetMSA(address),
Pn = pn,
Fn = fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
return bytes;
}
#endregion
}
}

View File

@ -0,0 +1,35 @@
using JiShe.CollectBus.Common.Enums;
namespace JiShe.CollectBus.Common.Models
{
/// <summary>
/// 定时抄读Kafka消息实体1分钟、5分钟、15分钟
/// </summary>
public class ScheduledMeterReadingIssuedEventMessage
{
/// <summary>
/// 消息接收客户端Id
/// </summary>
public string ClientId { get; set; }
/// <summary>
/// 消息内容
/// </summary>
public byte[] Message { get; set; }
/// <summary>
/// 集中器编号
/// </summary>
public string DeviceNo { get; set; }
///// <summary>
///// 采集时间间隔(分钟如15)
///// </summary>
//public int TimeDensity { get; set; }
/// <summary>
/// 消息Id
/// </summary>
public string MessageId { get; set; }
}
}