暂存代码

This commit is contained in:
ChenYi 2025-03-27 08:38:19 +08:00
parent af9b0d8a77
commit a9b8323a3a
13 changed files with 138 additions and 93 deletions

View File

@ -49,7 +49,7 @@ namespace JiShe.CollectBus.Consumers
var list = new List<MessageReceived>();
foreach (var contextItem in context.Message)
{
await protocolPlugin.AnalyzeAsync<TB3761FN>(contextItem.Message);
await protocolPlugin.AnalyzeAsync<TB3761>(contextItem.Message);
list.Add(contextItem.Message);
}
await _messageReceivedEventRepository.InsertManyAsync(list);

View File

@ -15,14 +15,14 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="MassTransit.Kafka" Version="8.3.2" />
<PackageReference Include="MassTransit.Kafka" Version="8.4.0" />
<PackageReference Include="Volo.Abp.AspNetCore.Mvc" Version="8.3.3" />
<PackageReference Include="Volo.Abp.AutoMapper" Version="8.3.3" />
<PackageReference Include="Volo.Abp.BackgroundWorkers.Hangfire" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Ddd.Application" Version="8.3.3" />
<PackageReference Include="TouchSocket" Version="2.1.9" />
<PackageReference Include="TouchSocket.Hosting" Version="2.1.9" />
<PackageReference Include="TouchSocket" Version="3.0.19" />
<PackageReference Include="TouchSocket.Hosting" Version="3.0.19" />
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />

View File

@ -7,10 +7,10 @@ using TouchSocket.Sockets;
namespace JiShe.CollectBus.Plugins
{
public partial class TcpCloseMonitor(ILogger<TcpCloseMonitor> logger) : PluginBase
public partial class TcpCloseMonitor(ILogger<TcpCloseMonitor> logger) : PluginBase, ITcpReceivedPlugin
{
[GeneratorPlugin(typeof(ITcpReceivedPlugin))]
public async Task OnTcpReceived(ITcpSessionClient client, ReceivedDataEventArgs e)
public async Task OnTcpReceived(ITcpSession client, ReceivedDataEventArgs e)
{
try
{
@ -19,21 +19,21 @@ namespace JiShe.CollectBus.Plugins
catch (CloseException ex)
{
logger.LogInformation("拦截到CloseException");
client.Close(ex.Message);
await client.CloseAsync(ex.Message);
}
catch (Exception exx)
{
// ignored
}
finally
{
}
}
}
public partial class UdpCloseMonitor(ILogger<TcpCloseMonitor> logger) : PluginBase
public partial class UdpCloseMonitor(ILogger<TcpCloseMonitor> logger) : PluginBase, IUdpReceivedPlugin
{
[GeneratorPlugin(typeof(IUdpReceivedPlugin))]
public Task OnUdpReceived(IUdpSessionBase client, UdpReceivedDataEventArgs e)
{
throw new NotImplementedException();

View File

@ -5,9 +5,8 @@ using TouchSocket.Sockets;
namespace JiShe.CollectBus.Plugins
{
public partial class ServerMonitor(ILogger<ServerMonitor> logger) : PluginBase
public partial class ServerMonitor(ILogger<ServerMonitor> logger) : PluginBase, IServerStartedPlugin, IServerStopedPlugin
{
[GeneratorPlugin(typeof(IServerStartedPlugin))]
public Task OnServerStarted(IServiceBase sender, ServiceStateEventArgs e)
{
switch (sender)
@ -32,7 +31,6 @@ namespace JiShe.CollectBus.Plugins
return e.InvokeNext();
}
[GeneratorPlugin(typeof(IServerStopedPlugin))]
public Task OnServerStoped(IServiceBase sender,ServiceStateEventArgs e)
{
logger.LogInformation("服务已停止");

View File

@ -1,5 +1,7 @@
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using DeviceDetectorNET.Parser.Device;
using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.Enums;
@ -20,7 +22,7 @@ using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Plugins
{
public partial class TcpMonitor : PluginBase, ITransientDependency
public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin
{
private readonly ICapPublisher _capBus;
private readonly ILogger<TcpMonitor> _logger;
@ -34,9 +36,9 @@ namespace JiShe.CollectBus.Plugins
/// <param name="logger"></param>
/// <param name="deviceRepository"></param>
/// <param name="ammeterInfoCache"></param>
public TcpMonitor(ICapPublisher capBus,
ILogger<TcpMonitor> logger,
IRepository<Device, Guid> deviceRepository,
public TcpMonitor(ICapPublisher capBus,
ILogger<TcpMonitor> logger,
IRepository<Device, Guid> deviceRepository,
IDistributedCache<AmmeterInfo> ammeterInfoCache)
{
_capBus = capBus;
@ -45,8 +47,7 @@ namespace JiShe.CollectBus.Plugins
_ammeterInfoCache = ammeterInfoCache;
}
[GeneratorPlugin(typeof(ITcpReceivedPlugin))]
public async Task OnTcpReceived(ITcpSessionClient client, ReceivedDataEventArgs e)
public async Task OnTcpReceived(ITcpSession client, ReceivedDataEventArgs e)
{
var messageHexString = Convert.ToHexString(e.ByteBlock.Span);
var hexStringList = messageHexString.StringToPairs();
@ -55,15 +56,21 @@ namespace JiShe.CollectBus.Plugins
var aTuple = (Tuple<string, int>)hexStringList.GetAnalyzeValue(CommandChunkEnum.A);
if (aFn.HasValue && fn.HasValue && aTuple != null && !string.IsNullOrWhiteSpace(aTuple.Item1))
{
var tcpSessionClient = (ITcpSessionClient)client;
if ((AFN)aFn == AFN.)
{
switch (fn)
{
case 1:
await OnTcpLoginReceived(client, messageHexString, aTuple.Item1);
await OnTcpLoginReceived(tcpSessionClient, messageHexString, aTuple.Item1);
break;
case 3:
await OnTcpHeartbeatReceived(client, messageHexString, aTuple.Item1);
//心跳帧有两种情况:
//1. 集中器先有登录帧,再有心跳帧
//2. 集中器没有登录帧,只有心跳帧
await OnTcpHeartbeatReceived(tcpSessionClient, messageHexString, aTuple.Item1);
break;
default:
_logger.LogError($"指令初步解析失败,指令内容:{messageHexString}");
@ -72,7 +79,7 @@ namespace JiShe.CollectBus.Plugins
}
else
{
await OnTcpNormalReceived(client, messageHexString, aTuple.Item1);
await OnTcpNormalReceived(tcpSessionClient, messageHexString, aTuple.Item1);
}
}
else
@ -83,24 +90,31 @@ namespace JiShe.CollectBus.Plugins
await e.InvokeNext();
}
[GeneratorPlugin(typeof(ITcpConnectingPlugin))]
public async Task OnTcpConnecting(ITcpSessionClient client, ConnectingEventArgs e)
//[GeneratorPlugin(typeof(ITcpConnectingPlugin))]
public async Task OnTcpConnecting(ITcpSession client, ConnectingEventArgs e)
{
_logger.LogInformation($"[TCP] ID:{client.Id} IP:{client.GetIPPort()}正在连接中...");
var tcpSessionClient = (ITcpSessionClient)client;
_logger.LogInformation($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}正在连接中...");
await e.InvokeNext();
}
[GeneratorPlugin(typeof(ITcpConnectedPlugin))]
public async Task OnTcpConnected(ITcpSessionClient client, ConnectedEventArgs e)
//[GeneratorPlugin(typeof(ITcpConnectedPlugin))]
public async Task OnTcpConnected(ITcpSession client, ConnectedEventArgs e)
{
_logger.LogInformation($"[TCP] ID:{client.Id} IP:{client.GetIPPort()}已连接");
var tcpSessionClient = (ITcpSessionClient)client;
_logger.LogInformation($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}已连接");
await e.InvokeNext();
}
[GeneratorPlugin(typeof(ITcpClosedPlugin))]
public async Task OnTcpClosed(ITcpSessionClient client, ClosedEventArgs e)
//[GeneratorPlugin(typeof(ITcpClosedPlugin))]//ITcpSessionClient
public async Task OnTcpClosed(ITcpSession client, ClosedEventArgs e)
{
var entity = await _deviceRepository.FindAsync(a=>a.ClientId == client.Id);
var tcpSessionClient = (ITcpSessionClient)client;
var entity = await _deviceRepository.FindAsync(a => a.ClientId == tcpSessionClient.Id);
if (entity != null)
{
entity.UpdateByOnClosed();
@ -108,7 +122,7 @@ namespace JiShe.CollectBus.Plugins
}
else
{
_logger.LogWarning($"[TCP] ID:{client.Id} IP:{client.GetIPPort()}已关闭连接,但采集程序检索失败");
_logger.LogWarning($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}已关闭连接,但采集程序检索失败");
}
await e.InvokeNext();
@ -123,9 +137,24 @@ namespace JiShe.CollectBus.Plugins
/// <returns></returns>
private async Task OnTcpLoginReceived(ITcpSessionClient client, string messageHexString, string deviceNo)
{
string oldClientId = $"{client.Id}";
await client.ResetIdAsync(deviceNo);
var entity = await _deviceRepository.FindAsync(a => a.Number == deviceNo);
if (entity == null)
{
await _deviceRepository.InsertAsync(new Device(deviceNo, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online));
}
else
{
entity.UpdateByLoginAndHeartbeat(oldClientId);
await _deviceRepository.UpdateAsync(entity);
}
var messageReceivedLoginEvent = new MessageReceivedLogin
{
ClientId = client.Id,
ClientId = deviceNo,
ClientIp = client.IP,
ClientPort = client.Port,
MessageHexString = messageHexString,
@ -133,23 +162,36 @@ namespace JiShe.CollectBus.Plugins
MessageId = NewId.NextGuid().ToString()
};
await _capBus.PublishAsync(ProtocolConst.SubscriberReceivedLoginEventName, messageReceivedLoginEvent);
var entity = await _deviceRepository.FindAsync(a => a.Number == deviceNo);
if (entity == null)
{
await _deviceRepository.InsertAsync(new Device(deviceNo, client.Id,DateTime.Now, DateTime.Now, DeviceStatus.Online));
}
else
{
entity.UpdateByLoginAndHeartbeat(client.Id);
await _deviceRepository.UpdateAsync(entity);
}
}
private async Task OnTcpHeartbeatReceived(ITcpSessionClient client, string messageHexString, string deviceNo)
{
string clientId = deviceNo;
string oldClientId = $"{client.Id}";
var entity = await _deviceRepository.FindAsync(a => a.Number == deviceNo);
if (entity == null) //没有登录帧的设备,只有心跳帧
{
await client.ResetIdAsync(clientId);
await _deviceRepository.InsertAsync(new Device(deviceNo, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online));
}
else
{
if (clientId != oldClientId)
{
entity.UpdateByLoginAndHeartbeat(oldClientId);
}
else
{
entity.UpdateByLoginAndHeartbeat();
}
await _deviceRepository.UpdateAsync(entity);
}
var messageReceivedHeartbeatEvent = new MessageReceivedHeartbeat
{
ClientId = client.Id,
ClientId = clientId,
ClientIp = client.IP,
ClientPort = client.Port,
MessageHexString = messageHexString,
@ -157,16 +199,6 @@ namespace JiShe.CollectBus.Plugins
MessageId = NewId.NextGuid().ToString()
};
await _capBus.PublishAsync(ProtocolConst.SubscriberReceivedHeartbeatEventName, messageReceivedHeartbeatEvent);
var entity = await _deviceRepository.FindAsync(a => a.Number == deviceNo);
if (entity == null)
{
await _deviceRepository.InsertAsync(new Device(deviceNo, client.Id, DateTime.Now, DateTime.Now, DeviceStatus.Online));
}
else
{
entity.UpdateByLoginAndHeartbeat(client.Id);
await _deviceRepository.UpdateAsync(entity);
}
}
private async Task OnTcpNormalReceived(ITcpSessionClient client, string messageHexString, string deviceNo)

View File

@ -5,9 +5,8 @@ using TouchSocket.Sockets;
namespace JiShe.CollectBus.Plugins
{
public partial class UdpMonitor : PluginBase
public partial class UdpMonitor : PluginBase, IUdpReceivedPlugin
{
[GeneratorPlugin(typeof(IUdpReceivedPlugin))]
public async Task OnUdpReceived(IUdpSessionBase client, UdpReceivedDataEventArgs e)
{
var udpSession = client as UdpSession;

View File

@ -200,7 +200,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
foreach (var ammeter in item)
{
//处理ItemCode
if (string.IsNullOrWhiteSpace(ammeter.ItemCodes))
if (string.IsNullOrWhiteSpace(ammeter.ItemCodes) && !string.IsNullOrWhiteSpace(ammeter.DataTypes))
{
var itemArr = ammeter.DataTypes.Split(',').ToList();

View File

@ -70,44 +70,48 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
Baudrate = 2400,
FocusAddress = "402440506",
Name = "三相电表",
FocusID = 1,
Name = "张家祠工务(三相电表)",
FocusID = 95780,
DatabaseBusiID = 1,
MeteringCode = 2,
MeteringCode = 1,
AmmerterAddress = "402410040506",
ID = 9980,
ID = 127035,
TypeName = 3,
DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
TimeDensity = 15,
});
ammeterInfos.Add(new AmmeterInfo()
{
Baudrate = 2400,
FocusAddress = "542400504",
Name = "单相电表",
FocusID = 1,
Name = "五号配(长芦二所四排)(单相电表)",
FocusID = 69280,
DatabaseBusiID = 1,
MeteringCode = 2,
AmmerterAddress = "542410000504",
ID = 9981,
ID = 95594,
TypeName = 1,
DataTypes = "581,589,592,597,601",
TimeDensity = 15,
});
return ammeterInfos;
//string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
// FROM TB_GatherInfo(NOLOCK) AS A
// INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
// INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
// INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0
// WHERE 1=1 and C.Special = 0 ";
////TODO 记得移除特殊表过滤
string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
FROM TB_GatherInfo(NOLOCK) AS A
INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0
WHERE 1=1 and C.Special = 0 ";
//TODO 记得移除特殊表过滤
//if (!string.IsNullOrWhiteSpace(gatherCode))
//{
// sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
//}
//return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
// .Ado
// .QueryAsync<AmmeterInfo>(sql);
if (!string.IsNullOrWhiteSpace(gatherCode))
{
sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
}
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
.Ado
.QueryAsync<AmmeterInfo>(sql);
}
/// <summary>

View File

@ -1,6 +1,7 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using DeviceDetectorNET.Parser.Device;
using DotNetCore.CAP;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
@ -62,6 +63,8 @@ namespace JiShe.CollectBus.Subscribers
[CapSubscribe(ProtocolConst.SubscriberIssuedEventName)]
public async Task IssuedEvent(IssuedEventMessage issuedEventMessage)
{
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 回复下发内容IssuedEvent:{issuedEventMessage.MessageId}");
switch (issuedEventMessage.Type)
{
case IssuedEventType.Heartbeat:
@ -82,11 +85,14 @@ namespace JiShe.CollectBus.Subscribers
default:
throw new ArgumentOutOfRangeException();
}
var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo);
if (device != null)
{
await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message);
}
//var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo);
//if (device != null)
//{
// await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message);
//}
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
}
[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)]

View File

@ -64,6 +64,12 @@ namespace JiShe.CollectBus.IotSystems.Devices
Status = DeviceStatus.Online;
}
public void UpdateByLoginAndHeartbeat()
{
LastOnlineTime = DateTime.Now;
Status = DeviceStatus.Online;
}
public void UpdateByOnClosed()
{
LastOfflineTime = DateTime.Now;

View File

@ -25,8 +25,8 @@
<PackageReference Include="DotNetCore.CAP.Kafka" Version="8.3.1" />
<PackageReference Include="DotNetCore.CAP.MongoDB" Version="8.3.1" />
<PackageReference Include="Hangfire.Redis.StackExchange" Version="1.9.4" />
<PackageReference Include="MassTransit" Version="8.3.2" />
<PackageReference Include="MassTransit.Kafka" Version="8.3.2" />
<PackageReference Include="MassTransit" Version="8.4.0" />
<PackageReference Include="MassTransit.Kafka" Version="8.4.0" />
<PackageReference Include="Microsoft.AspNetCore.DataProtection.StackExchangeRedis" Version="9.0.0" />
<PackageReference Include="Serilog" Version="4.1.0" />
<PackageReference Include="Serilog.AspNetCore" Version="8.0.3" />

View File

@ -5,11 +5,11 @@
"Serilog.Sinks.File"
],
"MinimumLevel": {
"Default": "Warning",
"Default": "Information",
"Override": {
"Microsoft": "Information",
"Microsoft": "Warning",
"Volo.Abp": "Warning",
"Hangfire": "Information",
"Hangfire": "Warning",
"DotNetCore.CAP": "Warning",
"Serilog.AspNetCore": "Information",
"Microsoft.EntityFrameworkCore": "Warning",

View File

@ -602,7 +602,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
/// <param name="messageReceived"></param>
/// <param name="sendAction"></param>
/// <returns></returns>
public virtual TB3761FN AnalyzeReadingDataAsync(MessageReceived messageReceived,
public virtual TB3761 AnalyzeReadingDataAsync(MessageReceived messageReceived,
Action<byte[]>? sendAction = null)
{
var hexStringList = messageReceived.MessageHexString.StringToPairs();
@ -664,7 +664,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
}
}
return tb3761Fn;
return tb3761;
}
/// <summary>
@ -673,7 +673,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
/// <param name="messageReceived"></param>
/// <param name="sendAction"></param>
/// <returns></returns>
public virtual TB3761FN AnalyzeReadingTdcDataAsync(MessageReceived messageReceived,
public virtual TB3761 AnalyzeReadingTdcDataAsync(MessageReceived messageReceived,
Action<byte[]>? sendAction = null)
{
@ -717,7 +717,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
}
}
return tb3761Fn;
return tb3761;
//var freezeDensity = (FreezeDensity)Convert.ToInt32(hexDatas.Skip(5).Take(1));
//var addMinute = 0;
//switch (freezeDensity)