diff --git a/JiShe.CollectBus.ClickHouse/JiShe.CollectBus.ClickHouse.csproj b/JiShe.CollectBus.ClickHouse/JiShe.CollectBus.ClickHouse.csproj
index 77edadd..58d0070 100644
--- a/JiShe.CollectBus.ClickHouse/JiShe.CollectBus.ClickHouse.csproj
+++ b/JiShe.CollectBus.ClickHouse/JiShe.CollectBus.ClickHouse.csproj
@@ -1,13 +1,14 @@
-
- net8.0
- enable
- enable
-
+
+ net8.0
+ enable
+ enable
+ preview
+
-
-
-
+
+
+
diff --git a/JiShe.CollectBus.Common/Extensions/DependencyInjections/IScopedDependency.cs b/JiShe.CollectBus.Common/Extensions/DependencyInjections/IScopedDependency.cs
new file mode 100644
index 0000000..2b4fe1c
--- /dev/null
+++ b/JiShe.CollectBus.Common/Extensions/DependencyInjections/IScopedDependency.cs
@@ -0,0 +1,6 @@
+namespace JiShe.CollectBus.Common.Extensions.DependencyInjections
+{
+ public interface IScopedDependency
+ {
+ }
+}
diff --git a/JiShe.CollectBus.Common/Extensions/DependencyInjections/ISingletonDependency.cs b/JiShe.CollectBus.Common/Extensions/DependencyInjections/ISingletonDependency.cs
new file mode 100644
index 0000000..c3f89fa
--- /dev/null
+++ b/JiShe.CollectBus.Common/Extensions/DependencyInjections/ISingletonDependency.cs
@@ -0,0 +1,6 @@
+namespace JiShe.CollectBus.Common.Extensions.DependencyInjections
+{
+ public interface ISingletonDependency
+ {
+ }
+}
diff --git a/JiShe.CollectBus.Common/Extensions/DependencyInjections/ITransientDependency.cs b/JiShe.CollectBus.Common/Extensions/DependencyInjections/ITransientDependency.cs
new file mode 100644
index 0000000..4c2092c
--- /dev/null
+++ b/JiShe.CollectBus.Common/Extensions/DependencyInjections/ITransientDependency.cs
@@ -0,0 +1,6 @@
+namespace JiShe.CollectBus.Common.Extensions.DependencyInjections
+{
+ public interface ITransientDependency
+ {
+ }
+}
diff --git a/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj b/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj
index 626aa63..4dbdb38 100644
--- a/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj
+++ b/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj
@@ -1,12 +1,14 @@
-
- netstandard2.1
- enable
-
+
+ netstandard2.1
+ enable
+ preview
+
-
-
-
+
+
+
+
diff --git a/JiShe.CollectBus.Console/Extensions/DependencyInjections/DependencyInjectionExtensions.cs b/JiShe.CollectBus.Console/Extensions/DependencyInjections/DependencyInjectionExtensions.cs
index 482bdc7..f2c5686 100644
--- a/JiShe.CollectBus.Console/Extensions/DependencyInjections/DependencyInjectionExtensions.cs
+++ b/JiShe.CollectBus.Console/Extensions/DependencyInjections/DependencyInjectionExtensions.cs
@@ -1,6 +1,7 @@
-using System.Reflection;
+using JiShe.CollectBus.Common.Extensions.DependencyInjections;
using JiShe.CollectBus.Protocol.Contracts.Attributes;
-using JiShe.CollectBus.Protocol.Contracts.DependencyInjection;
+using System.Diagnostics.CodeAnalysis;
+using System.Reflection;
using TouchSocket.Core;
// ReSharper disable once CheckNamespace
@@ -8,16 +9,26 @@ namespace Microsoft.Extensions.DependencyInjection
{
public static class DependencyInjectionExtensions
{
- public static void ServiceRegister(this IServiceCollection services, params Assembly[] assemblies)
+ public static void ServiceRegister(this IServiceCollection services)
{
- if (assemblies.Length <= 0)
- {
- assemblies = [Assembly.GetExecutingAssembly()];
- }
+ var assemblies = GetBinAssemblies();
foreach (var assembly in assemblies)
{
var allTypes = assembly.GetTypes();
+ foreach (var type in allTypes)
+ {
+ if (typeof(ISingletonDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false })
+ {
+ var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ISingletonDependency"));
+ foreach (var interfaceType in interfaceTypes)
+ {
+ var serviceDescriptor = new ServiceDescriptor(interfaceType, interfaceType.Name, type, ServiceLifetime.Singleton);
+ services.Add(serviceDescriptor);
+ }
+ }
+ }
+
foreach (var type in allTypes)
{
if (typeof(ITransientDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false })
@@ -25,14 +36,38 @@ namespace Microsoft.Extensions.DependencyInjection
var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ITransientDependency"));
foreach (var interfaceType in interfaceTypes)
{
- var attr = type.GetCustomAttribute();
- if (attr == null) continue;
- var serviceDescriptor = new ServiceDescriptor(interfaceType, attr.Name, type, ServiceLifetime.Transient);
+ var serviceDescriptor = new ServiceDescriptor(interfaceType, interfaceType.Name, type, ServiceLifetime.Transient);
services.Add(serviceDescriptor);
}
}
}
+ foreach (var type in allTypes)
+ {
+ if (typeof(IScopedDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false })
+ {
+ var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("IScopedDependency"));
+ foreach (var interfaceType in interfaceTypes)
+ {
+ var serviceDescriptor = new ServiceDescriptor(interfaceType, interfaceType.Name, type, ServiceLifetime.Scoped);
+ services.Add(serviceDescriptor);
+ }
+ }
+ }
+ }
+ }
+
+ public static void PluginServiceRegister(this IServiceCollection services, string pluginPath = "")
+ {
+ if (pluginPath.IsNullOrWhiteSpace())
+ {
+ pluginPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Plugins");
+ }
+ var assemblies = GetAssembliesFromFolder(pluginPath);
+
+ foreach (var assembly in assemblies)
+ {
+ var allTypes = assembly.GetTypes();
foreach (var type in allTypes)
{
if (typeof(ISingletonDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false })
@@ -48,6 +83,21 @@ namespace Microsoft.Extensions.DependencyInjection
}
}
+ foreach (var type in allTypes)
+ {
+ if (typeof(ITransientDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false })
+ {
+ var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ITransientDependency"));
+ foreach (var interfaceType in interfaceTypes)
+ {
+ var attr = type.GetCustomAttribute();
+ if (attr == null) continue;
+ var serviceDescriptor = new ServiceDescriptor(interfaceType, attr.Name, type, ServiceLifetime.Transient);
+ services.Add(serviceDescriptor);
+ }
+ }
+ }
+
foreach (var type in allTypes)
{
if (typeof(IScopedDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false })
@@ -65,20 +115,10 @@ namespace Microsoft.Extensions.DependencyInjection
}
}
- public static void PluginServiceRegister(this IServiceCollection services, string pluginPath = "")
- {
- if (pluginPath.IsNullOrWhiteSpace())
- {
- pluginPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Plugins");
- }
- var assemblies = GetAssembliesFromFolder(pluginPath);
- ServiceRegister(services, assemblies.ToArray());
- }
-
private static IEnumerable GetAssembliesFromFolder(string folderPath)
{
var directory = new DirectoryInfo(folderPath);
- if (!directory.Exists) return Enumerable.Empty();
+ if (!directory.Exists) return [];
var files = directory.GetFiles("*.dll");
@@ -98,5 +138,29 @@ namespace Microsoft.Extensions.DependencyInjection
return assemblies;
}
+
+ private static IEnumerable GetBinAssemblies()
+ {
+ var directory = new DirectoryInfo(AppDomain.CurrentDomain.BaseDirectory);
+ if (!directory.Exists) return [];
+
+ var files = directory.GetFiles("JiShe.CollectBus.*.dll");
+
+ var assemblies = new List();
+ foreach (var file in files)
+ {
+ try
+ {
+ var assembly = Assembly.LoadFrom(file.FullName);
+ assemblies.Add(assembly);
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine($"Error loading assembly from {file.FullName}: {ex.Message}");
+ }
+ }
+
+ return assemblies;
+ }
}
}
diff --git a/JiShe.CollectBus.Console/Extensions/ServiceCollections/ServiceCollectionExtensions.cs b/JiShe.CollectBus.Console/Extensions/ServiceCollections/ServiceCollectionExtensions.cs
index b39c6f3..7264af8 100644
--- a/JiShe.CollectBus.Console/Extensions/ServiceCollections/ServiceCollectionExtensions.cs
+++ b/JiShe.CollectBus.Console/Extensions/ServiceCollections/ServiceCollectionExtensions.cs
@@ -30,14 +30,6 @@ namespace Microsoft.Extensions.DependencyInjection
})
.ConfigurePlugins(a =>
{
- a.UseCheckClear()
- .SetCheckClearType(CheckClearType.All)
- .SetTick(TimeSpan.FromSeconds(60))
- .SetOnClose((c, t) =>
- {
- c.TryShutdown();
- c.SafeClose("超时无数据");
- });
a.Add();
a.Add();
a.Add();
diff --git a/JiShe.CollectBus.Console/JiShe.CollectBus.Console.csproj b/JiShe.CollectBus.Console/JiShe.CollectBus.Console.csproj
index aa9636a..237a69c 100644
--- a/JiShe.CollectBus.Console/JiShe.CollectBus.Console.csproj
+++ b/JiShe.CollectBus.Console/JiShe.CollectBus.Console.csproj
@@ -27,13 +27,16 @@
+
+
+
diff --git a/JiShe.CollectBus.Console/Program.cs b/JiShe.CollectBus.Console/Program.cs
index c26b9fa..61e58ff 100644
--- a/JiShe.CollectBus.Console/Program.cs
+++ b/JiShe.CollectBus.Console/Program.cs
@@ -33,6 +33,7 @@ namespace JiShe.CollectBus.Console
services.PluginServiceRegister();
services.AddTcp(configuration);
services.AddUdp(configuration);
+ services.AddMassTransit(configuration);
services.AddStackExchangeRedisCache(options =>
{
options.Configuration = configuration["RedisCache:ConnectionString"];
diff --git a/JiShe.CollectBus.Console/appsettings.json b/JiShe.CollectBus.Console/appsettings.json
index 27b20f5..ea483c5 100644
--- a/JiShe.CollectBus.Console/appsettings.json
+++ b/JiShe.CollectBus.Console/appsettings.json
@@ -13,5 +13,16 @@
"RedisCache": {
"ConnectionString": "123456@qwer@localhost:6379",
"InstanceName": "CollectBus"
+ },
+ "MQ": {
+ "Host": "118.190.144.92",
+ "Port": "5672",
+ "VirtualHost": "/",
+ "UserName": "collectbus",
+ "Password": "123456",
+ "Queue": {
+ "Push": "PUSH_COMMANDS",
+ "Report": "REPORT_COMMAND"
+ }
}
}
\ No newline at end of file
diff --git a/JiShe.CollectBus.Core/JiShe.CollectBus.Core.csproj b/JiShe.CollectBus.Core/JiShe.CollectBus.Core.csproj
index 9286c2d..a893ed6 100644
--- a/JiShe.CollectBus.Core/JiShe.CollectBus.Core.csproj
+++ b/JiShe.CollectBus.Core/JiShe.CollectBus.Core.csproj
@@ -1,25 +1,28 @@
-
- net8.0
- enable
- enable
-
+
+ net8.0
+ enable
+ enable
+ preview
-
-
-
-
-
+
-
-
-
-
+
+
+
+
+
-
-
-
-
+
+
+
+
+
+
+
+
+
+
diff --git a/JiShe.CollectBus.Core/Plugins/TcpServiceReceivedPlugin.cs b/JiShe.CollectBus.Core/Plugins/TcpServiceReceivedPlugin.cs
index 9e42266..6213069 100644
--- a/JiShe.CollectBus.Core/Plugins/TcpServiceReceivedPlugin.cs
+++ b/JiShe.CollectBus.Core/Plugins/TcpServiceReceivedPlugin.cs
@@ -1,28 +1,40 @@
-using Microsoft.Extensions.DependencyInjection;
-using JiShe.CollectBus.Protocol.Contracts.Interfaces;
+using JiShe.CollectBus.RabbitMQ.Models;
+using JiShe.CollectBus.RabbitMQ.Senders;
using TouchSocket.Core;
using TouchSocket.Sockets;
namespace JiShe.CollectBus.Core.Plugins
{
- public partial class TcpServiceReceivedPlugin(IServiceProvider serviceProvider) : PluginBase
+ public partial class TcpServiceReceivedPlugin(IMqSender sender) : PluginBase
{
[GeneratorPlugin(typeof(ITcpReceivedPlugin))]
public async Task OnTcpReceived(ITcpSessionClient client, ReceivedDataEventArgs e)
{
- //TODO: 电表主站到集中器的协议都是376.1协议,集中器下发到电表协议分为645-07和modbus
- //TODO: 水表主站到集中器的协议分为118和645-97协议
- //TODO: 连接成功时获取档案信息,根据档案信息匹配协议正则获取协议服务进行监听发送
+ ////TODO: 电表主站到集中器的协议都是376.1协议,集中器下发到电表协议分为645-07和modbus
+ ////TODO: 水表主站到集中器的协议分为118和645-97协议
+ ////TODO: 连接成功时获取档案信息,根据档案信息匹配协议正则获取协议服务进行监听发送
- const string protocolType = "StandardProtocol";
+ //const string protocolType = "StandardProtocol";
- var protocolPlugin = serviceProvider.GetKeyedService(protocolType);
- client.Logger.Info($"{protocolPlugin?.Get().Name},{protocolPlugin?.Get().RegularExpression}");
- //从客户端收到信息
- var messageHexString = Convert.ToHexString(e.ByteBlock.Span);
- client.Logger.Info($"[TCP] 已从{client.GetIPPort()}接收到信息:{messageHexString}");
+ //var protocolPlugin = serviceProvider.GetKeyedService(protocolType);
+ //var protocolPluginInfo = await protocolPlugin.GetAsync();
+ //client.Logger.Info($"{protocolPluginInfo.Name},{protocolPluginInfo.RegularExpression}");
+ ////从客户端收到信息
+ //var messageHexString = Convert.ToHexString(e.ByteBlock.Span);
+ //client.Logger.Info($"[TCP] 已从{client.GetIPPort()}接收到信息:{messageHexString}");
+
+ //await protocolPlugin.ReceivedAsync(client,e);
+
+
+ await sender.SendToReportAsync(new ReportDto
+ {
+ ClientIP = client.IP,
+ ClientId = client.Id,
+ Port = client.Port,
+ MessageHexString = Convert.ToHexString(e.ByteBlock.Span),
+ DeviceNo = "111"
+ });
- protocolPlugin?.Received(client,e);
await e.InvokeNext();
}
diff --git a/JiShe.CollectBus.EntityFrameworkCore/JiShe.CollectBus.EntityFrameworkCore.csproj b/JiShe.CollectBus.EntityFrameworkCore/JiShe.CollectBus.EntityFrameworkCore.csproj
index a5d38eb..6764619 100644
--- a/JiShe.CollectBus.EntityFrameworkCore/JiShe.CollectBus.EntityFrameworkCore.csproj
+++ b/JiShe.CollectBus.EntityFrameworkCore/JiShe.CollectBus.EntityFrameworkCore.csproj
@@ -1,23 +1,25 @@
-
- net8.0
- enable
- enable
-
+
+ net8.0
+ enable
+ enable
+ preview
-
-
-
- all
- runtime; build; native; contentfiles; analyzers; buildtransitive
-
-
-
-
+
-
-
-
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+
+
+
+
+
+
+
+
diff --git a/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
index 33e823b..383d308 100644
--- a/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
+++ b/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
@@ -1,4 +1,5 @@
using System;
+using System.Threading.Tasks;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
using Microsoft.Extensions.Caching.Distributed;
@@ -6,32 +7,33 @@ using TouchSocket.Sockets;
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
{
- public abstract class BaseProtocolPlugin:IProtocolPlugin
+ public abstract class BaseProtocolPlugin(IDistributedCache cache) : IProtocolPlugin
{
- private readonly IDistributedCache _cache;
+ public abstract Task GetAsync();
- protected BaseProtocolPlugin(IDistributedCache cache)
+ public virtual Task ReceivedAsync(ITcpSessionClient client, ReceivedDataEventArgs e)
{
- _cache = cache;
+ return Task.CompletedTask;
}
- public abstract ProtocolInfo Get();
-
- public abstract void Received(ITcpSessionClient client, ReceivedDataEventArgs e);
-
- public abstract void Send();
-
- public void Load()
+ public virtual Task SendAsync()
{
- var result = _cache.GetString("myKey");
+ return Task.CompletedTask;
+ }
+
+ public virtual async Task LoadAsync()
+ {
+ var result = cache.GetString("myKey");
if (result == null)
{
result = "Calculated Data";
- _cache.SetString("myKey", result, new DistributedCacheEntryOptions
+ cache.SetString("myKey", result, new DistributedCacheEntryOptions
{
AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(10)
});
}
+
+ await Task.CompletedTask;
}
}
diff --git a/JiShe.CollectBus.Protocol.Contracts/Attributes/ProtocolNameAttribute.cs b/JiShe.CollectBus.Protocol.Contracts/Attributes/ProtocolNameAttribute.cs
index 93aa9bf..a04af71 100644
--- a/JiShe.CollectBus.Protocol.Contracts/Attributes/ProtocolNameAttribute.cs
+++ b/JiShe.CollectBus.Protocol.Contracts/Attributes/ProtocolNameAttribute.cs
@@ -1,17 +1,10 @@
using System;
-using System.Collections.Generic;
-using System.Text;
namespace JiShe.CollectBus.Protocol.Contracts.Attributes
{
[AttributeUsage(AttributeTargets.Class)]
- public class ProtocolNameAttribute: Attribute
+ public class ProtocolNameAttribute(string name) : Attribute
{
- public ProtocolNameAttribute(string name)
- {
- Name = name;
- }
-
- public string Name { get; set; }
+ public string Name { get; set; } = name;
}
}
diff --git a/JiShe.CollectBus.Protocol.Contracts/DependencyInjection/IScopedDependency.cs b/JiShe.CollectBus.Protocol.Contracts/DependencyInjection/IScopedDependency.cs
deleted file mode 100644
index 8d5bd50..0000000
--- a/JiShe.CollectBus.Protocol.Contracts/DependencyInjection/IScopedDependency.cs
+++ /dev/null
@@ -1,10 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Text;
-
-namespace JiShe.CollectBus.Protocol.Contracts.DependencyInjection
-{
- public interface IScopedDependency
- {
- }
-}
diff --git a/JiShe.CollectBus.Protocol.Contracts/DependencyInjection/ISingletonDependency.cs b/JiShe.CollectBus.Protocol.Contracts/DependencyInjection/ISingletonDependency.cs
deleted file mode 100644
index 411e68d..0000000
--- a/JiShe.CollectBus.Protocol.Contracts/DependencyInjection/ISingletonDependency.cs
+++ /dev/null
@@ -1,10 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Text;
-
-namespace JiShe.CollectBus.Protocol.Contracts.DependencyInjection
-{
- public interface ISingletonDependency
- {
- }
-}
diff --git a/JiShe.CollectBus.Protocol.Contracts/DependencyInjection/ITransientDependency.cs b/JiShe.CollectBus.Protocol.Contracts/DependencyInjection/ITransientDependency.cs
deleted file mode 100644
index e43fe49..0000000
--- a/JiShe.CollectBus.Protocol.Contracts/DependencyInjection/ITransientDependency.cs
+++ /dev/null
@@ -1,10 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Text;
-
-namespace JiShe.CollectBus.Protocol.Contracts.DependencyInjection
-{
- public interface ITransientDependency
- {
- }
-}
diff --git a/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs b/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
index 54bb050..1c61344 100644
--- a/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
+++ b/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
@@ -1,16 +1,17 @@
-using JiShe.CollectBus.Protocol.Contracts.Models;
+using System.Threading.Tasks;
+using JiShe.CollectBus.Protocol.Contracts.Models;
using TouchSocket.Sockets;
namespace JiShe.CollectBus.Protocol.Contracts.Interfaces
{
public interface IProtocolPlugin
{
- ProtocolInfo Get();
+ Task GetAsync();
- void Load();
+ Task LoadAsync();
- void Received(ITcpSessionClient client, ReceivedDataEventArgs e);
+ Task ReceivedAsync(ITcpSessionClient client, ReceivedDataEventArgs e);
- void Send();
+ Task SendAsync();
}
}
diff --git a/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj b/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj
index 79afffb..2dba74a 100644
--- a/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj
+++ b/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj
@@ -3,15 +3,17 @@
netstandard2.1
enable
+ preview
+
-
+
diff --git a/JiShe.CollectBus.Protocol.Test/JiShe.CollectBus.Protocol.Test.csproj b/JiShe.CollectBus.Protocol.Test/JiShe.CollectBus.Protocol.Test.csproj
index f15ccdf..8160934 100644
--- a/JiShe.CollectBus.Protocol.Test/JiShe.CollectBus.Protocol.Test.csproj
+++ b/JiShe.CollectBus.Protocol.Test/JiShe.CollectBus.Protocol.Test.csproj
@@ -1,17 +1,18 @@
-
- net8.0
- enable
- enable
-
+
+ net8.0
+ enable
+ enable
+ preview
+
-
-
-
+
+
+
-
-
-
+
+
+
diff --git a/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs b/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs
index f3c0870..a6f3b54 100644
--- a/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs
+++ b/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs
@@ -1,6 +1,6 @@
-using JiShe.CollectBus.Protocol.Contracts.Abstracts;
+using JiShe.CollectBus.Common.Extensions.DependencyInjections;
+using JiShe.CollectBus.Protocol.Contracts.Abstracts;
using JiShe.CollectBus.Protocol.Contracts.Attributes;
-using JiShe.CollectBus.Protocol.Contracts.DependencyInjection;
using JiShe.CollectBus.Protocol.Contracts.Models;
using Microsoft.Extensions.Caching.Distributed;
using TouchSocket.Sockets;
@@ -10,92 +10,19 @@ namespace JiShe.CollectBus.Protocol.Test
[ProtocolName("TestProtocol")]
public class TestProtocolPlugin(IDistributedCache cache) : BaseProtocolPlugin(cache), ISingletonDependency
{
- public override ProtocolInfo Get()
- {
- return new ProtocolInfo("Test", "376.1", "TCP", "376.1协议", "DTSU1980");
- }
-
- public override void Received(ITcpSessionClient client, ReceivedDataEventArgs e)
+ public override Task GetAsync()
{
throw new NotImplementedException();
}
- public override void Send()
+ public override Task ReceivedAsync(ITcpSessionClient client, ReceivedDataEventArgs e)
{
throw new NotImplementedException();
}
- //档案下发
- //var listMeter = new List() { new MeterParameter(){
- // Pn = 1,
- // BaudRate = 3,
- // Port = 2,
- // ProtocolType = ProtocolType.DLT6452007,
- // Address = "312408006642",
- // Password = "000000",
- // RateNumber = 4,
- // IntegerBitNumber = 4,
- // DecimalBitNumber = 4,
- // CollectorAddress = "000000000000",
- // UserCategoryNumber = 0,
- // UserSubclassNumber = 0
- //} };
- //new BuildCommand().GetAFN04F10DataUnit(new ReqParameter2() {
- // AFN = AFN.设置参数,
- // CMasterStationFunCode = CMasterStationFunCode.请求1级数据,
- // A= "322009872",
- // Seq = new Seq()
- // {
- // TpV = TpV.附加信息域中无时间标签,
- // FIRFIN = FIRFIN.单帧,
- // CON = CON.需要对该帧进行确认,
- // PRSEQ = 10,
- // },
- // MSA = 13,
- // Pn = 0,
- // Fn = 10
- //},listMeter);
- //档案读取
- //new BuildCommand().GetAFN10F10DataUnit(new ReqParameter2()
- //{
- // AFN = AFN.查询参数,
- // CMasterStationFunCode = CMasterStationFunCode.请求2级数据,
- // A = "322009872",
- // Seq = new Seq()
- // {
- // TpV = TpV.附加信息域中无时间标签,
- // FIRFIN = FIRFIN.单帧,
- // CON = CON.不需要对该帧进行确认,
- // PRSEQ = 2,
- // },
- // MSA = 13,
- // Pn = 0,
- // Fn = 10
- //},new List() {1,2 });
-
- //var str = "68A600A6006888203290261A0A6200000201010001000100621E426622082431000000000000040300000000000000CA16";
- //var cmdResult = new BuildCommand().AnalysisCmd(str);
- //if(cmdResult != null)
- //{
- // var list = new BuildCommand().AnalysisAFN04F10DataUnit(cmdResult.HexDatas);
- //}
- //new BuildCommand().GetCommandBytes(new ReqParameter2()
- //{
- // AFN = AFN.请求实时数据,
- // CMasterStationFunCode = CMasterStationFunCode.请求2级数据,
- // A = "322009872",
- // Seq = new Seq()
- // {
- // TpV = TpV.附加信息域中无时间标签,
- // FIRFIN = FIRFIN.单帧,
- // CON = CON.不需要对该帧进行确认,
- // PRSEQ = 2,
- // },
- // MSA = 13,
- // Pn = 1,
- // Fn = 129
- // });
-
- //new BuildCommand().AmmeterValveControl("312408006642", "", "000000", true);
+ public override Task SendAsync()
+ {
+ throw new NotImplementedException();
+ }
}
}
diff --git a/JiShe.CollectBus.Protocol/JiShe.CollectBus.Protocol.csproj b/JiShe.CollectBus.Protocol/JiShe.CollectBus.Protocol.csproj
index 708f4a0..b54c79a 100644
--- a/JiShe.CollectBus.Protocol/JiShe.CollectBus.Protocol.csproj
+++ b/JiShe.CollectBus.Protocol/JiShe.CollectBus.Protocol.csproj
@@ -1,23 +1,26 @@
-
- net8.0
- enable
- enable
-
-
+
+ net8.0
+ enable
+ enable
+
+ preview
-
-
-
+
-
-
-
-
+
+
+
-
-
-
+
+
+
+
+
+
+
+
+
diff --git a/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs b/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs
index 909c710..1aae7c0 100644
--- a/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs
+++ b/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs
@@ -1,7 +1,7 @@
using JiShe.CollectBus.Common;
+using JiShe.CollectBus.Common.Extensions.DependencyInjections;
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
using JiShe.CollectBus.Protocol.Contracts.Attributes;
-using JiShe.CollectBus.Protocol.Contracts.DependencyInjection;
using JiShe.CollectBus.Protocol.Contracts.Models;
using Microsoft.Extensions.Caching.Distributed;
using TouchSocket.Sockets;
@@ -37,17 +37,18 @@ namespace JiShe.CollectBus.Protocol
}
- public override ProtocolInfo Get()
+ public override async Task GetAsync()
{
- return new ProtocolInfo("Standard", "376.1", "TCP", "376.1协议", "DTS1980");
+ var info = new ProtocolInfo("Standard", "376.1", "TCP", "376.1协议", "DTS1980");
+ return await Task.FromResult(info);
}
- public new void Load()
+ public new async Task LoadAsync()
{
- base.Load();
+ await base.LoadAsync();
}
- public override void Received(ITcpSessionClient client, ReceivedDataEventArgs e)
+ public override async Task ReceivedAsync(ITcpSessionClient client, ReceivedDataEventArgs e)
{
tcpSessionClient = client;
var messageHexString = Convert.ToHexString(e.ByteBlock.Span);
@@ -57,15 +58,16 @@ namespace JiShe.CollectBus.Protocol
return;
}
AnalysisData(cmdResult);
+ await Task.CompletedTask;
}
- public override void Send()
+ public override async Task SendAsync()
{
- throw new NotImplementedException();
+ await Task.CompletedTask;
}
///
- /// Gets the msa.
+ /// Gets the msa.
///
/// The mark.
///
@@ -293,6 +295,7 @@ namespace JiShe.CollectBus.Protocol
};
commandReulst.ReplyBytes = GetCommandBytes(reqParam);
tcpSessionClient.SendAsync(tcpSessionClient.Id,commandReulst.ReplyBytes);
+
}
else if (commandReulst.Fn == 2)//退出登录
diff --git a/JiShe.CollectBus.RabbitMQ/Consumers/PushConsumer.cs b/JiShe.CollectBus.RabbitMQ/Consumers/PushConsumer.cs
new file mode 100644
index 0000000..481b54b
--- /dev/null
+++ b/JiShe.CollectBus.RabbitMQ/Consumers/PushConsumer.cs
@@ -0,0 +1,13 @@
+using MassTransit;
+using JiShe.CollectBus.RabbitMQ.Models;
+
+namespace JiShe.CollectBus.RabbitMQ.Consumers
+{
+ public class PushConsumer : IConsumer
+ {
+ public async Task Consume(ConsumeContext context)
+ {
+ Console.WriteLine(context.Message.ClientId);
+ }
+ }
+}
diff --git a/JiShe.CollectBus.RabbitMQ/Consumers/ReportConsumer.cs b/JiShe.CollectBus.RabbitMQ/Consumers/ReportConsumer.cs
new file mode 100644
index 0000000..e39a18e
--- /dev/null
+++ b/JiShe.CollectBus.RabbitMQ/Consumers/ReportConsumer.cs
@@ -0,0 +1,13 @@
+using JiShe.CollectBus.RabbitMQ.Models;
+using MassTransit;
+
+namespace JiShe.CollectBus.RabbitMQ.Consumers
+{
+ internal class ReportConsumer : IConsumer
+ {
+ public async Task Consume(ConsumeContext context)
+ {
+ Console.WriteLine(context.Message.ClientId);
+ }
+ }
+}
diff --git a/JiShe.CollectBus.RabbitMQ/Extensions/ServiceCollections/ServiceCollectionExtensions.cs b/JiShe.CollectBus.RabbitMQ/Extensions/ServiceCollections/ServiceCollectionExtensions.cs
new file mode 100644
index 0000000..dee01ae
--- /dev/null
+++ b/JiShe.CollectBus.RabbitMQ/Extensions/ServiceCollections/ServiceCollectionExtensions.cs
@@ -0,0 +1,54 @@
+using JiShe.CollectBus.RabbitMQ.Consumers;
+using MassTransit;
+using Microsoft.Extensions.Configuration;
+using RabbitMQ.Client;
+
+// ReSharper disable once CheckNamespace
+namespace Microsoft.Extensions.DependencyInjection
+{
+ public static class ServiceCollectionExtensions
+ {
+ public static IServiceCollection AddMassTransit(this IServiceCollection services, IConfiguration configuration)
+ {
+ services.AddMassTransit(x =>
+ {
+ x.UsingRabbitMq((context, cfg) =>
+ {
+ cfg.Host(configuration["MQ:Host"], ushort.Parse(configuration["MQ:Port"] ?? string.Empty), configuration["MQ:VirtualHost"], h =>
+ {
+ h.Username(configuration["MQ:UserName"] ?? string.Empty);
+ h.Password(configuration["MQ:Password"] ?? string.Empty);
+ });
+
+ cfg.ReceiveEndpoint(configuration["MQ:Queue:Push"] ?? string.Empty, x =>
+ {
+ x.ConfigureConsumeTopology = false;
+
+ x.Consumer();
+
+ x.Bind("commands", s =>
+ {
+ s.RoutingKey = configuration["MQ:Queue:Push"];
+ s.ExchangeType = ExchangeType.Direct;
+ });
+ });
+
+ cfg.ReceiveEndpoint(configuration["MQ:Queue:Report"] ?? string.Empty, x =>
+ {
+ x.ConfigureConsumeTopology = false;
+
+ x.Consumer();
+
+ x.Bind("commands", s =>
+ {
+ s.RoutingKey = configuration["MQ:Queue:Report"];
+ s.ExchangeType = ExchangeType.Direct;
+ });
+ });
+ });
+ });
+ services.AddMassTransitHostedService();
+ return services;
+ }
+ }
+}
diff --git a/JiShe.CollectBus.RabbitMQ/JiShe.CollectBus.RabbitMQ.csproj b/JiShe.CollectBus.RabbitMQ/JiShe.CollectBus.RabbitMQ.csproj
new file mode 100644
index 0000000..9aa5847
--- /dev/null
+++ b/JiShe.CollectBus.RabbitMQ/JiShe.CollectBus.RabbitMQ.csproj
@@ -0,0 +1,19 @@
+
+
+
+ net8.0
+ enable
+ enable
+ preview
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/JiShe.CollectBus.RabbitMQ/Models/PushDto.cs b/JiShe.CollectBus.RabbitMQ/Models/PushDto.cs
new file mode 100644
index 0000000..d203020
--- /dev/null
+++ b/JiShe.CollectBus.RabbitMQ/Models/PushDto.cs
@@ -0,0 +1,15 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.RabbitMQ.Models
+{
+ public class PushDto
+ {
+ public int ClientId { get; set; }
+ public string Message { get; set; }
+ public string DeviceNo { get; set; }
+ }
+}
diff --git a/JiShe.CollectBus.RabbitMQ/Models/ReportDto.cs b/JiShe.CollectBus.RabbitMQ/Models/ReportDto.cs
new file mode 100644
index 0000000..1ea7c5f
--- /dev/null
+++ b/JiShe.CollectBus.RabbitMQ/Models/ReportDto.cs
@@ -0,0 +1,36 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.RabbitMQ.Models
+{
+ public class ReportDto
+ {
+ ///
+ /// 客服端标识
+ ///
+ public string ClientId { get; set; }
+
+ ///
+ /// 客服端IP
+ ///
+ public string ClientIP { get; set; }
+
+ ///
+ /// 客服端端口
+ ///
+ public int Port { get; set; }
+
+ ///
+ /// 客服端报文
+ ///
+ public string MessageHexString { get; set; }
+
+ ///
+ /// 设备号
+ ///
+ public string DeviceNo { get; set; }
+ }
+}
diff --git a/JiShe.CollectBus.RabbitMQ/Senders/IMqSender.cs b/JiShe.CollectBus.RabbitMQ/Senders/IMqSender.cs
new file mode 100644
index 0000000..6cedf7d
--- /dev/null
+++ b/JiShe.CollectBus.RabbitMQ/Senders/IMqSender.cs
@@ -0,0 +1,18 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using JiShe.CollectBus.Common.Extensions.DependencyInjections;
+
+namespace JiShe.CollectBus.RabbitMQ.Senders
+{
+ public interface IMqSender : ISingletonDependency
+ {
+ Task SendToPushAsync(T message, CancellationToken cancellationToken = default) where T : class;
+ Task SendToPushAsync(object message, CancellationToken cancellationToken = default);
+ Task SendToReportAsync(T message, CancellationToken cancellationToken = default) where T : class;
+ Task SendToReportAsync(object message, CancellationToken cancellationToken = default);
+ Task SendAsync(string queueKey, object message, CancellationToken cancellationToken = default);
+ }
+}
diff --git a/JiShe.CollectBus.RabbitMQ/Senders/MqSender.cs b/JiShe.CollectBus.RabbitMQ/Senders/MqSender.cs
new file mode 100644
index 0000000..7b21d0a
--- /dev/null
+++ b/JiShe.CollectBus.RabbitMQ/Senders/MqSender.cs
@@ -0,0 +1,49 @@
+using JiShe.CollectBus.Common.Extensions.DependencyInjections;
+using MassTransit;
+using Microsoft.Extensions.Configuration;
+
+namespace JiShe.CollectBus.RabbitMQ.Senders
+{
+ public class MqSender : IMqSender
+ {
+ private readonly ISendEndpointProvider _sendEndpointProvider;
+ private readonly IConfiguration _configuration;
+
+ public MqSender(ISendEndpointProvider sendEndpointProvider, IConfiguration configuration)
+ {
+ _sendEndpointProvider = sendEndpointProvider;
+ _configuration = configuration;
+ }
+
+ public async Task SendToPushAsync(T message, CancellationToken cancellationToken = default) where T : class
+ {
+ var queueName = _configuration["MQ:Queue:Push"];
+ await SendAsync(queueName, message, cancellationToken);
+ }
+
+ public async Task SendToPushAsync(object message, CancellationToken cancellationToken = default)
+ {
+ var queueName = _configuration["MQ:Queue:Push"];
+ await SendAsync(queueName, message, cancellationToken);
+ }
+
+ public async Task SendToReportAsync(T message, CancellationToken cancellationToken = default) where T : class
+ {
+ var queueName = _configuration["MQ:Queue:Report"];
+ await SendAsync(queueName, message, cancellationToken);
+ }
+
+ public async Task SendToReportAsync(object message, CancellationToken cancellationToken = default)
+ {
+ var queueName = _configuration["MQ:Queue:Report"];
+ await SendAsync(queueName, message, cancellationToken);
+ }
+
+
+ public async Task SendAsync(string queueKey,object message, CancellationToken cancellationToken = default)
+ {
+ var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"queue:{queueKey}"));
+ await endpoint.Send(message, cancellationToken);
+ }
+ }
+}
diff --git a/JiShe.CollectBus.sln b/JiShe.CollectBus.sln
index ece59cf..0832b98 100644
--- a/JiShe.CollectBus.sln
+++ b/JiShe.CollectBus.sln
@@ -21,7 +21,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol.T
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.EntityFrameworkCore", "JiShe.CollectBus.EntityFrameworkCore\JiShe.CollectBus.EntityFrameworkCore.csproj", "{16D42BCF-EDB8-4153-B37D-0B10FB6DF36C}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.ClickHouse", "JiShe.CollectBus.ClickHouse\JiShe.CollectBus.ClickHouse.csproj", "{65A2837C-A5EE-475B-8079-EE5A1BCD2E8F}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.ClickHouse", "JiShe.CollectBus.ClickHouse\JiShe.CollectBus.ClickHouse.csproj", "{65A2837C-A5EE-475B-8079-EE5A1BCD2E8F}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.RabbitMQ", "JiShe.CollectBus.RabbitMQ\JiShe.CollectBus.RabbitMQ.csproj", "{DB46D90E-304D-48B7-9ED6-F4DCC95D3824}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -61,6 +63,10 @@ Global
{65A2837C-A5EE-475B-8079-EE5A1BCD2E8F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{65A2837C-A5EE-475B-8079-EE5A1BCD2E8F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{65A2837C-A5EE-475B-8079-EE5A1BCD2E8F}.Release|Any CPU.Build.0 = Release|Any CPU
+ {DB46D90E-304D-48B7-9ED6-F4DCC95D3824}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {DB46D90E-304D-48B7-9ED6-F4DCC95D3824}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {DB46D90E-304D-48B7-9ED6-F4DCC95D3824}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {DB46D90E-304D-48B7-9ED6-F4DCC95D3824}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -74,6 +80,7 @@ Global
{289196B4-FFBE-4E40-A3A1-FCFADBE945ED} = {3A04FB29-EA75-4499-BBF3-AF24C7D46A1D}
{16D42BCF-EDB8-4153-B37D-0B10FB6DF36C} = {C7DEC9FB-3F75-4584-85B0-16EA3CB222E5}
{65A2837C-A5EE-475B-8079-EE5A1BCD2E8F} = {C7DEC9FB-3F75-4584-85B0-16EA3CB222E5}
+ {DB46D90E-304D-48B7-9ED6-F4DCC95D3824} = {C7DEC9FB-3F75-4584-85B0-16EA3CB222E5}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {33261859-9CD1-4A43-B181-AB75C247D1CD}