Compare commits

..

No commits in common. "a5cd6b86f2791faaafdf34c80f9620754779df86" and "9104cda58abfa7bab7387fab86f0e42b76d044bd" have entirely different histories.

88 changed files with 2705 additions and 869 deletions

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,405 @@
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Enums;
using System;
using System.Reflection;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.FreeRedis;
using JiShe.CollectBus.IotSystems.Protocols;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Protocol.Contracts.SendData;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using TouchSocket.Sockets;
using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
{
public abstract class T37612012ProtocolPlugin : IProtocolPlugin
{
//头部字节长度
public const int hearderLen = 6;
public const int tPLen = 6;
public const string errorData = "EE";
private readonly ILogger _logger;
private readonly IRepository<ProtocolInfo, Guid> _protocolInfoRepository;
private readonly IFreeRedisProvider _redisProvider;
public T37612012ProtocolPlugin(IServiceProvider serviceProvider, ILogger logger)
{
_logger = logger;
_protocolInfoRepository = serviceProvider.GetRequiredService<IRepository<ProtocolInfo, Guid>>();
_redisProvider = serviceProvider.GetRequiredService<IFreeRedisProvider>();
}
public abstract ProtocolInfo Info { get; }
public virtual async Task<ProtocolInfo> GetAsync() => await Task.FromResult(Info);
public virtual async Task LoadAsync()
{
if (Info == null)
{
throw new ArgumentNullException(nameof(Info));
}
await _protocolInfoRepository.DeleteDirectAsync(a => a.Name == Info.Name);
await _protocolInfoRepository.InsertAsync(Info);
await _redisProvider.Instance.HDelAsync($"{RedisConst.ProtocolKey}", Info.Name);
await _redisProvider.Instance.HSetAsync($"{RedisConst.ProtocolKey}", Info.Name, Info);
}
public abstract Task<T> AnalyzeAsync<T>(ITcpSessionClient client, string messageReceived, Action<T>? receivedAction = null) where T : class;
/// <summary>
/// 解析376.1帧
/// </summary>
/// <param name="messageReceived"></param>
/// <returns></returns>
public virtual TB3761? Analysis3761(string messageReceived)
{
try
{
var hexStringList = messageReceived.StringToPairs();
// 初步校验
if (hexStringList.Count < 6 || hexStringList.FirstOrDefault() != "68" || hexStringList.Skip(5).Take(1).FirstOrDefault() != "68" || hexStringList.Count < 18 || hexStringList.LastOrDefault() != "16")
{
_logger.LogError($"解析Analysis3761校验不通过,报文:{messageReceived}");
}
else
{
TB3761 tB3761 = new TB3761
{
BaseHexMessage = new BaseHexMessage
{
HexMessageString = messageReceived,
HexMessageList = hexStringList
},
C = Analysis_C(hexStringList),
A = Analysis_A(hexStringList),
AFN_FC = Analysis_AFN_FC(hexStringList),
SEQ = Analysis_SEQ(hexStringList),
UnitData = Analysis_UnitData(hexStringList),
DA = Analysis_DA(hexStringList),
DT = Analysis_DT(hexStringList)
};
return tB3761;
}
}
catch (Exception ex)
{
_logger.LogError($"解析Analysis3761错误,报文:{messageReceived},异常:{ex.Message}");
}
return null;
}
/// <summary>
/// 控制域C解析
/// </summary>
/// <returns></returns>
public virtual C Analysis_C(List<string> hexStringList)
{
C c = new C();
try
{
if (hexStringList.Count > 6)
{
BaseHexMessage baseHexMessage = new BaseHexMessage
{
HexMessageList = hexStringList.GetRange(6, 1) // 控制域 1字节
};
baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList);
if (baseHexMessage.HexMessageList.Count == 0)
return null;
string binStr = baseHexMessage.HexMessageString.HexTo4BinZero();
c = new C
{
BaseHexMessage = baseHexMessage,
FC = binStr.Substring(binStr.Length - 4, 4).BinToDec(),
FCV = binStr.Substring(3, 1).BinToDec(),
FCB = binStr.Substring(2, 1).BinToDec(),
PRM = binStr.Substring(1, 1).BinToDec(),
DIR = binStr.Substring(0, 1).BinToDec()
};
return c;
}
}
catch (Exception ex)
{
_logger.LogError($"解析Analysis_C错误,报文:{string.Join("", hexStringList)},异常:{ex.Message}");
}
return c;
}
/// <summary>
/// 地址域A解析
/// </summary>
/// <param name="hexStringList"></param>
/// <returns></returns>
public virtual A Analysis_A(List<string> hexStringList)
{
A a = new A();
try
{
if (hexStringList.Count > 7)
{
BaseHexMessage baseHexMessage = new BaseHexMessage
{
HexMessageList = hexStringList.GetRange(7, 5) // 地址域 5个字节
};
baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList);
a = new A
{
BaseHexMessage = baseHexMessage,
A1 = baseHexMessage.HexMessageList.ListReverseToStr(0, 2),//.DataConvert(10);//行政区划码A1
A2 = baseHexMessage.HexMessageList.ListReverseToStr(2, 2).PadLeft(5, '0').HexToDec(),//终端地址A2
A3 = Analysis_A3(baseHexMessage.HexMessageList) //主站地址和组地址标志A3
};
a.Code = $"{a.A1.PadLeft(4, '0')}{a.A2.ToString()!.PadLeft(5, '0')}";
}
}
catch (Exception ex)
{
_logger.LogError($"解析Analysis_A错误,报文:{string.Join("", hexStringList)},异常:{ex.Message}");
}
return a;
}
/// <summary>
/// 站地址和组地址标志A3
/// </summary>
/// <param name="hexAList">地址域A集合</param>
/// <returns></returns>
public virtual A3 Analysis_A3(List<string> hexAList)
{
A3 a3 = new A3();
try
{
if (hexAList.Count != 0)
{
BaseHexMessage baseHexMessage = new BaseHexMessage
{
HexMessageList = hexAList.GetRange(4, 1) // 站地址和组地址标志A3 1个字节
};
baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList);
var binStr = baseHexMessage.HexMessageString.HexTo4BinZero();
a3 = new A3
{
BaseHexMessage = baseHexMessage,
D0 = binStr.Substring(binStr.Length - 1, 1).BinToDec(),
D1_D7 = binStr.Substring(0, binStr.Length - 1).BinToDec()
};
}
}
catch (Exception ex)
{
_logger.LogError($"解析Analysis_A3错误,报文:{string.Join("", hexAList)},异常:{ex.Message}");
}
return a3;
}
/// <summary>
/// AFN_FC功能码
/// </summary>
/// <returns></returns>
public virtual AFN_FC Analysis_AFN_FC(List<string> hexStringList)
{
AFN_FC aFN_FC = new AFN_FC();
try
{
if (hexStringList.Count == 0)
{
BaseHexMessage baseHexMessage = new BaseHexMessage
{
HexMessageList = hexStringList.GetRange(12, 1) //AFN功能码 1个字节
};
baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList);
aFN_FC = new AFN_FC
{
BaseHexMessage = baseHexMessage,
AFN = baseHexMessage.HexMessageString.HexToDec(),
};
}
}
catch (Exception ex)
{
_logger.LogError($"解析Analysis_AFN_FC错误,报文:{string.Join("", hexStringList)},异常:{ex.Message}");
}
return aFN_FC;
}
/// <summary>
/// 解析帧序列域SEQ
/// </summary>
/// <returns></returns>
public virtual SEQ Analysis_SEQ(List<string> hexStringList)
{
SEQ seq = new SEQ();
try
{
if (hexStringList.Count != 0)
{
BaseHexMessage baseHexMessage = new BaseHexMessage
{
HexMessageList = hexStringList.GetRange(13, 1) //帧序列域 SEQ 1个字节
};
baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList);
var binStr = baseHexMessage.HexMessageString.HexTo4BinZero();
seq = new SEQ
{
PSEQ = binStr.Substring(binStr.Length - 4, 4).BinToDec(),
CON = binStr.Substring(3, 1).BinToDec(),
FIN = binStr.Substring(2, 1).BinToDec(),
FIR = binStr.Substring(1, 1).BinToDec(),
TpV = binStr.Substring(0, 1).BinToDec()
};
}
}
catch (Exception ex)
{
_logger.LogError($"解析Analysis_SEQ错误,报文:{string.Join("", hexStringList)},异常:{ex.Message}");
}
return seq;
}
/// <summary>
/// 数据单元标识及数据单元数据
/// </summary>
public virtual UnitData Analysis_UnitData(List<string> hexStringList)
{
UnitData unitData = new UnitData();
try
{
if (hexStringList.Count != 0)
{
unitData = new UnitData
{
HexMessageList = hexStringList.GetRange(14, hexStringList.Count - 14 - 2) //总数字节数-固定长度报文头-控制域C-地址域A-校验和CS-结束字符16H
};
unitData.HexMessageString = string.Join("", unitData.HexMessageList);
}
}
catch (Exception ex)
{
_logger.LogError($"解析Analysis_UnitData错误,报文:{string.Join("", hexStringList)},异常:{ex.Message}");
}
return unitData;
}
/// <summary>
/// 信息点DA Pn
/// </summary>
/// <returns></returns>
public virtual DA Analysis_DA(List<string> hexStringList)
{
DA da = new DA();
try
{
if (hexStringList.Count != 0)
{
BaseHexMessage baseHexMessage = new BaseHexMessage
{
HexMessageList = hexStringList.GetRange(14, 2) //信息点DA Pn 2个字节
};
baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList);
var da1 = baseHexMessage.HexMessageList[0];
var da2 = baseHexMessage.HexMessageList[1];
da = new DA()
{
BaseHexMessage = baseHexMessage,
Pn = CalculatePn(da1, da2)
};
}
}
catch (Exception ex)
{
_logger.LogError($"解析Analysis_DA错误,报文:{string.Join("", hexStringList)},异常:{ex.Message}");
}
return da;
}
/// <summary>
/// 信息类DT Fn
/// </summary>
/// <returns></returns>
public virtual DT Analysis_DT(List<string> hexStringList)
{
DT dt = new DT();
try
{
if (hexStringList.Count != 0)
{
BaseHexMessage baseHexMessage = new BaseHexMessage
{
HexMessageList = hexStringList.GetRange(16, 2) //信息类DT Fn 2个字节
};
baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList);
var dt1 = baseHexMessage.HexMessageList[0];
var dt2 = baseHexMessage.HexMessageList[1];
dt = new DT()
{
BaseHexMessage = baseHexMessage,
Fn = CalculateFn(dt1, dt2)
};
}
}
catch (Exception ex)
{
_logger.LogError($"解析Analysis_DT错误,报文:{string.Join("", hexStringList)},异常:{ex.Message}");
}
return dt;
}
/// <summary>
/// 计算Pn
/// </summary>
/// <param name="da1"></param>
/// <param name="da2"></param>
/// <returns></returns>
public static int CalculatePn(string da1, string da2) => (da2.HexToDec() - 1) * 8 + (8 - da1.HexTo4BinZero().IndexOf(da1.Equals("00") ? "0" : "1"));
/// <summary>
/// 计算Fn
/// </summary>
/// <param name="dt1"></param>
/// <param name="dt2"></param>
/// <returns></returns>
public static int CalculateFn(string dt1, string dt2) => dt2.HexToDec() * 8 + (8 - dt1.HexTo4BinZero().IndexOf("1"));
#region
/// <summary>
/// 组装报文
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="entity">设备数据实体</param>
/// <param name="afnFnCode">映射读取执行方法的Code例如10_1表示 10H_F1_00000,10H_F1_00001统一英文下划线分隔</param>
/// <returns></returns>
public abstract Task<ProtocolBuildResponse> BuildAsync(ProtocolBuildRequest request);
#endregion
}
}

View File

@ -0,0 +1,27 @@
using JiShe.CollectBus.Protocol.Contracts.Models;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using TouchSocket.Core;
namespace JiShe.CollectBus.Protocol.Contracts.Adapters
{
public class StandardFixedHeaderDataHandlingAdapter : CustomFixedHeaderDataHandlingAdapter<CustomFixedHeaderRequestInfo>
{
/// <summary>
/// 接口实现,指示固定包头长度
/// </summary>
public override int HeaderLength => 3;
/// <summary>
/// 获取新实例
/// </summary>
/// <returns></returns>
protected override CustomFixedHeaderRequestInfo GetInstance()
{
return new CustomFixedHeaderRequestInfo();
}
}
}

View File

@ -0,0 +1,33 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using JiShe.CollectBus.Common.Extensions;
namespace JiShe.CollectBus.Protocol.Contracts.AnalysisData
{
/// <summary>
/// 附录
/// </summary>
public static class Appendix
{
/// <summary>
/// 附录1 A1格式
/// </summary>
/// <returns></returns>
public static string Appendix_A1(List<string> data)
{
var seconds = data[0];
var minutes = data[1];
var hours = data[2];
var day = data[3];
var binString = data[4].HexToBin();
var months = binString.Substring(3, 1).BinToDec() * 10 + Convert.ToInt32(binString.Substring(4, 4).BinToHex());
var week = binString.Substring(0, 3).BinToHex();
var year = $"{DateTime.Now.ToString("yyyy").Substring(0, 2)}{data[5]}";
return $"{year}-{months.ToString().PadLeft(2, '0')}-{day} {hours}:{minutes}:{seconds}_{week}";
}
}
}

View File

@ -0,0 +1,33 @@
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts
{
public class AnalysisStrategyContext(IServiceProvider provider)
{
private readonly IServiceProvider _provider = provider;
/// <summary>
/// 执行策略
/// </summary>
/// <typeparam name="TInput"></typeparam>
/// <typeparam name="TResult"></typeparam>
/// <param name="type"></param>
/// <param name="input"></param>
/// <returns></returns>
public Task<TResult> ExecuteAsync<TInput, TResult>(string type, TInput input)
{
var factory = _provider.GetRequiredService<Func<string, Type, Type, object>>();
var strategy = (IAnalysisStrategy<TInput, TResult>)factory(type, typeof(TInput), typeof(TResult));
return strategy.ExecuteAsync(input);
}
}
}

View File

@ -0,0 +1,10 @@
using System;
namespace JiShe.CollectBus.Protocol.Contracts.Attributes
{
[AttributeUsage(AttributeTargets.Class)]
public class ProtocolNameAttribute(string name) : Attribute
{
public string Name { get; set; } = name;
}
}

View File

@ -1,9 +1,4 @@
using JiShe.CollectBus.Protocol.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System.Reflection;
using Volo.Abp;
using Volo.Abp.Modularity;
using Volo.Abp.Modularity;
namespace JiShe.CollectBus.Protocol.Contracts
{
@ -11,91 +6,6 @@ namespace JiShe.CollectBus.Protocol.Contracts
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
context.Services.AddKeyedSingleton<IProtocolPlugin, StandardProtocolPlugin>(nameof(StandardProtocolPlugin));
//RegisterProtocolAnalysis(context.Services);
LoadAnalysisStrategy(context.Services);
}
public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{
Console.WriteLine("StandardProtocolPlugin OnApplicationInitializationAsync");
var standardProtocol = context.ServiceProvider.GetRequiredKeyedService<IProtocolPlugin>(nameof(StandardProtocolPlugin));
await standardProtocol.LoadAsync();
}
public override void OnApplicationShutdown(ApplicationShutdownContext context)
{
Console.WriteLine("StandardProtocolPlugin OnApplicationShutdown");
base.OnApplicationShutdown(context);
}
public void LoadAnalysisStrategy(IServiceCollection services)
{
var assembly = Assembly.GetExecutingAssembly();
var analysisStrategyTypes = assembly.GetTypes().Where(t => !t.IsAbstract && !t.IsInterface && t.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IAnalysisStrategy<,>)));
foreach (var analysisStrategyType in analysisStrategyTypes)
{
var service = analysisStrategyType.GetInterfaces().First();
services.AddKeyedSingleton(service, analysisStrategyType.Name, analysisStrategyType);
}
}
public void RegisterProtocolAnalysis(IServiceCollection services)
{
// 扫描并注册所有策略
var strategyMetadata = new Dictionary<(string, Type, Type), Type>();
services.AddTransient<AnalysisStrategyContext>();
// 批量注册
var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location);
if (string.IsNullOrWhiteSpace(assemblyPath))
{
return;
}
var dllFiles = Directory.GetFiles(Path.Combine(assemblyPath, "Plugins"), "*.dll");
foreach (var file in dllFiles)
{
// 跳过已加载的程序集
var assemblyName = AssemblyName.GetAssemblyName(file);
var existingAssembly = AppDomain.CurrentDomain.GetAssemblies()
.FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName);
var assembly = existingAssembly ?? Assembly.LoadFrom(file);
// 实现IAnalysisStrategy接口
var analysisStrategyTypes = assembly.GetTypes().Where(t => !t.IsAbstract && !t.IsInterface && t.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IAnalysisStrategy<,>)));
if (!analysisStrategyTypes.Any())
continue;
foreach (var analysisStrategyType in analysisStrategyTypes)
{
// 通过反射获取静态元数据
var strategyType = analysisStrategyType.Name;
var genericArgs = analysisStrategyType.GetInterface($"IAnalysisStrategy`2")!.GetGenericArguments();
var inputType = genericArgs[0];
var resultType = genericArgs[1];
// 注册策略实现
services.AddTransient(analysisStrategyType);
strategyMetadata[(strategyType, inputType, resultType)] = analysisStrategyType;
}
}
// 注册元数据字典
services.AddSingleton(strategyMetadata);
// 注册策略解析工厂
services.AddTransient<Func<string, Type, Type, object?>>(provider => (name, inputType, resultType) =>
{
var metadata = provider.GetRequiredService<Dictionary<(string, Type, Type), Type>>();
if (metadata.TryGetValue((name, inputType, resultType), out var strategyType))
{
return provider.GetRequiredService(strategyType);
}
else
{
var logger = provider.GetRequiredService<ILogger<AnalysisStrategyContext>>();
logger.LogWarning($"未能找到解析策略:{name}-{inputType}-{resultType}");
return null;
}
});
}
}

View File

@ -0,0 +1,15 @@
using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Protocol.Dto;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts.Interfaces
{
public interface IAnalysisStrategy<TInput, TResult>
{
Task<TResult> ExecuteAsync(TInput input);
}
}

View File

@ -12,10 +12,8 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\modules\JiShe.CollectBus.Kafka\JiShe.CollectBus.Kafka.csproj" />
<ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Protocol\JiShe.CollectBus.Protocol.csproj" />
</ItemGroup>
<Target Name="PostBuild" AfterTargets="PostBuildEvent">

View File

@ -0,0 +1,57 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using TouchSocket.Core;
namespace JiShe.CollectBus.Protocol.Contracts.Models
{
public class CustomFixedHeaderRequestInfo : IFixedHeaderRequestInfo
{
/// <summary>
/// 报文开始前缀
/// </summary>
public string StartPrefix { get; set; } = "68";
/// <summary>
/// 报文结束后缀
/// </summary>
public string EndPrefix { get; set; } = "16";
/// <summary>
/// 头部长度
/// </summary>
public int HeadLength = 5;
/// <summary>
/// 固定长度
/// </summary>
private const int FixedLength = 17;
/// <summary>
/// 报文长度
/// </summary>
public int PacketLength { get; set; }
/// <summary>
/// 控制域 C
/// </summary>
public int ControlDomain { get; set; }
public bool OnParsingHeader(ReadOnlySpan<byte> header)
{
//throw new NotImplementedException();
return true;
}
public bool OnParsingBody(ReadOnlySpan<byte> body)
{
//throw new NotImplementedException();
return true;
}
public int BodyLength { get; }
}
}

View File

@ -4,7 +4,7 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol3761
namespace JiShe.CollectBus.Protocol.Contracts.Models
{
/// <summary>
@ -238,7 +238,10 @@ namespace JiShe.CollectBus.Protocol3761
/// <summary>
/// 数据单元标识和数据单元格式
/// </summary>
public class UnitData: BaseHexMessage{ }
public class UnitData: BaseHexMessage
{
}
#endregion

View File

@ -0,0 +1,95 @@
using JiShe.CollectBus.Protocol.Dto;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts.Protocol.Dto
{
public class AFN10_F10_AnalysisDto
{
/// <summary>
/// 本次电能表/交流采样装置配置数量 n
/// </summary>
public int ConfigNum { get; set; }
/// <summary>
/// 电能表/交流采样装置配置信息
/// </summary>
public List<AFN10F10Entity> AFN10F10Entitys { get; set; } = new List<AFN10F10Entity>();
}
public class AFN10F10Entity
{
/// <summary>
/// 电能表/交流采样装置序号
/// </summary>
public int SerialNum { get; set; }
/// <summary>
/// 所属测量点号
/// </summary>
public int Point { get; set; }
/// <summary>
/// 通信速率
/// </summary>
public int BaudRate { get; set; }
/// <summary>
/// 端口号
/// </summary>
public int Port { get; set; }
/// <summary>
/// 通信协议类型
/// </summary>
public string RuleType { get; set; }
/// <summary>
/// 通信地址
/// </summary>
public string ComAddress { get; set; }
/// <summary>
/// 通信密码
/// </summary>
public string ComPwd { get; set; }
/// <summary>
/// 电能费率个数
/// </summary>
public int ElectricityRatesNum { get; set; }
/// <summary>
/// 有功电能示值整数位及小数位个数
/// </summary>
public int IntegerBitsNum { get; set; }
/// <summary>
/// 有功电能示值小数位个数
/// </summary>
public int DecimalPlacesNum { get; set; }
/// <summary>
/// 所属采集器通信地址
/// </summary>
public string CollectorAddress{ get; set; }
/// <summary>
/// 用户大类号
/// </summary>
public int UserCategoryNum { get; set;}
/// <summary>
/// 用户小类号
/// </summary>
public int UserSubclassNum { get; set; }
}
}

View File

@ -0,0 +1,45 @@
using JiShe.CollectBus.Protocol.Dto;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts.Protocol.Dto
{
public class AFN10_F66_AnalysisDto
{
/// <summary>
/// 定时发送周期
/// 1:用 D6D7 编码表示,取值 03 依次表示分、时、日、月
/// 2:用 D0D5 表示,为定时上报数据的时间周期
/// </summary>
public int Cycle { get; set; }
/// <summary>
/// 定时发送周期(单位)
/// </summary>
public int Unit { get; set; }
/// <summary>
/// 发送基准时间
/// </summary>
public DateTime BaseTime { get; set; }
/// <summary>
/// 曲线数据提取倍率
/// </summary>
public int CurveRatio { get; set; }
/// <summary>
/// 任务号
/// </summary>
public int Pn { get; set; }
public List<SetAutoItemCodeDetails> Details { get; set; } = new List<SetAutoItemCodeDetails>();
}
public class SetAutoItemCodeDetails
{
public int Pn { get; set; }
public int Fn { get; set; }
}
}

View File

@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts.Protocol.Dto
{
public class AFN12_F129_AnalysisDto : AnalysisBaseDto<decimal>
{
public DateTime ReadTime { get; set; }
}
}

View File

@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts.Protocol.Dto
{
public class AFN12_F130_AnalysisDto : AnalysisBaseDto<decimal>
{
public DateTime ReadTime { get; set; }
}
}

View File

@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts.Protocol.Dto
{
public class AFN12_F131_AnalysisDto: AnalysisBaseDto<decimal>
{
public DateTime ReadTime { get; set; }
}
}

View File

@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts.Protocol.Dto
{
public class AFN12_F132_AnalysisDto: AnalysisBaseDto<decimal>
{
public DateTime ReadTime { get; set; }
}
}

View File

@ -0,0 +1,19 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts.Protocol.Dto
{
public class AFN12_F145_AnalysisDto : AnalysisBaseDto<decimal>
{
/// <summary>
/// 最大需量时标
/// </summary>
public string TimeSpan { get; set; }
public DateTime ReadingDate { get; set; }
}
}

View File

@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts.Protocol.Dto
{
public class AFN12_F149_AnalysisDto : AnalysisBaseDto<decimal>
{
/// <summary>
/// 最大需量时标
/// </summary>
public string TimeSpan { get; set; }
public DateTime ReadingDate { get; set; }
}
}

View File

@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts.Protocol.Dto
{
public class AFN12_F188_AnalysisDto : AnalysisBaseDto<decimal>
{
}
}

View File

@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts.Protocol.Dto
{
public class AFN12_F25_AnalysisDto: AnalysisBaseDto<decimal>
{
/// <summary>
/// 读取时间
/// </summary>
public DateTime ReadTime { get; set; }
}
}

View File

@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts.Protocol.Dto
{
public class AFN12_F2_AnalysisDto: AnalysisBaseDto<string>
{
}
}

View File

@ -0,0 +1,55 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts.Protocol.Dto
{
public class AFN12_F33_AnalysisDto
{
/// <summary>
/// 终端抄表时间
/// </summary>
public string ReadTime { get; set; }
/// <summary>
/// 费率数M1≤M≤12
/// </summary>
public int RatingCount { get; set; }
/// <summary>
/// 当前正向有功总电能示值
/// </summary>
public ParentNodes F_A_Kwh { get; set; }
/// <summary>
/// 当前正向无功组合无功1总电能示值
/// </summary>
public ParentNodes R_R_Kvarh { get; set; }
/// <summary>
/// 当前一象限无功总电能示值
/// </summary>
public ParentNodes Q1_R_Kvarh { get; set; }
/// <summary>
/// 当前四象限无功总电能示值
/// </summary>
public ParentNodes Q4_R_Kvarh { get; set; }
}
public class ParentNodes
{
/// <summary>
/// 总电能示值
/// </summary>
public decimal Total_Value { get; set; }
public List<ChildNodes> childNodes { get; set; }
}
public class ChildNodes
{
/// <summary>
/// 费率总电能示值
/// </summary>
public decimal Value { get; set; }
}
}

View File

@ -0,0 +1,15 @@
using JiShe.CollectBus.Protocol.Dto;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts.Protocol.Dto
{
public class AFN12_F49_AnalysisDto: AnalysisBaseDto<decimal>
{
}
}

View File

@ -0,0 +1,38 @@
namespace JiShe.CollectBus.Protocol.Contracts.Protocol.Dto
{
public class AFN9_F1_AnalysisDto
{
public int FocusID { get; set; }
public string? AreaCode { get; set; }
public string? Address { get; set; }
/// <summary>
/// 厂商代号
/// </summary>
public string? MakerNo { get; set; }
/// <summary>
/// 设备编号
/// </summary>
public string? DeviceNo { get; set; }
/// <summary>
/// 终端软件版本号
/// </summary>
public string? SoftwareVersion { get; set; }
/// <summary>
/// 终端软件发布日期:日月年
/// </summary>
public string? SoftwareReleaseDate { get; set; }
/// <summary>
/// 硬件软件版本号
/// </summary>
public string? HardwareVersion { get; set; }
/// <summary>
/// 硬件软件发布日期:日月年
/// </summary>
public string? HardwareReleaseDate { get; set; }
public string? AddDate { get; set; }
}
}

View File

@ -0,0 +1,46 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts.Protocol.Dto
{
public class AnalysisBaseDto
{
/// <summary>
/// 是否有效数据
/// </summary>
public bool ValidData { get; set; } = true;
/// <summary>
/// 数据类型
/// </summary>
public string DataType { get; set; }
/// <summary>
/// 错误码信息
/// </summary>
public string ErrorCodeMsg { get; set; }
/// <summary>
/// 字段名
/// </summary>
public string FiledName { get; set; }
/// <summary>
/// 字段描述
/// </summary>
public string FiledDesc { get; set; }
}
public class AnalysisBaseDto<T> : AnalysisBaseDto
{
/// <summary>
/// 抄读值
/// </summary>
public T? DataValue { get; set; }
}
}

View File

@ -0,0 +1,36 @@
namespace JiShe.CollectBus.Protocol.Dto
{
public class UnitDataAnalysis
{
/// <summary>
/// 集中器地址
/// </summary>
public string? Code { get; set; }
/// <summary>
/// AFN功能码
/// </summary>
public int AFN { get; set; }
/// <summary>
/// 信息点
/// </summary>
public int Pn { get; set; }
/// <summary>
/// 信息类
/// </summary>
public int Fn { get; set; }
}
public class UnitDataAnalysis<T>: UnitDataAnalysis
{
/// <summary>
/// 数据
/// </summary>
public T? Data { get; set; }
}
}

View File

@ -0,0 +1,70 @@
using JiShe.CollectBus.Protocol.Contracts.Models;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using JiShe.CollectBus.Common.Extensions;
namespace JiShe.CollectBus.Protocol.Contracts
{
public class QGDW3761Config
{
private static List<TB3761> _commandList = null;
public static List<TB3761> CommandList
{
get
{
if (_commandList == null)
{
var filePath = AppDomain.CurrentDomain.BaseDirectory + "cmd3761Matching.txt";
try
{
var fileStr = "";
if (File.Exists(filePath))
fileStr = File.ReadAllText(filePath, Encoding.UTF8);
if (!string.IsNullOrWhiteSpace(fileStr))
{
_commandList = fileStr.FromJson<List<TB3761>>();
}
}
catch (Exception)
{
}
}
return _commandList;
}
}
private static List<TB3761> _commandTdcList = null;
public static List<TB3761> CommandTdcList
{
get
{
if (_commandTdcList == null)
{
var filePath = AppDomain.CurrentDomain.BaseDirectory + "cmd3761TdcMatching.txt";
try
{
var fileStr = "";
if (File.Exists(filePath))
fileStr = File.ReadAllText(filePath, Encoding.UTF8);
if (!string.IsNullOrWhiteSpace(fileStr))
{
_commandTdcList = fileStr.FromJson<List<TB3761>>();
}
}
catch (Exception)
{
}
}
return _commandTdcList;
}
}
}
}

View File

@ -1,508 +0,0 @@
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.Enums;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.Protocols;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
using JiShe.CollectBus.Protocol.Contracts.SendData;
using JiShe.CollectBus.Protocol3761;
using Mapster;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using TouchSocket.Sockets;
using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Protocol
{
public class StandardProtocolPlugin : ProtocolPlugin
{
private readonly ILogger<StandardProtocolPlugin> _logger;
private readonly IProducerService _producerService;
private readonly IRepository<Device, Guid> _deviceRepository;
private readonly ITcpService _tcpService;
public readonly Dictionary<string, Telemetry3761PacketBuilder.T3761Delegate> T3761AFNHandlers;
/// <summary>
/// Initializes a new instance of the <see cref="StandardProtocolPlugin"/> class.
/// </summary>
/// <param name="serviceProvider">The service provider.</param>
public StandardProtocolPlugin(IServiceProvider serviceProvider, ILogger<StandardProtocolPlugin> logger, ITcpService tcpService) : base(serviceProvider, logger)
{
_logger = logger;
//_logger = serviceProvider.GetRequiredService<ILogger<StandardProtocolPlugin>>();
_producerService = serviceProvider.GetRequiredService<IProducerService>();
_deviceRepository = serviceProvider.GetRequiredService<IRepository<Device, Guid>>();
_tcpService = tcpService;
T3761AFNHandlers = Telemetry3761PacketBuilder.T3761AFNHandlers;
}
public sealed override ProtocolInfo Info => new(nameof(StandardProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980");
public override async Task<T> AnalyzeAsync<T>(ITcpSessionClient client, string messageReceived, Action<T>? sendAction = null)
{
TB3761? tB3761 = Analysis3761(messageReceived);
if (tB3761 != null)
{
if (tB3761.AFN_FC?.AFN == (int)AFN.)
{
if (tB3761.A == null || tB3761.A.Code.IsNullOrWhiteSpace() || tB3761.A.A3?.D1_D7 == null || tB3761.SEQ?.PSEQ == null)
{
_logger.LogError($"解析AFN.链路接口检测报文失败,报文:{messageReceived},TB3761:{tB3761.Serialize()}");
}
else
{
if (tB3761.DT?.Fn == (int)FN.)
{
// 登录回复
if (tB3761.SEQ.CON == (int)CON.)
await LoginAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ);
}
else if (tB3761.DT?.Fn == (int)FN.)
{
// 心跳回复
//心跳帧有两种情况:
//1. 集中器先有登录帧,再有心跳帧
//2. 集中器没有登录帧,只有心跳帧
await HeartbeatAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ);
}
}
}
await OnTcpNormalReceived(client, tB3761);
}
return (tB3761 as T)!;
}
/// <summary>
/// 正常帧处理将不同的AFN进行分发
/// </summary>
/// <param name="tcpSessionClient"></param>
/// <param name="messageHexString"></param>
/// <param name="tB3761"></param>
/// <returns></returns>
private async Task OnTcpNormalReceived(ITcpSessionClient tcpSessionClient, TB3761 tB3761)
{
//string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn);
//todo 如何确定时标?目前集中器的采集频率,都是固定,数据上报的时候,根据当前时间,往后推测出应当采集的时间点作为时标。但是如果由于网络问题,数据一直没上报的情况改怎么计算?
//await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
//{
// ClientId = client.Id,
// ClientIp = client.IP,
// ClientPort = client.Port,
// MessageHexString = messageHexString,
// DeviceNo = deviceNo,
// MessageId = NewId.NextGuid().ToString()
//});
if (tB3761.AFN_FC.BaseHexMessage == null || tB3761.DT.BaseHexMessage == null || tB3761.BaseHexMessage.HexMessageString==null)
{
_logger.LogError("376.1协议解析AFN失败");
return;
}
// 登录心跳已做了处理,故需要忽略登录和心跳帧
if (tB3761.DT.Fn == (int)FN. || tB3761.DT.Fn == (int)FN.)
return;
//TODO根据AFN进行分流推送到kafka
string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, tB3761.AFN_FC.AFN.ToString().PadLeft(2, '0'));
MessageProtocolAnalysis<TB3761> messageReceivedAnalysis = new MessageProtocolAnalysis<TB3761>()
{
ClientId = tcpSessionClient.Id,
ClientIp = tcpSessionClient.IP,
ClientPort = tcpSessionClient.Port,
MessageHexString = tB3761.BaseHexMessage.HexMessageString!,
DeviceNo = tB3761.A.Code!,
MessageId = Guid.NewGuid().ToString(),
ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"),
Data = tB3761
};
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByReceived();
if (topics.Contains(topicName))
await _producerService.ProduceAsync(topicName, messageReceivedAnalysis);
else
{
_logger.LogError($"不支持的上报kafka主题{topicName}");
await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, messageReceivedAnalysis);
}
}
/// <summary>
/// 登录回复
/// </summary>
/// <param name="client"></param>
/// <param name="code"></param>
/// <param name="msa"></param>
/// <param name="pseq"></param>
/// <returns></returns>
public async Task LoginAsync(ITcpSessionClient client, string messageReceived, string code, int? msa, int? pseq)
{
string oldClientId = $"{client.Id}";
await client.ResetIdAsync(code);
var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == code);
if (deviceInfoList != null && deviceInfoList.Count > 1)
{
//todo 推送集中器编号重复预警
_logger.LogError($"集中器编号:{code},存在多个集中器,请检查集中器编号是否重复");
return;
}
var entity = deviceInfoList?.FirstOrDefault(a => a.Number == code);
if (entity == null)
{
await _deviceRepository.InsertAsync(new Device(code, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online));
}
else
{
entity.UpdateByLoginAndHeartbeat(oldClientId);
await _deviceRepository.UpdateAsync(entity);
}
var messageReceivedLoginEvent = new MessageReceivedLogin
{
ClientId = code,
ClientIp = client.IP,
ClientPort = client.Port,
MessageHexString = messageReceived,
DeviceNo = code,
MessageId = Guid.NewGuid().ToString(),
ReceivedTime=DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
};
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
var reqParam = new ReqParameter2
{
AFN = AFN.,
FunCode = (int)CFromStationFunCode.,
PRM = PRM.,
A = code,
Seq = new Seq()
{
TpV = TpV.,
FIRFIN = FIRFIN.,
CON = CON.,
PRSEQ = pseq!.Value
},
MSA = msa!.Value,
Pn = 0,
Fn = 1
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
var issuedEventMessage = new IssuedEventMessage
{
ClientId = messageReceivedLoginEvent.ClientId,
DeviceNo = messageReceivedLoginEvent.DeviceNo,
Message = bytes,
Type = IssuedEventType.Login,
MessageId = messageReceivedLoginEvent.MessageId
};
if (_tcpService.ClientExists(issuedEventMessage.ClientId))
{
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
_logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{Convert.ToHexString(bytes)}");
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, issuedEventMessage);
}
}
/// <summary>
/// 心跳帧解析
/// </summary>
/// <param name="client"></param>
/// <param name="code"></param>
/// <param name="msa"></param>
/// <param name="pseq"></param>
/// <returns></returns>
public async Task HeartbeatAsync(ITcpSessionClient client, string messageReceived, string code, int? msa, int? pseq)
{
string clientId = code;
string oldClientId = $"{client.Id}";
var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == code);
if (deviceInfoList != null && deviceInfoList.Count > 1)
{
//todo 推送集中器编号重复预警
_logger.LogError($"集中器编号:{code},存在多个集中器,请检查集中器编号是否重复");
return;
}
var entity = deviceInfoList?.FirstOrDefault(a => a.Number == code);
if (entity == null) //没有登录帧的设备,只有心跳帧
{
await client.ResetIdAsync(clientId);
await _deviceRepository.InsertAsync(new Device(code, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online));
}
else
{
if (clientId != oldClientId)
{
entity.UpdateByLoginAndHeartbeat(oldClientId);
}
else
{
entity.UpdateByLoginAndHeartbeat();
}
await _deviceRepository.UpdateAsync(entity);
}
var messageReceivedHeartbeatEvent = new MessageReceivedHeartbeat
{
ClientId = clientId,
ClientIp = client.IP,
ClientPort = client.Port,
MessageHexString = messageReceived,
DeviceNo = code,
MessageId = Guid.NewGuid().ToString(),
ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
};
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
var reqParam = new ReqParameter2()
{
AFN = AFN.,
FunCode = (int)CFromStationFunCode.,
PRM = PRM.,
A = code,
Seq = new Seq()
{
TpV = TpV.,
FIRFIN = FIRFIN.,
CON = CON.,
PRSEQ = pseq!.Value,
},
MSA = msa!.Value,
Pn = 0,
Fn = 1
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
IssuedEventMessage issuedEventMessage = new IssuedEventMessage
{
ClientId = messageReceivedHeartbeatEvent.ClientId,
DeviceNo = messageReceivedHeartbeatEvent.DeviceNo,
Message = bytes,
Type = IssuedEventType.Heartbeat,
MessageId = messageReceivedHeartbeatEvent.MessageId
};
if (_tcpService.ClientExists(issuedEventMessage.ClientId))
{
await _tcpService.SendAsync(issuedEventMessage.ClientId, bytes);
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{Convert.ToHexString(bytes)}");
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, issuedEventMessage);
}
}
///// <summary>
///// 组装报文
///// </summary>
///// <param name="request">报文构建参数</param>
///// <returns></returns>
//public override async Task<ProtocolBuildResponse> BuildAsync(ProtocolBuildRequest request)
//{
// if (request == null)
// {
// throw new Exception($"{nameof(StandardProtocolPlugin)} 报文构建失败,参数为空");
// }
// var itemCodeArr = request.ItemCode.Split('_');
// var aFNStr = itemCodeArr[0];
// var aFN = (AFN)aFNStr.HexToDec();
// var fn = int.Parse(itemCodeArr[1]);
// Telemetry3761PacketResponse builderResponse = null;
// List<string> dataUnit = new List<string>();
// //数据转发场景 10H_F1_1CH
// if (aFNStr == "10" && request.SubProtocolRequest != null && string.IsNullOrWhiteSpace(request.SubProtocolRequest.ItemCode) == false)
// {
// var t645PacketHandlerName = $"C{request.SubProtocolRequest.ItemCode}_Send";
// Telemetry645PacketResponse t645PacketResponse = null;
// if (T645ControlHandlers != null && T645ControlHandlers.TryGetValue(t645PacketHandlerName
// , out var t645PacketHandler))
// {
// t645PacketResponse = t645PacketHandler(new Telemetry645PacketRequest()
// {
// MeterAddress = request.SubProtocolRequest.MeterAddress,
// Password = request.SubProtocolRequest.Password,
// ItemCode = request.SubProtocolRequest.ItemCode,
// });
// }
// if (t645PacketResponse != null)
// {
// dataUnit = t645PacketResponse.Data;
// }
// }
// string afnMethonCode = $"AFN{aFNStr}_Fn_Send";
// if (T3761AFNHandlers != null && T3761AFNHandlers.TryGetValue(afnMethonCode
// , out var handler))
// {
// builderResponse = handler(new Telemetry3761PacketRequest()
// {
// FocusAddress = request.FocusAddress,
// Fn = fn,
// Pn = request.Pn,
// DataUnit = dataUnit,
// });
// }
// if (builderResponse == null)
// {
// return new ProtocolBuildResponse();
// }
// var result = builderResponse.Adapt<ProtocolBuildResponse>();
// result.IsSuccess = true;
// return await Task.FromResult(result);
//}
#region
//68
//32 00
//32 00
//68
//C9 1100'1001. 控制域C。
// D7=1, (终端发送)上行方向。
// D6=1, 此帧来自启动站。
// D5=0, (上行方向)要求访问位。表示终端无事件数据等待访问。
// D4=0, 保留
// D3~D0=9, 功能码。链路测试
//20 32 行政区划码
//90 26 终端地址
//00 主站地址和组地址标志。终端为单地址。 //3220 09 87 2
// 终端启动的发送帧的 MSA 应为 0, 其主站响应帧的 MSA 也应为 0.
//02 应用层功能码。AFN=2, 链路接口检测
//70 0111'0000. 帧序列域。无时间标签、单帧、需要确认。
//00 00 信息点。DA1和DA2全为“0”时表示终端信息点。
//01 00 信息类。F1, 登录。
//44 帧尾,包含用户区数据校验和
//16 帧结束标志
/// <summary>
/// 解析上行命令
/// </summary>
/// <param name="cmd"></param>
/// <returns></returns>
public CommandReulst? AnalysisCmd(string cmd)
{
CommandReulst? commandReulst = null;
var hexStringList = cmd.StringToPairs();
if (hexStringList.Count < hearderLen)
{
return commandReulst;
}
//验证起始字符
if (!hexStringList[0].IsStartStr() || !hexStringList[5].IsStartStr())
{
return commandReulst;
}
var lenHexStr = $"{hexStringList[2]}{hexStringList[1]}";
var lenBin = lenHexStr.HexToBin();
var len = lenBin.Remove(lenBin.Length - 2).BinToDec();
//验证长度
if (hexStringList.Count - 2 != hearderLen + len)
return commandReulst;
var userDataIndex = hearderLen;
var c = hexStringList[userDataIndex];//控制域 1字节
userDataIndex += 1;
var aHexList = hexStringList.Skip(userDataIndex).Take(5).ToList();//地址域 5字节
var a = AnalysisA(aHexList);
var a3Bin = aHexList[4].HexToBin().PadLeft(8, '0');
var mSA = a3Bin.Substring(0, 7).BinToDec();
userDataIndex += 5;
var aFN = (AFN)hexStringList[userDataIndex].HexToDec();//1字节
userDataIndex += 1;
var seq = hexStringList[userDataIndex].HexToBin().PadLeft(8, '0');
var tpV = (TpV)Convert.ToInt32(seq.Substring(0, 1));
var fIRFIN = (FIRFIN)Convert.ToInt32(seq.Substring(1, 2));
var cON = (CON)Convert.ToInt32(seq.Substring(3, 1));
var prseqBin = seq.Substring(4, 4);
userDataIndex += 1;
// (DA2 - 1) * 8 + DA1 = pn
var da1Bin = hexStringList[userDataIndex].HexToBin();
var da1 = da1Bin == "0" ? 0 : da1Bin.Length;
userDataIndex += 1;
var da2 = hexStringList[userDataIndex].HexToDec();
var pn = da2 == 0 ? 0 : (da2 - 1) * 8 + da1;
userDataIndex += 1;
//(DT2*8)+DT1=fn
var dt1Bin = hexStringList[userDataIndex].HexToBin();
var dt1 = dt1Bin != "0" ? dt1Bin.Length : 0;
userDataIndex += 1;
var dt2 = hexStringList[userDataIndex].HexToDec();
var fn = dt2 * 8 + dt1;
userDataIndex += 1;
//数据单元
var datas = hexStringList.Skip(userDataIndex).Take(len + hearderLen - userDataIndex).ToList();
//EC
//Tp
commandReulst = new CommandReulst()
{
A = a,
MSA = mSA,
AFN = aFN,
Seq = new Seq()
{
TpV = tpV,
FIRFIN = fIRFIN,
CON = cON,
PRSEQ = prseqBin.BinToDec(),
},
CmdLength = len,
Pn = pn,
Fn = fn,
HexDatas = datas
};
return commandReulst;
}
/// <summary>
/// 解析地址
/// </summary>
/// <param name="aHexList"></param>
/// <returns></returns>
private string AnalysisA(List<string> aHexList)
{
var a1 = aHexList[1] + aHexList[0];
var a2 = aHexList[3] + aHexList[2];
var a2Dec = a2.HexToDec();
var a3 = aHexList[4];
var a = $"{a1}{a2Dec.ToString().PadLeft(5, '0')}";
return a;
}
public override Task<ProtocolBuildResponse> BuildAsync(ProtocolBuildRequest request)
{
throw new NotImplementedException();
}
#endregion
}
}

View File

@ -1,6 +1,6 @@
using JiShe.CollectBus.Protocol.Dto;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol3761;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Protocol.Dto;
using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.Protocol.AnalysisData.AFN_00H

View File

@ -1,14 +1,4 @@
using FreeSql.Internal.CommonProvider;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Encrypt;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.EnergySystems.Entities;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Protocol.AnalysisData.Appendix;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
@ -16,7 +6,6 @@ using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Protocol.Contracts.Protocol.Dto;
using JiShe.CollectBus.Protocol.Dto;
using Microsoft.Extensions.Logging;
using NUglify.JavaScript.Syntax;
namespace JiShe.CollectBus.Protocol.AnalysisData.AFN_0CH
{
@ -27,13 +16,11 @@ namespace JiShe.CollectBus.Protocol.AnalysisData.AFN_0CH
{
private readonly ILogger<AFN12_F129_Analysis> _logger;
private readonly AnalysisStrategyContext _analysisStrategyContext;
private readonly IIoTDbProvider _dbProvider;
public AFN12_F129_Analysis(ILogger<AFN12_F129_Analysis> logger, AnalysisStrategyContext analysisStrategyContext, IIoTDbProvider dbProvider)
public AFN12_F129_Analysis(ILogger<AFN12_F129_Analysis> logger, AnalysisStrategyContext analysisStrategyContext)
{
_logger = logger;
_analysisStrategyContext = analysisStrategyContext;
_dbProvider= dbProvider;
}
public async Task<UnitDataAnalysis<List<AFN12_F129_AnalysisDto>>> ExecuteAsync(TB3761 input)
@ -42,34 +29,16 @@ namespace JiShe.CollectBus.Protocol.AnalysisData.AFN_0CH
{
ArgumentNullException.ThrowIfNull(input);
ArgumentNullException.ThrowIfNull(input.UnitData?.HexMessageList);
ArgumentNullException.ThrowIfNull(input.A.A3?.D1_D7);
List<string> datas = await AnalysisDataUnitAsync(input.UnitData.HexMessageList);
List<AFN12_F129_AnalysisDto> list = GenerateFinalResult(2, datas, "正向有功电能示值", input.AFN_FC.AFN, input.DT.Fn);
UnitDataAnalysis<List<AFN12_F129_AnalysisDto>> unitDataAnalysis = new UnitDataAnalysis<List<AFN12_F129_AnalysisDto>>
{
Code = input.A.Code,
AFN = input.AFN_FC.AFN,
Fn = input.DT.Fn,
Pn = input.DA.Pn,
MSA= input.A.A3.D1_D7,
PSEQ= input.SEQ.PSEQ,
Data = list
};
string taskMark = CommonHelper.GetTaskMark(unitDataAnalysis.AFN, unitDataAnalysis.Fn, unitDataAnalysis.Pn, unitDataAnalysis.MSA, unitDataAnalysis.PSEQ);
string scoreValue = $"{unitDataAnalysis.Code}.{taskMark}".Md5Fun();
var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
{
Field = "ScoreValue",
Operator = "=",
IsNumber = false,
Value = scoreValue
});
var taskSendInfo = await _dbProvider.QueryAsync<MeterReadingTelemetryPacketInfo>(new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(), Conditions = conditions,PageIndex=0,PageSize=1});
List<string> datas = await AnalysisDataUnitAsync(input.UnitData.HexMessageList);
List<AFN12_F129_AnalysisDto> list = GenerateFinalResult(2, datas, "正向有功电能示值", input.AFN_FC.AFN, input.DT.Fn);
unitDataAnalysis.Data= list;
return await Task.FromResult(unitDataAnalysis);
}
@ -129,13 +98,6 @@ namespace JiShe.CollectBus.Protocol.AnalysisData.AFN_0CH
return list;
}
/// <summary>
/// 生成最终结果
/// </summary>
/// <returns></returns>
public Task<bool> SaveIotDbAsync()
{
}
}
}

View File

@ -1,5 +1,15 @@
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Protocol.AnalysisData;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Protocol.Dto;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
using Serilog.Core;
using System;
using System.Reflection;
using Volo.Abp;
using Volo.Abp.Modularity;
@ -39,5 +49,63 @@ namespace JiShe.CollectBus.Protocol
}
}
public void RegisterProtocolAnalysis(IServiceCollection services)
{
// 扫描并注册所有策略
var strategyMetadata = new Dictionary<(string, Type, Type), Type>();
services.AddTransient<AnalysisStrategyContext>();
// 批量注册
var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location);
if (string.IsNullOrWhiteSpace(assemblyPath))
{
return;
}
var dllFiles = Directory.GetFiles(Path.Combine(assemblyPath, "Plugins") , "*.dll");
foreach (var file in dllFiles)
{
// 跳过已加载的程序集
var assemblyName = AssemblyName.GetAssemblyName(file);
var existingAssembly = AppDomain.CurrentDomain.GetAssemblies()
.FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName);
var assembly = existingAssembly ?? Assembly.LoadFrom(file);
// 实现IAnalysisStrategy接口
var analysisStrategyTypes = assembly.GetTypes().Where(t => !t.IsAbstract && !t.IsInterface && t.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IAnalysisStrategy<,>)));
if (!analysisStrategyTypes.Any())
continue;
foreach (var analysisStrategyType in analysisStrategyTypes)
{
// 通过反射获取静态元数据
var strategyType = analysisStrategyType.Name;
var genericArgs = analysisStrategyType.GetInterface($"IAnalysisStrategy`2")!.GetGenericArguments();
var inputType = genericArgs[0];
var resultType = genericArgs[1];
// 注册策略实现
services.AddTransient(analysisStrategyType);
strategyMetadata[(strategyType, inputType, resultType)] = analysisStrategyType;
}
}
// 注册元数据字典
services.AddSingleton(strategyMetadata);
// 注册策略解析工厂
services.AddTransient<Func<string, Type, Type, object?>>(provider => (name, inputType, resultType) =>
{
var metadata = provider.GetRequiredService<Dictionary<(string, Type, Type), Type>>();
if (metadata.TryGetValue((name, inputType, resultType), out var strategyType))
{
return provider.GetRequiredService(strategyType);
}
else
{
var logger= provider.GetRequiredService<ILogger<AnalysisStrategyContext>>();
logger.LogWarning($"未能找到解析策略:{name}-{inputType}-{resultType}");
return null;
}
});
}
}
}

View File

@ -52,41 +52,263 @@ namespace JiShe.CollectBus.Protocol
public override async Task<T> AnalyzeAsync<T>(ITcpSessionClient client, string messageReceived, Action<T>? sendAction = null)
{
//TODO:645解析报文
//TB3761? tB3761 = Analysis3761(messageReceived);
//if (tB3761 != null)
//{
// if (tB3761.AFN_FC?.AFN == (int)AFN.链路接口检测)
// {
// if (tB3761.A == null || tB3761.A.Code.IsNullOrWhiteSpace() || tB3761.A.A3?.D1_D7 == null || tB3761.SEQ?.PSEQ == null)
// {
// _logger.LogError($"解析AFN.链路接口检测报文失败,报文:{messageReceived},TB3761:{tB3761.Serialize()}");
// }
// else
// {
// if (tB3761.DT?.Fn == (int)FN.登录)
// {
// // 登录回复
// if (tB3761.SEQ.CON == (int)CON.需要对该帧进行确认)
// await LoginAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ);
// }
// else if (tB3761.DT?.Fn == (int)FN.心跳)
// {
// // 心跳回复
// //心跳帧有两种情况:
// //1. 集中器先有登录帧,再有心跳帧
// //2. 集中器没有登录帧,只有心跳帧
// await HeartbeatAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ);
// }
// }
TB3761? tB3761 = Analysis3761(messageReceived);
if (tB3761 != null)
{
if (tB3761.AFN_FC?.AFN == (int)AFN.)
{
if (tB3761.A == null || tB3761.A.Code.IsNullOrWhiteSpace() || tB3761.A.A3?.D1_D7 == null || tB3761.SEQ?.PSEQ == null)
{
_logger.LogError($"解析AFN.链路接口检测报文失败,报文:{messageReceived},TB3761:{tB3761.Serialize()}");
}
else
{
if (tB3761.DT?.Fn == (int)FN.)
{
// 登录回复
if (tB3761.SEQ.CON == (int)CON.)
await LoginAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ);
}
else if (tB3761.DT?.Fn == (int)FN.)
{
// 心跳回复
//心跳帧有两种情况:
//1. 集中器先有登录帧,再有心跳帧
//2. 集中器没有登录帧,只有心跳帧
await HeartbeatAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ);
}
}
// }
// await OnTcpNormalReceived(client, tB3761);
//}
//return (tB3761 as T)!;
return null;
}
await OnTcpNormalReceived(client, tB3761);
}
return (tB3761 as T)!;
}
/// <summary>
/// 正常帧处理将不同的AFN进行分发
/// </summary>
/// <param name="tcpSessionClient"></param>
/// <param name="messageHexString"></param>
/// <param name="tB3761"></param>
/// <returns></returns>
private async Task OnTcpNormalReceived(ITcpSessionClient tcpSessionClient, TB3761 tB3761)
{
//string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn);
//todo 如何确定时标?目前集中器的采集频率,都是固定,数据上报的时候,根据当前时间,往后推测出应当采集的时间点作为时标。但是如果由于网络问题,数据一直没上报的情况改怎么计算?
//await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
//{
// ClientId = client.Id,
// ClientIp = client.IP,
// ClientPort = client.Port,
// MessageHexString = messageHexString,
// DeviceNo = deviceNo,
// MessageId = NewId.NextGuid().ToString()
//});
if (tB3761.AFN_FC.BaseHexMessage == null || tB3761.DT.BaseHexMessage == null || tB3761.BaseHexMessage.HexMessageString==null)
{
_logger.LogError("376.1协议解析AFN失败");
return;
}
// 登录心跳已做了处理,故需要忽略登录和心跳帧
if (tB3761.DT.Fn == (int)FN. || tB3761.DT.Fn == (int)FN.)
return;
//TODO根据AFN进行分流推送到kafka
string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, tB3761.AFN_FC.AFN.ToString().PadLeft(2, '0'));
MessageProtocolAnalysis<TB3761> messageReceivedAnalysis = new MessageProtocolAnalysis<TB3761>()
{
ClientId = tcpSessionClient.Id,
ClientIp = tcpSessionClient.IP,
ClientPort = tcpSessionClient.Port,
MessageHexString = tB3761.BaseHexMessage.HexMessageString!,
DeviceNo = tB3761.A.Code!,
MessageId = Guid.NewGuid().ToString(),
ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"),
Data = tB3761
};
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByReceived();
if (topics.Contains(topicName))
await _producerService.ProduceAsync(topicName, messageReceivedAnalysis);
else
{
_logger.LogError($"不支持的上报kafka主题{topicName}");
await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, messageReceivedAnalysis);
}
}
/// <summary>
/// 登录回复
/// </summary>
/// <param name="client"></param>
/// <param name="code"></param>
/// <param name="msa"></param>
/// <param name="pseq"></param>
/// <returns></returns>
public async Task LoginAsync(ITcpSessionClient client, string messageReceived, string code, int? msa, int? pseq)
{
string oldClientId = $"{client.Id}";
await client.ResetIdAsync(code);
var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == code);
if (deviceInfoList != null && deviceInfoList.Count > 1)
{
//todo 推送集中器编号重复预警
_logger.LogError($"集中器编号:{code},存在多个集中器,请检查集中器编号是否重复");
return;
}
var entity = deviceInfoList?.FirstOrDefault(a => a.Number == code);
if (entity == null)
{
await _deviceRepository.InsertAsync(new Device(code, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online));
}
else
{
entity.UpdateByLoginAndHeartbeat(oldClientId);
await _deviceRepository.UpdateAsync(entity);
}
var messageReceivedLoginEvent = new MessageReceivedLogin
{
ClientId = code,
ClientIp = client.IP,
ClientPort = client.Port,
MessageHexString = messageReceived,
DeviceNo = code,
MessageId = Guid.NewGuid().ToString(),
ReceivedTime=DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
};
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
var reqParam = new ReqParameter2
{
AFN = AFN.,
FunCode = (int)CFromStationFunCode.,
PRM = PRM.,
A = code,
Seq = new Seq()
{
TpV = TpV.,
FIRFIN = FIRFIN.,
CON = CON.,
PRSEQ = pseq!.Value
},
MSA = msa!.Value,
Pn = 0,
Fn = 1
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
var issuedEventMessage = new IssuedEventMessage
{
ClientId = messageReceivedLoginEvent.ClientId,
DeviceNo = messageReceivedLoginEvent.DeviceNo,
Message = bytes,
Type = IssuedEventType.Login,
MessageId = messageReceivedLoginEvent.MessageId
};
if (_tcpService.ClientExists(issuedEventMessage.ClientId))
{
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
_logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{Convert.ToHexString(bytes)}");
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, issuedEventMessage);
}
}
/// <summary>
/// 心跳帧解析
/// </summary>
/// <param name="client"></param>
/// <param name="code"></param>
/// <param name="msa"></param>
/// <param name="pseq"></param>
/// <returns></returns>
public async Task HeartbeatAsync(ITcpSessionClient client, string messageReceived, string code, int? msa, int? pseq)
{
string clientId = code;
string oldClientId = $"{client.Id}";
var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == code);
if (deviceInfoList != null && deviceInfoList.Count > 1)
{
//todo 推送集中器编号重复预警
_logger.LogError($"集中器编号:{code},存在多个集中器,请检查集中器编号是否重复");
return;
}
var entity = deviceInfoList?.FirstOrDefault(a => a.Number == code);
if (entity == null) //没有登录帧的设备,只有心跳帧
{
await client.ResetIdAsync(clientId);
await _deviceRepository.InsertAsync(new Device(code, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online));
}
else
{
if (clientId != oldClientId)
{
entity.UpdateByLoginAndHeartbeat(oldClientId);
}
else
{
entity.UpdateByLoginAndHeartbeat();
}
await _deviceRepository.UpdateAsync(entity);
}
var messageReceivedHeartbeatEvent = new MessageReceivedHeartbeat
{
ClientId = clientId,
ClientIp = client.IP,
ClientPort = client.Port,
MessageHexString = messageReceived,
DeviceNo = code,
MessageId = Guid.NewGuid().ToString(),
ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
};
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
var reqParam = new ReqParameter2()
{
AFN = AFN.,
FunCode = (int)CFromStationFunCode.,
PRM = PRM.,
A = code,
Seq = new Seq()
{
TpV = TpV.,
FIRFIN = FIRFIN.,
CON = CON.,
PRSEQ = pseq!.Value,
},
MSA = msa!.Value,
Pn = 0,
Fn = 1
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
IssuedEventMessage issuedEventMessage = new IssuedEventMessage
{
ClientId = messageReceivedHeartbeatEvent.ClientId,
DeviceNo = messageReceivedHeartbeatEvent.DeviceNo,
Message = bytes,
Type = IssuedEventType.Heartbeat,
MessageId = messageReceivedHeartbeatEvent.MessageId
};
if (_tcpService.ClientExists(issuedEventMessage.ClientId))
{
await _tcpService.SendAsync(issuedEventMessage.ClientId, bytes);
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{Convert.ToHexString(bytes)}");
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, issuedEventMessage);
}
}
/// <summary>
/// 组装报文
/// </summary>

View File

@ -4,7 +4,6 @@ using JiShe.CollectBus.FreeRedis;
using JiShe.CollectBus.IotSystems.Protocols;
using JiShe.CollectBus.Protocol.Contracts.SendData;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol3761;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using TouchSocket.Sockets;

View File

@ -1,5 +1,10 @@
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts
{

View File

@ -17,6 +17,8 @@ namespace JiShe.CollectBus.Protocol.Interfaces
Task<T> AnalyzeAsync<T>(ITcpSessionClient client, string messageReceived, Action<T>? sendAction = null) where T : class;
TB3761? Analysis3761(string messageReceived);
/// <summary>
/// 组装报文
/// </summary>
@ -24,5 +26,9 @@ namespace JiShe.CollectBus.Protocol.Interfaces
/// <param name="afnFnCode">映射读取执行方法的Code例如10_1表示10H_F1</param>
/// <returns></returns>
Task<ProtocolBuildResponse> BuildAsync(ProtocolBuildRequest request);
//Task LoginAsync(MessageReceivedLogin messageReceived);
//Task HeartbeatAsync(MessageReceivedHeartbeat messageReceived);
}
}

View File

@ -12,11 +12,6 @@
<None Remove="Extensions\**" />
</ItemGroup>
<ItemGroup>
<Compile Remove="Abstracts\BaseProtocolPlugin_bak.cs" />
<Compile Remove="QGDW3761Config.cs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="8.0.0" />
<PackageReference Include="TouchSocket" Version="2.1.9" />

View File

@ -1,12 +1,18 @@
using JiShe.CollectBus.Common.Consts;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.FreeRedis;
using JiShe.CollectBus.IotSystems.Protocols;
using JiShe.CollectBus.Protocol.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Volo.Abp.DependencyInjection;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using System.Text.RegularExpressions;
using Volo.Abp;
using Volo.Abp.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.Protocol.Contracts.Services
{

View File

@ -3,7 +3,6 @@ using System.Threading.Tasks;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Protocol3761;
using Volo.Abp.Application.Services;
namespace JiShe.CollectBus.Subscribers
@ -12,7 +11,7 @@ namespace JiShe.CollectBus.Subscribers
{
Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessage);
Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessage);
Task<ISubscribeAck> ReceivedEvent(MessageProtocolAnalysis<TB3761> receivedMessage);
Task<ISubscribeAck> ReceivedEvent(MessageReceived receivedMessage);
Task<ISubscribeAck> ReceivedHeartbeatEvent(List<MessageReceivedHeartbeat> receivedHeartbeatMessage);
Task<ISubscribeAck> ReceivedLoginEvent(List<MessageReceivedLogin> receivedLoginMessage);
}

View File

@ -1,16 +1,35 @@
using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol3761;
using Microsoft.Extensions.Logging;
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using DeviceDetectorNET.Parser.Device;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Enums;
using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Protocol.Contracts.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using TouchSocket.Core;
using TouchSocket.Sockets;
using Volo.Abp.Caching;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Entities;
using Volo.Abp.Domain.Repositories;
using static System.Formats.Asn1.AsnWriter;
using static FreeSql.Internal.GlobalFilter;
namespace JiShe.CollectBus.Plugins
{

View File

@ -10,11 +10,12 @@ using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.PrepayModel;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;
@ -24,6 +25,7 @@ using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using JiShe.CollectBus.IotSystems.Ammeters;
namespace JiShe.CollectBus.Samples;

View File

@ -1,5 +1,6 @@
using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Common.Encrypt;
@ -7,6 +8,7 @@ using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.EnergySystems.Entities;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
@ -18,16 +20,19 @@ using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Protocol.Contracts.SendData;
using JiShe.CollectBus.Protocol.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.Identity.Client;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using static System.Runtime.CompilerServices.RuntimeHelpers;
namespace JiShe.CollectBus.ScheduledMeterReading
{

View File

@ -5,14 +5,18 @@ using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol3761;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using TouchSocket.Sockets;
using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Subscribers
{

View File

@ -1,5 +1,6 @@
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IoTDB.Interface;
@ -7,8 +8,7 @@ using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol3761;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
@ -103,7 +103,7 @@ namespace JiShe.CollectBus.Subscribers
}
[KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)]
public async Task<ISubscribeAck> ReceivedEvent(MessageProtocolAnalysis<TB3761> receivedMessage)
public async Task<ISubscribeAck> ReceivedEvent(MessageReceived receivedMessage)
{
var currentTime = Clock.Now;
@ -115,24 +115,24 @@ namespace JiShe.CollectBus.Subscribers
else
{
//todo 会根据不同的协议进行解析,然后做业务处理
//TB3761? tB3761 = protocolPlugin.Analysis3761(receivedMessage.MessageHexString);
//if (tB3761 == null)
//{
// Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
// return SubscribeAck.Success();
//}
//if (tB3761.DT == null || tB3761.AFN_FC == null)
//{
// Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
// return SubscribeAck.Success();
//}
TB3761? tB3761 = protocolPlugin.Analysis3761(receivedMessage.MessageHexString);
if (tB3761 == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
if (tB3761.DT == null || tB3761.AFN_FC == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
//报文入库
var entity = new MeterReadingRecords()
{
ReceivedMessageHexString = receivedMessage.MessageHexString,
AFN = (AFN)receivedMessage.Data?.AFN_FC.AFN!,
Fn = receivedMessage.Data.DT.Fn,
AFN = (AFN)tB3761.AFN_FC.AFN,
Fn = tB3761.DT.Fn,
Pn = 0,
FocusAddress = "",
MeterAddress = "",

View File

@ -1,119 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Consts
{
/// <summary>
/// 数据保存字段常量
/// </summary>
public class DataFieldConst
{
public const string ZYGDN = "ZYGDN";
public const string ZWGDN = "ZWGDN";
public const string FYGDN = "FYGDN";
public const string FWGDN = "FWGDN";
public const string ZYGDNSZ = "ZYGDNSZ";
public const string ZWGDNSZ = "ZWGDNSZ";
public const string FYGDNSZ = "FYGDNSZ";
public const string FWGDNSZ = "FWGDNSZ";
public const string WGDN1 = "WGDN1";
public const string WGDN2 = "WGDN2";
public const string WGDN3 = "WGDN3";
public const string WGDN4 = "WGDN4";
public const string WGDNSZ1 = "WGDNSZ1";
public const string WGDNSZ2 = "WGDNSZ2";
public const string WGDNSZ3 = "WGDNSZ3";
public const string WGDNSZ4 = "WGDNSZ4";
public const string YGGL = "YGGL";
public const string AYGGL = "AYGGL";
public const string BYGGL = "BYGGL";
public const string CYGGL = "CYGGL";
public const string WGGL = "WGGL";
public const string AWGGL = "AWGGL";
public const string BWGGL = "BWGGL";
public const string CWGGL = "CWGGL";
public const string GLYS = "GLYS";
public const string AGLYS = "AGLYS";
public const string BGLYS = "BGLYS";
public const string CGLYS = "CGLYS";
public const string ADY = "ADY";
public const string BDY = "BDY";
public const string CDY = "CDY";
public const string ADL = "ADL";
public const string BDL = "BDL";
public const string CDL = "CDL";
public const string PowerGridFrequency = "PowerGridFrequency";
public const string Ua = "Ua";
public const string Ub = "Ub";
public const string Uc = "Uc";
public const string Ia = "Ia";
public const string Ib = "Ib";
public const string Ic = "Ic";
}
public class ConstGatherDataType
{
public const string ZYGDN = "0D_97"; //正向有功总电能量
public const string ZWGDN = "0D_98"; //正向无功总电能量曲线
public const string FYGDN = "0D_99"; //反向有功总电能量曲线
public const string FWGDN = "0D_100"; //反向无功总电能量曲线
public const string ZYGDNSZ = "0D_101"; //正向有功总电能示值曲线
public const string ZWGDNSZ = "0D_102"; //正向无功总电能示值曲线
public const string FYGDNSZ = "0D_103"; //反向有功总电能示值曲线
public const string FWGDNSZ = "0D_104"; //反向无功总电能示值曲线
public const string WGDNSZ1 = "0D_145"; //一象限无功总电能示值曲线
public const string WGDNSZ4 = "0D_146"; //四象限无功总电能示值曲线
public const string WGDNSZ2 = "0D_147"; //二象限无功总电能示值曲线
public const string WGDNSZ3 = "0D_148"; //三象限无功总电能示值曲线
public const string YGGL = "0D_81"; //有功功率曲线
public const string AYGGL = "0D_82"; //A相有功功率曲线
public const string BYGGL = "0D_83"; //B相有功功率曲线
public const string CYGGL = "0D_84"; //C相有功功率曲线
public const string WGGL = "0D_85"; //无功功率曲线
public const string AWGGL = "0D_86"; //A相无功功率曲线
public const string BWGGL = "0D_87"; //B相无功功率曲线
public const string CWGGL = "0D_88"; //C相无功功率曲线
public const string GLYS = "0D_105"; // 功率因数曲线
public const string AGLYS = "0D_106"; // A相功率因数曲线
public const string BGLYS = "0D_107"; // B相功率因数曲线
public const string CGLYS = "0D_108"; // C相功率因数曲线
public const string ADY = "0D_89"; //A相电压曲线
public const string BDY = "0D_90"; //B相电压曲线
public const string CDY = "0D_91"; //C相电压曲线
public const string ADL = "0D_92"; //A相电流曲线
public const string BDL = "0D_93"; //B相电流曲线
public const string CDL = "0D_94"; //C相电流曲线
public const string PowerGridFrequency = "10_97"; //电网频率
public const string Ua = "0C_49_Uab_Ua"; // 当前电压、电流相位角
public const string Ub = "0C_49_Ub"; // 当前电压、电流相位角
public const string Uc = "0C_49_Ucb_Uc"; // 当前电压、电流相位角
public const string Ia = "0C_49_Ia"; // 当前电压、电流相位角
public const string Ib = "0C_49_Ib"; // 当前电压、电流相位角
public const string Ic = "0C_49_Ic"; // 当前电压、电流相位角
}
}

View File

@ -1,26 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Helpers
{
public class DataFieldHelper
{
public static string GetDataField(string dataField)
{
if (string.IsNullOrWhiteSpace(dataField))
{
return string.Empty;
}
if (dataField.Contains("."))
{
return dataField.Split('.')[0];
}
return dataField;
}
}
}