修改代码

This commit is contained in:
cli 2024-10-25 19:11:43 +08:00
parent ff1d0f37c6
commit 16bd93cc11
33 changed files with 503 additions and 268 deletions

View File

@ -4,6 +4,7 @@
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<LangVersion>preview</LangVersion>
</PropertyGroup>
<ItemGroup>

View File

@ -0,0 +1,6 @@
namespace JiShe.CollectBus.Common.Extensions.DependencyInjections
{
public interface IScopedDependency
{
}
}

View File

@ -0,0 +1,6 @@
namespace JiShe.CollectBus.Common.Extensions.DependencyInjections
{
public interface ISingletonDependency
{
}
}

View File

@ -0,0 +1,6 @@
namespace JiShe.CollectBus.Common.Extensions.DependencyInjections
{
public interface ITransientDependency
{
}
}

View File

@ -3,10 +3,12 @@
<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<Nullable>enable</Nullable>
<LangVersion>preview</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="JetBrains.Annotations" Version="2024.2.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.2" />
</ItemGroup>
</Project>

View File

@ -1,6 +1,7 @@
using System.Reflection;
using JiShe.CollectBus.Common.Extensions.DependencyInjections;
using JiShe.CollectBus.Protocol.Contracts.Attributes;
using JiShe.CollectBus.Protocol.Contracts.DependencyInjection;
using System.Diagnostics.CodeAnalysis;
using System.Reflection;
using TouchSocket.Core;
// ReSharper disable once CheckNamespace
@ -8,16 +9,26 @@ namespace Microsoft.Extensions.DependencyInjection
{
public static class DependencyInjectionExtensions
{
public static void ServiceRegister(this IServiceCollection services, params Assembly[] assemblies)
public static void ServiceRegister(this IServiceCollection services)
{
if (assemblies.Length <= 0)
{
assemblies = [Assembly.GetExecutingAssembly()];
}
var assemblies = GetBinAssemblies();
foreach (var assembly in assemblies)
{
var allTypes = assembly.GetTypes();
foreach (var type in allTypes)
{
if (typeof(ISingletonDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false })
{
var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ISingletonDependency"));
foreach (var interfaceType in interfaceTypes)
{
var serviceDescriptor = new ServiceDescriptor(interfaceType, interfaceType.Name, type, ServiceLifetime.Singleton);
services.Add(serviceDescriptor);
}
}
}
foreach (var type in allTypes)
{
if (typeof(ITransientDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false })
@ -25,14 +36,38 @@ namespace Microsoft.Extensions.DependencyInjection
var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ITransientDependency"));
foreach (var interfaceType in interfaceTypes)
{
var attr = type.GetCustomAttribute<ProtocolNameAttribute>();
if (attr == null) continue;
var serviceDescriptor = new ServiceDescriptor(interfaceType, attr.Name, type, ServiceLifetime.Transient);
var serviceDescriptor = new ServiceDescriptor(interfaceType, interfaceType.Name, type, ServiceLifetime.Transient);
services.Add(serviceDescriptor);
}
}
}
foreach (var type in allTypes)
{
if (typeof(IScopedDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false })
{
var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("IScopedDependency"));
foreach (var interfaceType in interfaceTypes)
{
var serviceDescriptor = new ServiceDescriptor(interfaceType, interfaceType.Name, type, ServiceLifetime.Scoped);
services.Add(serviceDescriptor);
}
}
}
}
}
public static void PluginServiceRegister(this IServiceCollection services, string pluginPath = "")
{
if (pluginPath.IsNullOrWhiteSpace())
{
pluginPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Plugins");
}
var assemblies = GetAssembliesFromFolder(pluginPath);
foreach (var assembly in assemblies)
{
var allTypes = assembly.GetTypes();
foreach (var type in allTypes)
{
if (typeof(ISingletonDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false })
@ -48,6 +83,21 @@ namespace Microsoft.Extensions.DependencyInjection
}
}
foreach (var type in allTypes)
{
if (typeof(ITransientDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false })
{
var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ITransientDependency"));
foreach (var interfaceType in interfaceTypes)
{
var attr = type.GetCustomAttribute<ProtocolNameAttribute>();
if (attr == null) continue;
var serviceDescriptor = new ServiceDescriptor(interfaceType, attr.Name, type, ServiceLifetime.Transient);
services.Add(serviceDescriptor);
}
}
}
foreach (var type in allTypes)
{
if (typeof(IScopedDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false })
@ -65,20 +115,10 @@ namespace Microsoft.Extensions.DependencyInjection
}
}
public static void PluginServiceRegister(this IServiceCollection services, string pluginPath = "")
{
if (pluginPath.IsNullOrWhiteSpace())
{
pluginPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Plugins");
}
var assemblies = GetAssembliesFromFolder(pluginPath);
ServiceRegister(services, assemblies.ToArray());
}
private static IEnumerable<Assembly> GetAssembliesFromFolder(string folderPath)
{
var directory = new DirectoryInfo(folderPath);
if (!directory.Exists) return Enumerable.Empty<Assembly>();
if (!directory.Exists) return [];
var files = directory.GetFiles("*.dll");
@ -98,5 +138,29 @@ namespace Microsoft.Extensions.DependencyInjection
return assemblies;
}
private static IEnumerable<Assembly> GetBinAssemblies()
{
var directory = new DirectoryInfo(AppDomain.CurrentDomain.BaseDirectory);
if (!directory.Exists) return [];
var files = directory.GetFiles("JiShe.CollectBus.*.dll");
var assemblies = new List<Assembly>();
foreach (var file in files)
{
try
{
var assembly = Assembly.LoadFrom(file.FullName);
assemblies.Add(assembly);
}
catch (Exception ex)
{
Console.WriteLine($"Error loading assembly from {file.FullName}: {ex.Message}");
}
}
return assemblies;
}
}
}

View File

@ -30,14 +30,6 @@ namespace Microsoft.Extensions.DependencyInjection
})
.ConfigurePlugins(a =>
{
a.UseCheckClear()
.SetCheckClearType(CheckClearType.All)
.SetTick(TimeSpan.FromSeconds(60))
.SetOnClose((c, t) =>
{
c.TryShutdown();
c.SafeClose("超时无数据");
});
a.Add<TcpClosePlugin>();
a.Add<TcpServiceReceivedPlugin>();
a.Add<BusService>();

View File

@ -27,13 +27,16 @@
</PackageReference>
<PackageReference Include="Microsoft.Extensions.Caching.StackExchangeRedis" Version="8.0.10" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.ClickHouse\JiShe.CollectBus.ClickHouse.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Core\JiShe.CollectBus.Core.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.EntityFrameworkCore\JiShe.CollectBus.EntityFrameworkCore.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.RabbitMQ\JiShe.CollectBus.RabbitMQ.csproj" />
</ItemGroup>
<ItemGroup>

View File

@ -33,6 +33,7 @@ namespace JiShe.CollectBus.Console
services.PluginServiceRegister();
services.AddTcp(configuration);
services.AddUdp(configuration);
services.AddMassTransit(configuration);
services.AddStackExchangeRedisCache(options =>
{
options.Configuration = configuration["RedisCache:ConnectionString"];

View File

@ -13,5 +13,16 @@
"RedisCache": {
"ConnectionString": "123456@qwer@localhost:6379",
"InstanceName": "CollectBus"
},
"MQ": {
"Host": "118.190.144.92",
"Port": "5672",
"VirtualHost": "/",
"UserName": "collectbus",
"Password": "123456",
"Queue": {
"Push": "PUSH_COMMANDS",
"Report": "REPORT_COMMAND"
}
}
}

View File

@ -4,6 +4,8 @@
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<LangVersion>preview</LangVersion>
</PropertyGroup>
<ItemGroup>
@ -20,6 +22,7 @@
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.EntityFrameworkCore\JiShe.CollectBus.EntityFrameworkCore.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.RabbitMQ\JiShe.CollectBus.RabbitMQ.csproj" />
</ItemGroup>
</Project>

View File

@ -1,28 +1,40 @@
using Microsoft.Extensions.DependencyInjection;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.RabbitMQ.Models;
using JiShe.CollectBus.RabbitMQ.Senders;
using TouchSocket.Core;
using TouchSocket.Sockets;
namespace JiShe.CollectBus.Core.Plugins
{
public partial class TcpServiceReceivedPlugin(IServiceProvider serviceProvider) : PluginBase
public partial class TcpServiceReceivedPlugin(IMqSender sender) : PluginBase
{
[GeneratorPlugin(typeof(ITcpReceivedPlugin))]
public async Task OnTcpReceived(ITcpSessionClient client, ReceivedDataEventArgs e)
{
//TODO: 电表主站到集中器的协议都是376.1协议集中器下发到电表协议分为645-07和modbus
//TODO: 水表主站到集中器的协议分为118和645-97协议
//TODO: 连接成功时获取档案信息,根据档案信息匹配协议正则获取协议服务进行监听发送
////TODO: 电表主站到集中器的协议都是376.1协议集中器下发到电表协议分为645-07和modbus
////TODO: 水表主站到集中器的协议分为118和645-97协议
////TODO: 连接成功时获取档案信息,根据档案信息匹配协议正则获取协议服务进行监听发送
const string protocolType = "StandardProtocol";
//const string protocolType = "StandardProtocol";
var protocolPlugin = serviceProvider.GetKeyedService<IProtocolPlugin>(protocolType);
client.Logger.Info($"{protocolPlugin?.Get().Name},{protocolPlugin?.Get().RegularExpression}");
//从客户端收到信息
var messageHexString = Convert.ToHexString(e.ByteBlock.Span);
client.Logger.Info($"[TCP] 已从{client.GetIPPort()}接收到信息:{messageHexString}");
//var protocolPlugin = serviceProvider.GetKeyedService<IProtocolPlugin>(protocolType);
//var protocolPluginInfo = await protocolPlugin.GetAsync();
//client.Logger.Info($"{protocolPluginInfo.Name},{protocolPluginInfo.RegularExpression}");
////从客户端收到信息
//var messageHexString = Convert.ToHexString(e.ByteBlock.Span);
//client.Logger.Info($"[TCP] 已从{client.GetIPPort()}接收到信息:{messageHexString}");
//await protocolPlugin.ReceivedAsync(client,e);
await sender.SendToReportAsync(new ReportDto
{
ClientIP = client.IP,
ClientId = client.Id,
Port = client.Port,
MessageHexString = Convert.ToHexString(e.ByteBlock.Span),
DeviceNo = "111"
});
protocolPlugin?.Received(client,e);
await e.InvokeNext();
}

View File

@ -4,6 +4,8 @@
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<LangVersion>preview</LangVersion>
</PropertyGroup>
<ItemGroup>

View File

@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
using Microsoft.Extensions.Caching.Distributed;
@ -6,32 +7,33 @@ using TouchSocket.Sockets;
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
{
public abstract class BaseProtocolPlugin:IProtocolPlugin
public abstract class BaseProtocolPlugin(IDistributedCache cache) : IProtocolPlugin
{
private readonly IDistributedCache _cache;
public abstract Task<ProtocolInfo> GetAsync();
protected BaseProtocolPlugin(IDistributedCache cache)
public virtual Task ReceivedAsync(ITcpSessionClient client, ReceivedDataEventArgs e)
{
_cache = cache;
return Task.CompletedTask;
}
public abstract ProtocolInfo Get();
public abstract void Received(ITcpSessionClient client, ReceivedDataEventArgs e);
public abstract void Send();
public void Load()
public virtual Task SendAsync()
{
var result = _cache.GetString("myKey");
return Task.CompletedTask;
}
public virtual async Task LoadAsync()
{
var result = cache.GetString("myKey");
if (result == null)
{
result = "Calculated Data";
_cache.SetString("myKey", result, new DistributedCacheEntryOptions
cache.SetString("myKey", result, new DistributedCacheEntryOptions
{
AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(10)
});
}
await Task.CompletedTask;
}
}

View File

@ -1,17 +1,10 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace JiShe.CollectBus.Protocol.Contracts.Attributes
{
[AttributeUsage(AttributeTargets.Class)]
public class ProtocolNameAttribute: Attribute
public class ProtocolNameAttribute(string name) : Attribute
{
public ProtocolNameAttribute(string name)
{
Name = name;
}
public string Name { get; set; }
public string Name { get; set; } = name;
}
}

View File

@ -1,10 +0,0 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace JiShe.CollectBus.Protocol.Contracts.DependencyInjection
{
public interface IScopedDependency
{
}
}

View File

@ -1,10 +0,0 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace JiShe.CollectBus.Protocol.Contracts.DependencyInjection
{
public interface ISingletonDependency
{
}
}

View File

@ -1,10 +0,0 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace JiShe.CollectBus.Protocol.Contracts.DependencyInjection
{
public interface ITransientDependency
{
}
}

View File

@ -1,16 +1,17 @@
using JiShe.CollectBus.Protocol.Contracts.Models;
using System.Threading.Tasks;
using JiShe.CollectBus.Protocol.Contracts.Models;
using TouchSocket.Sockets;
namespace JiShe.CollectBus.Protocol.Contracts.Interfaces
{
public interface IProtocolPlugin
{
ProtocolInfo Get();
Task<ProtocolInfo> GetAsync();
void Load();
Task LoadAsync();
void Received(ITcpSessionClient client, ReceivedDataEventArgs e);
Task ReceivedAsync(ITcpSessionClient client, ReceivedDataEventArgs e);
void Send();
Task SendAsync();
}
}

View File

@ -3,9 +3,11 @@
<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<Nullable>enable</Nullable>
<LangVersion>preview</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MassTransit.Abstractions" Version="8.3.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="8.0.0" />
<PackageReference Include="TouchSocket" Version="2.1.9" />
</ItemGroup>

View File

@ -4,6 +4,7 @@
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<LangVersion>preview</LangVersion>
</PropertyGroup>
<ItemGroup>

View File

@ -1,6 +1,6 @@
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
using JiShe.CollectBus.Common.Extensions.DependencyInjections;
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
using JiShe.CollectBus.Protocol.Contracts.Attributes;
using JiShe.CollectBus.Protocol.Contracts.DependencyInjection;
using JiShe.CollectBus.Protocol.Contracts.Models;
using Microsoft.Extensions.Caching.Distributed;
using TouchSocket.Sockets;
@ -10,92 +10,19 @@ namespace JiShe.CollectBus.Protocol.Test
[ProtocolName("TestProtocol")]
public class TestProtocolPlugin(IDistributedCache cache) : BaseProtocolPlugin(cache), ISingletonDependency
{
public override ProtocolInfo Get()
{
return new ProtocolInfo("Test", "376.1", "TCP", "376.1协议", "DTSU1980");
}
public override void Received(ITcpSessionClient client, ReceivedDataEventArgs e)
public override Task<ProtocolInfo> GetAsync()
{
throw new NotImplementedException();
}
public override void Send()
public override Task ReceivedAsync(ITcpSessionClient client, ReceivedDataEventArgs e)
{
throw new NotImplementedException();
}
//档案下发
//var listMeter = new List<MeterParameter>() { new MeterParameter(){
// Pn = 1,
// BaudRate = 3,
// Port = 2,
// ProtocolType = ProtocolType.DLT6452007,
// Address = "312408006642",
// Password = "000000",
// RateNumber = 4,
// IntegerBitNumber = 4,
// DecimalBitNumber = 4,
// CollectorAddress = "000000000000",
// UserCategoryNumber = 0,
// UserSubclassNumber = 0
//} };
//new BuildCommand().GetAFN04F10DataUnit(new ReqParameter2() {
// AFN = AFN.设置参数,
// CMasterStationFunCode = CMasterStationFunCode.请求1级数据,
// A= "322009872",
// Seq = new Seq()
// {
// TpV = TpV.附加信息域中无时间标签,
// FIRFIN = FIRFIN.单帧,
// CON = CON.需要对该帧进行确认,
// PRSEQ = 10,
// },
// MSA = 13,
// Pn = 0,
// Fn = 10
//},listMeter);
//档案读取
//new BuildCommand().GetAFN10F10DataUnit(new ReqParameter2()
//{
// AFN = AFN.查询参数,
// CMasterStationFunCode = CMasterStationFunCode.请求2级数据,
// A = "322009872",
// Seq = new Seq()
// {
// TpV = TpV.附加信息域中无时间标签,
// FIRFIN = FIRFIN.单帧,
// CON = CON.不需要对该帧进行确认,
// PRSEQ = 2,
// },
// MSA = 13,
// Pn = 0,
// Fn = 10
//},new List<int>() {1,2 });
//var str = "68A600A6006888203290261A0A6200000201010001000100621E426622082431000000000000040300000000000000CA16";
//var cmdResult = new BuildCommand().AnalysisCmd(str);
//if(cmdResult != null)
//{
// var list = new BuildCommand().AnalysisAFN04F10DataUnit(cmdResult.HexDatas);
//}
//new BuildCommand().GetCommandBytes(new ReqParameter2()
//{
// AFN = AFN.请求实时数据,
// CMasterStationFunCode = CMasterStationFunCode.请求2级数据,
// A = "322009872",
// Seq = new Seq()
// {
// TpV = TpV.附加信息域中无时间标签,
// FIRFIN = FIRFIN.单帧,
// CON = CON.不需要对该帧进行确认,
// PRSEQ = 2,
// },
// MSA = 13,
// Pn = 1,
// Fn = 129
// });
//new BuildCommand().AmmeterValveControl("312408006642", "", "000000", true);
public override Task SendAsync()
{
throw new NotImplementedException();
}
}
}

View File

@ -5,6 +5,8 @@
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<BaseOutputPath></BaseOutputPath>
<LangVersion>preview</LangVersion>
</PropertyGroup>
<ItemGroup>
@ -14,6 +16,7 @@
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.RabbitMQ\JiShe.CollectBus.RabbitMQ.csproj" />
</ItemGroup>
<Target Name="PostBuild" AfterTargets="PostBuildEvent">

View File

@ -1,7 +1,7 @@
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Extensions.DependencyInjections;
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
using JiShe.CollectBus.Protocol.Contracts.Attributes;
using JiShe.CollectBus.Protocol.Contracts.DependencyInjection;
using JiShe.CollectBus.Protocol.Contracts.Models;
using Microsoft.Extensions.Caching.Distributed;
using TouchSocket.Sockets;
@ -37,17 +37,18 @@ namespace JiShe.CollectBus.Protocol
}
public override ProtocolInfo Get()
public override async Task<ProtocolInfo> GetAsync()
{
return new ProtocolInfo("Standard", "376.1", "TCP", "376.1协议", "DTS1980");
var info = new ProtocolInfo("Standard", "376.1", "TCP", "376.1协议", "DTS1980");
return await Task.FromResult(info);
}
public new void Load()
public new async Task LoadAsync()
{
base.Load();
await base.LoadAsync();
}
public override void Received(ITcpSessionClient client, ReceivedDataEventArgs e)
public override async Task ReceivedAsync(ITcpSessionClient client, ReceivedDataEventArgs e)
{
tcpSessionClient = client;
var messageHexString = Convert.ToHexString(e.ByteBlock.Span);
@ -57,11 +58,12 @@ namespace JiShe.CollectBus.Protocol
return;
}
AnalysisData(cmdResult);
await Task.CompletedTask;
}
public override void Send()
public override async Task SendAsync()
{
throw new NotImplementedException();
await Task.CompletedTask;
}
/// <summary>
@ -294,6 +296,7 @@ namespace JiShe.CollectBus.Protocol
commandReulst.ReplyBytes = GetCommandBytes(reqParam);
tcpSessionClient.SendAsync(tcpSessionClient.Id,commandReulst.ReplyBytes);
}
else if (commandReulst.Fn == 2)//退出登录
{

View File

@ -0,0 +1,13 @@
using MassTransit;
using JiShe.CollectBus.RabbitMQ.Models;
namespace JiShe.CollectBus.RabbitMQ.Consumers
{
public class PushConsumer : IConsumer<PushDto>
{
public async Task Consume(ConsumeContext<PushDto> context)
{
Console.WriteLine(context.Message.ClientId);
}
}
}

View File

@ -0,0 +1,13 @@
using JiShe.CollectBus.RabbitMQ.Models;
using MassTransit;
namespace JiShe.CollectBus.RabbitMQ.Consumers
{
internal class ReportConsumer : IConsumer<ReportDto>
{
public async Task Consume(ConsumeContext<ReportDto> context)
{
Console.WriteLine(context.Message.ClientId);
}
}
}

View File

@ -0,0 +1,54 @@
using JiShe.CollectBus.RabbitMQ.Consumers;
using MassTransit;
using Microsoft.Extensions.Configuration;
using RabbitMQ.Client;
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddMassTransit(this IServiceCollection services, IConfiguration configuration)
{
services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(configuration["MQ:Host"], ushort.Parse(configuration["MQ:Port"] ?? string.Empty), configuration["MQ:VirtualHost"], h =>
{
h.Username(configuration["MQ:UserName"] ?? string.Empty);
h.Password(configuration["MQ:Password"] ?? string.Empty);
});
cfg.ReceiveEndpoint(configuration["MQ:Queue:Push"] ?? string.Empty, x =>
{
x.ConfigureConsumeTopology = false;
x.Consumer<PushConsumer>();
x.Bind("commands", s =>
{
s.RoutingKey = configuration["MQ:Queue:Push"];
s.ExchangeType = ExchangeType.Direct;
});
});
cfg.ReceiveEndpoint(configuration["MQ:Queue:Report"] ?? string.Empty, x =>
{
x.ConfigureConsumeTopology = false;
x.Consumer<ReportConsumer>();
x.Bind("commands", s =>
{
s.RoutingKey = configuration["MQ:Queue:Report"];
s.ExchangeType = ExchangeType.Direct;
});
});
});
});
services.AddMassTransitHostedService();
return services;
}
}
}

View File

@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<LangVersion>preview</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MassTransit" Version="8.3.0" />
<PackageReference Include="MassTransit.RabbitMQ" Version="8.3.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.RabbitMQ.Models
{
public class PushDto
{
public int ClientId { get; set; }
public string Message { get; set; }
public string DeviceNo { get; set; }
}
}

View File

@ -0,0 +1,36 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.RabbitMQ.Models
{
public class ReportDto
{
/// <summary>
/// 客服端标识
/// </summary>
public string ClientId { get; set; }
/// <summary>
/// 客服端IP
/// </summary>
public string ClientIP { get; set; }
/// <summary>
/// 客服端端口
/// </summary>
public int Port { get; set; }
/// <summary>
/// 客服端报文
/// </summary>
public string MessageHexString { get; set; }
/// <summary>
/// 设备号
/// </summary>
public string DeviceNo { get; set; }
}
}

View File

@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using JiShe.CollectBus.Common.Extensions.DependencyInjections;
namespace JiShe.CollectBus.RabbitMQ.Senders
{
public interface IMqSender : ISingletonDependency
{
Task SendToPushAsync<T>(T message, CancellationToken cancellationToken = default) where T : class;
Task SendToPushAsync(object message, CancellationToken cancellationToken = default);
Task SendToReportAsync<T>(T message, CancellationToken cancellationToken = default) where T : class;
Task SendToReportAsync(object message, CancellationToken cancellationToken = default);
Task SendAsync(string queueKey, object message, CancellationToken cancellationToken = default);
}
}

View File

@ -0,0 +1,49 @@
using JiShe.CollectBus.Common.Extensions.DependencyInjections;
using MassTransit;
using Microsoft.Extensions.Configuration;
namespace JiShe.CollectBus.RabbitMQ.Senders
{
public class MqSender : IMqSender
{
private readonly ISendEndpointProvider _sendEndpointProvider;
private readonly IConfiguration _configuration;
public MqSender(ISendEndpointProvider sendEndpointProvider, IConfiguration configuration)
{
_sendEndpointProvider = sendEndpointProvider;
_configuration = configuration;
}
public async Task SendToPushAsync<T>(T message, CancellationToken cancellationToken = default) where T : class
{
var queueName = _configuration["MQ:Queue:Push"];
await SendAsync(queueName, message, cancellationToken);
}
public async Task SendToPushAsync(object message, CancellationToken cancellationToken = default)
{
var queueName = _configuration["MQ:Queue:Push"];
await SendAsync(queueName, message, cancellationToken);
}
public async Task SendToReportAsync<T>(T message, CancellationToken cancellationToken = default) where T : class
{
var queueName = _configuration["MQ:Queue:Report"];
await SendAsync(queueName, message, cancellationToken);
}
public async Task SendToReportAsync(object message, CancellationToken cancellationToken = default)
{
var queueName = _configuration["MQ:Queue:Report"];
await SendAsync(queueName, message, cancellationToken);
}
public async Task SendAsync(string queueKey,object message, CancellationToken cancellationToken = default)
{
var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"queue:{queueKey}"));
await endpoint.Send(message, cancellationToken);
}
}
}

View File

@ -21,7 +21,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol.T
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.EntityFrameworkCore", "JiShe.CollectBus.EntityFrameworkCore\JiShe.CollectBus.EntityFrameworkCore.csproj", "{16D42BCF-EDB8-4153-B37D-0B10FB6DF36C}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.ClickHouse", "JiShe.CollectBus.ClickHouse\JiShe.CollectBus.ClickHouse.csproj", "{65A2837C-A5EE-475B-8079-EE5A1BCD2E8F}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.ClickHouse", "JiShe.CollectBus.ClickHouse\JiShe.CollectBus.ClickHouse.csproj", "{65A2837C-A5EE-475B-8079-EE5A1BCD2E8F}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.RabbitMQ", "JiShe.CollectBus.RabbitMQ\JiShe.CollectBus.RabbitMQ.csproj", "{DB46D90E-304D-48B7-9ED6-F4DCC95D3824}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -61,6 +63,10 @@ Global
{65A2837C-A5EE-475B-8079-EE5A1BCD2E8F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{65A2837C-A5EE-475B-8079-EE5A1BCD2E8F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{65A2837C-A5EE-475B-8079-EE5A1BCD2E8F}.Release|Any CPU.Build.0 = Release|Any CPU
{DB46D90E-304D-48B7-9ED6-F4DCC95D3824}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DB46D90E-304D-48B7-9ED6-F4DCC95D3824}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DB46D90E-304D-48B7-9ED6-F4DCC95D3824}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DB46D90E-304D-48B7-9ED6-F4DCC95D3824}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -74,6 +80,7 @@ Global
{289196B4-FFBE-4E40-A3A1-FCFADBE945ED} = {3A04FB29-EA75-4499-BBF3-AF24C7D46A1D}
{16D42BCF-EDB8-4153-B37D-0B10FB6DF36C} = {C7DEC9FB-3F75-4584-85B0-16EA3CB222E5}
{65A2837C-A5EE-475B-8079-EE5A1BCD2E8F} = {C7DEC9FB-3F75-4584-85B0-16EA3CB222E5}
{DB46D90E-304D-48B7-9ED6-F4DCC95D3824} = {C7DEC9FB-3F75-4584-85B0-16EA3CB222E5}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {33261859-9CD1-4A43-B181-AB75C247D1CD}