修改代码
This commit is contained in:
parent
acd061b9ee
commit
c0716d9667
@ -160,4 +160,11 @@
|
||||
D7,
|
||||
D8
|
||||
}
|
||||
|
||||
public enum IssuedEventType
|
||||
{
|
||||
Heartbeat,
|
||||
Login,
|
||||
Data
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,9 +1,9 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>netstandard2.1</TargetFramework>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
<LangVersion>preview</LangVersion>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
||||
@ -43,27 +43,30 @@ namespace Microsoft.Extensions.DependencyInjection
|
||||
if (type is not { IsClass: true, IsAbstract: false }) continue;
|
||||
if (typeof(ISingletonDependency).IsAssignableFrom(type))
|
||||
{
|
||||
var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ISingletonDependency"));
|
||||
var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ISingletonDependency") && !p.FullName.Contains("IDisposable"));
|
||||
foreach (var interfaceType in interfaceTypes)
|
||||
{
|
||||
Log.Logger.Information($"正在IOC注入ISingletonDependency {type.Name}...");
|
||||
services.AddSingleton(interfaceType, type);
|
||||
}
|
||||
}
|
||||
|
||||
if (typeof(ITransientDependency).IsAssignableFrom(type))
|
||||
{
|
||||
var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ITransientDependency"));
|
||||
var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ITransientDependency") && !p.FullName.Contains("IDisposable"));
|
||||
foreach (var interfaceType in interfaceTypes)
|
||||
{
|
||||
Log.Logger.Information($"正在IOC注入ITransientDependency {type.Name}...");
|
||||
services.AddTransient(interfaceType, type);
|
||||
}
|
||||
}
|
||||
|
||||
if (typeof(IScopedDependency).IsAssignableFrom(type))
|
||||
{
|
||||
var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("IScopedDependency"));
|
||||
var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("IScopedDependency") && !p.FullName.Contains("IDisposable"));
|
||||
foreach (var interfaceType in interfaceTypes)
|
||||
{
|
||||
Log.Logger.Information($"正在IOC注入IScopedDependency {type.Name}...");
|
||||
services.AddScoped(interfaceType, type);
|
||||
}
|
||||
}
|
||||
|
||||
@ -16,7 +16,7 @@ namespace Microsoft.Extensions.DependencyInjection
|
||||
{
|
||||
services.AddTcpService(config =>
|
||||
{
|
||||
config.SetListenIPHosts(int.Parse(configuration["TCP:Port"] ?? "10500"))
|
||||
config.SetListenIPHosts(int.Parse(configuration["TCP:ClientPort"] ?? "10500"))
|
||||
.ConfigureContainer(a => //容器的配置顺序应该在最前面
|
||||
{
|
||||
a.AddConsoleLogger();
|
||||
@ -39,7 +39,7 @@ namespace Microsoft.Extensions.DependencyInjection
|
||||
{
|
||||
services.AddUdpSession(config =>
|
||||
{
|
||||
config.SetBindIPHost(int.Parse(configuration["UDP:Port"] ?? "10500"))
|
||||
config.SetBindIPHost(int.Parse(configuration["UDP:ClientPort"] ?? "10500"))
|
||||
.ConfigureContainer(a => //容器的配置顺序应该在最前面
|
||||
{
|
||||
//a.AddConsoleLogger();
|
||||
|
||||
@ -52,10 +52,9 @@ namespace JiShe.CollectBus.Console
|
||||
lc.ReadFrom.Configuration(configuration)
|
||||
.ReadFrom.Services(context);
|
||||
});
|
||||
services.ModuleRegister(hostContext);
|
||||
services.ServiceRegister();
|
||||
services.PluginServiceRegister();
|
||||
|
||||
services.ModuleRegister(hostContext);
|
||||
services.AddTcp(configuration);
|
||||
//services.AddUdp(configuration);
|
||||
services.AddStackExchangeRedisCache(options =>
|
||||
|
||||
@ -30,7 +30,10 @@
|
||||
"Default": "Data Source=192.168.111.248;Port=3306;Database=JiSheCollectBus;uid=root;pwd=123456abcD;charset=utf8mb4;Allow User Variables=true;AllowLoadLocalInfile=true",
|
||||
"ClickHouse": "host=localhost;port=8123;user=default;password=;database=default"
|
||||
},
|
||||
|
||||
"MongoSettings": {
|
||||
"Connection": "mongodb://backups_admin:jishe_mongodb_backups@118.190.144.92:27037",
|
||||
"DatabaseName": "JiSheCollectBus"
|
||||
},
|
||||
"TCP": {
|
||||
"Port": 10500
|
||||
},
|
||||
|
||||
@ -5,6 +5,7 @@ using TouchSocket.Core;
|
||||
using TouchSocket.Sockets;
|
||||
using JiShe.CollectBus.RabbitMQ.Senders;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using JiShe.CollectBus.MongoDB;
|
||||
|
||||
namespace JiShe.CollectBus.Core.Plugins
|
||||
{
|
||||
@ -12,37 +13,27 @@ namespace JiShe.CollectBus.Core.Plugins
|
||||
{
|
||||
private readonly INSender _nSender;
|
||||
private readonly ILogger<TcpServiceReceivedPlugin> _logger;
|
||||
public readonly IMongoRepository<MessageReceivedHeartbeatEvent> _mongoHeartbeatRepository;
|
||||
public readonly IMongoRepository<MessageReceivedLoginEvent> _mongoLoginRepository;
|
||||
|
||||
|
||||
public TcpServiceReceivedPlugin(INSender nSender, ILogger<TcpServiceReceivedPlugin> logger)
|
||||
public TcpServiceReceivedPlugin(INSender nSender, ILogger<TcpServiceReceivedPlugin> logger,IMongoRepository<MessageReceivedHeartbeatEvent> mongoHeartbeatRepository, IMongoRepository<MessageReceivedLoginEvent> mongoLoginRepository)
|
||||
{
|
||||
_nSender = nSender;
|
||||
_logger = logger;
|
||||
_mongoHeartbeatRepository = mongoHeartbeatRepository;
|
||||
_mongoLoginRepository = mongoLoginRepository;
|
||||
}
|
||||
|
||||
|
||||
[GeneratorPlugin(typeof(ITcpReceivedPlugin))]
|
||||
public async Task OnTcpReceived(ITcpSessionClient client, ReceivedDataEventArgs e)
|
||||
{
|
||||
////TODO: 电表主站到集中器的协议都是376.1协议,集中器下发到电表协议分为645-07和modbus
|
||||
////TODO: 水表主站到集中器的协议分为118和645-97协议
|
||||
////TODO: 连接成功时获取档案信息,根据档案信息匹配协议正则获取协议服务进行监听发送
|
||||
|
||||
//const string protocolType = "StandardProtocol";
|
||||
|
||||
//var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>(protocolType);
|
||||
//var protocolPluginInfo = await protocolPlugin.GetAsync();
|
||||
//client.Logger.Info($"{protocolPluginInfo.Name},{protocolPluginInfo.RegularExpression}");
|
||||
////从客户端收到信息
|
||||
//var messageHexString = Convert.ToHexString(e.ByteBlock.Span);
|
||||
//client.Logger.Info($"[TCP] 已从{client.GetIPPort()}接收到信息:{messageHexString}");
|
||||
|
||||
//await protocolPlugin.ReceivedAsync(client,e);
|
||||
|
||||
var messageHexString = Convert.ToHexString(e.ByteBlock.Span);
|
||||
var hexStringList = messageHexString.StringToPairs();
|
||||
var aFn = (int?)hexStringList.GetAnalyzeValue(CommandChunkEnum.AFN);
|
||||
var fn = (int?)hexStringList.GetAnalyzeValue(CommandChunkEnum.FN);
|
||||
var aTuple = (Tuple<string, int>)hexStringList.GetAnalyzeValue(CommandChunkEnum.A);
|
||||
|
||||
if (aFn.HasValue && fn.HasValue)
|
||||
{
|
||||
@ -51,19 +42,61 @@ namespace JiShe.CollectBus.Core.Plugins
|
||||
switch (fn)
|
||||
{
|
||||
case 1://登录
|
||||
await _nSender.SendToReceivedLoginAsync(new MessageReceivedLoginEvent(client.Id,client.IP, client.Port, messageHexString,""));
|
||||
|
||||
var messageReceivedLoginEvent = new MessageReceivedLoginEvent
|
||||
{
|
||||
ClientId = client.Id,
|
||||
ClientIp = client.IP,
|
||||
ClientPort = client.Port,
|
||||
MessageHexString = messageHexString,
|
||||
ReceivedTime = DateTime.Now,
|
||||
DeviceNo = aTuple.Item1,
|
||||
MessageId = Guid.NewGuid().ToString()
|
||||
};
|
||||
await _mongoLoginRepository.AddAsync(messageReceivedLoginEvent);
|
||||
await _nSender.SendToReceivedLoginAsync(messageReceivedLoginEvent);
|
||||
break;
|
||||
case 2://退出登录
|
||||
await _nSender.SendToReceivedLoginAsync(new MessageReceivedLoginEvent(client.Id, client.IP, client.Port, messageHexString, ""));
|
||||
var messageReceivedExitLoginEvent = new MessageReceivedLoginEvent
|
||||
{
|
||||
ClientId = client.Id,
|
||||
ClientIp = client.IP,
|
||||
ClientPort = client.Port,
|
||||
MessageHexString = messageHexString,
|
||||
ReceivedTime = DateTime.Now,
|
||||
DeviceNo = aTuple.Item1,
|
||||
MessageId = Guid.NewGuid().ToString()
|
||||
};
|
||||
await _mongoLoginRepository.AddAsync(messageReceivedExitLoginEvent);
|
||||
await _nSender.SendToReceivedLoginAsync(messageReceivedExitLoginEvent);
|
||||
break;
|
||||
case 3://心跳
|
||||
await _nSender.SendToReceivedHeartbeatAsync(new MessageReceivedHeartbeatEvent(client.Id, client.IP, client.Port, messageHexString, ""));
|
||||
var messageReceivedHeartbeatEvent = new MessageReceivedHeartbeatEvent
|
||||
{
|
||||
ClientId = client.Id,
|
||||
ClientIp = client.IP,
|
||||
ClientPort = client.Port,
|
||||
MessageHexString = messageHexString,
|
||||
ReceivedTime = DateTime.Now,
|
||||
DeviceNo = aTuple.Item1,
|
||||
MessageId = Guid.NewGuid().ToString()
|
||||
};
|
||||
await _mongoHeartbeatRepository.AddAsync(messageReceivedHeartbeatEvent);
|
||||
await _nSender.SendToReceivedHeartbeatAsync(messageReceivedHeartbeatEvent);
|
||||
break;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
await _nSender.SendToReceivedAsync(new MessageReceivedEvent(client.Id, client.IP, client.Port, messageHexString, ""));
|
||||
await _nSender.SendToReceivedAsync(new MessageReceivedEvent
|
||||
{
|
||||
ClientId = client.Id,
|
||||
ClientIp = client.IP,
|
||||
ClientPort = client.Port,
|
||||
MessageHexString = messageHexString,
|
||||
DeviceNo = aTuple.Item1,
|
||||
MessageId = Guid.NewGuid().ToString()
|
||||
});
|
||||
}
|
||||
}
|
||||
else
|
||||
|
||||
37
JiShe.CollectBus.MongoDB/IMongoContext.cs
Normal file
37
JiShe.CollectBus.MongoDB/IMongoContext.cs
Normal file
@ -0,0 +1,37 @@
|
||||
using MongoDB.Driver;
|
||||
|
||||
namespace JiShe.CollectBus.MongoDB
|
||||
{
|
||||
public interface IMongoContext : IDisposable
|
||||
{
|
||||
/// <summary>
|
||||
/// 添加命令操作
|
||||
/// </summary>
|
||||
/// <param name="func">委托 方法接受一个 Func<IClientSessionHandle,Task> 委托作为参数,该委托表示一个需要 IClientSessionHandle 对象作为参数并返回一个异步任务的方法</param>
|
||||
/// <returns></returns>
|
||||
Task AddCommandAsync(Func<IClientSessionHandle, Task> func);
|
||||
|
||||
/// <summary>
|
||||
/// 提交更改并返回受影响的行数
|
||||
/// TODO:MongoDB单机服务器不支持事务【使用MongoDB事务会报错:Standalone servers do not support transactions】,只有在集群情况下才支持事务
|
||||
/// 原因:MongoDB在使用分布式事务时需要进行多节点之间的协调和通信,而单机环境下无法实现这样的分布式协调和通信机制。但是,在MongoDB部署为一个集群(cluster)后,将多个计算机连接为一个整体,通过协调和通信机制实现了分布式事务的正常使用。从数据一致性和可靠性的角度来看,在分布式系统中实现事务处理是至关重要的。而在单机环境下不支持事务,只有在集群情况下才支持事务的设计方式是为了保证数据一致性和可靠性,并且也符合分布式系统的设计思想。
|
||||
/// </summary>
|
||||
/// <param name="session">MongoDB 会话(session)对象</param>
|
||||
/// <returns></returns>
|
||||
Task<int> SaveChangesAsync(IClientSessionHandle session);
|
||||
|
||||
/// <summary>
|
||||
/// 初始化Mongodb会话对象session
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
Task<IClientSessionHandle> StartSessionAsync();
|
||||
|
||||
/// <summary>
|
||||
/// 获取集合数据
|
||||
/// </summary>
|
||||
/// <typeparam name="T"></typeparam>
|
||||
/// <param name="name"></param>
|
||||
/// <returns></returns>
|
||||
IMongoCollection<T> GetCollection<T>(string name);
|
||||
}
|
||||
}
|
||||
176
JiShe.CollectBus.MongoDB/IMongoRepository.cs
Normal file
176
JiShe.CollectBus.MongoDB/IMongoRepository.cs
Normal file
@ -0,0 +1,176 @@
|
||||
using MongoDB.Driver;
|
||||
using System.Linq.Expressions;
|
||||
|
||||
namespace JiShe.CollectBus.MongoDB
|
||||
{
|
||||
public interface IMongoRepository<T> where T : class, new()
|
||||
{
|
||||
#region 事务操作示例
|
||||
|
||||
/// <summary>
|
||||
/// 事务添加数据
|
||||
/// </summary>
|
||||
/// <param name="session">MongoDB 会话(session)对象</param>
|
||||
/// <param name="objData">添加数据</param>
|
||||
/// <returns></returns>
|
||||
Task AddTransactionsAsync(IClientSessionHandle session, T objData);
|
||||
|
||||
/// <summary>
|
||||
/// 事务数据删除
|
||||
/// </summary>
|
||||
/// <param name="session">MongoDB 会话(session)对象</param>
|
||||
/// <param name="id">objectId</param>
|
||||
/// <returns></returns>
|
||||
Task DeleteTransactionsAsync(IClientSessionHandle session, string id);
|
||||
|
||||
/// <summary>
|
||||
/// 事务异步局部更新(仅更新一条记录)
|
||||
/// </summary>
|
||||
/// <param name="session">MongoDB 会话(session)对象</param>
|
||||
/// <param name="filter">过滤器</param>
|
||||
/// <param name="update">更新条件</param>
|
||||
/// <returns></returns>
|
||||
Task UpdateTransactionsAsync(IClientSessionHandle session, FilterDefinition<T> filter, UpdateDefinition<T> update);
|
||||
|
||||
#endregion
|
||||
|
||||
#region 添加相关操作
|
||||
|
||||
/// <summary>
|
||||
/// 添加数据
|
||||
/// </summary>
|
||||
/// <param name="objData">添加数据</param>
|
||||
/// <returns></returns>
|
||||
Task AddAsync(T objData);
|
||||
|
||||
/// <summary>
|
||||
/// 批量插入
|
||||
/// </summary>
|
||||
/// <param name="objDatas">实体集合</param>
|
||||
/// <returns></returns>
|
||||
Task InsertManyAsync(List<T> objDatas);
|
||||
|
||||
#endregion
|
||||
|
||||
#region 删除相关操作
|
||||
|
||||
/// <summary>
|
||||
/// 数据删除
|
||||
/// </summary>
|
||||
/// <param name="id">objectId</param>
|
||||
/// <returns></returns>
|
||||
Task DeleteAsync(string id);
|
||||
|
||||
/// <summary>
|
||||
/// 异步删除多条数据
|
||||
/// </summary>
|
||||
/// <param name="filter">删除的条件</param>
|
||||
/// <returns></returns>
|
||||
Task<DeleteResult> DeleteManyAsync(FilterDefinition<T> filter);
|
||||
|
||||
#endregion
|
||||
|
||||
#region 修改相关操作
|
||||
|
||||
/// <summary>
|
||||
/// 指定对象异步修改一条数据
|
||||
/// </summary>
|
||||
/// <param name="obj">要修改的对象</param>
|
||||
/// <param name="id">修改条件</param>
|
||||
/// <returns></returns>
|
||||
Task UpdateAsync(T obj, string id);
|
||||
|
||||
/// <summary>
|
||||
/// 局部更新(仅更新一条记录)
|
||||
/// <para><![CDATA[expression 参数示例:x => x.Id == 1 && x.Age > 18 && x.Gender == 0]]></para>
|
||||
/// <para><![CDATA[entity 参数示例:y => new T{ RealName = "Ray", Gender = 1}]]></para>
|
||||
/// </summary>
|
||||
/// <param name="expression">筛选条件</param>
|
||||
/// <param name="entity">更新条件</param>
|
||||
/// <returns></returns>
|
||||
Task UpdateAsync(Expression<Func<T, bool>> expression, Expression<Action<T>> entity);
|
||||
|
||||
/// <summary>
|
||||
/// 异步局部更新(仅更新一条记录)
|
||||
/// </summary>
|
||||
/// <param name="filter">过滤器</param>
|
||||
/// <param name="update">更新条件</param>
|
||||
/// <returns></returns>
|
||||
Task UpdateAsync(FilterDefinition<T> filter, UpdateDefinition<T> update);
|
||||
|
||||
/// <summary>
|
||||
/// 异步局部更新(仅更新多条记录)
|
||||
/// </summary>
|
||||
/// <param name="expression">筛选条件</param>
|
||||
/// <param name="update">更新条件</param>
|
||||
/// <returns></returns>
|
||||
Task UpdateManyAsync(Expression<Func<T, bool>> expression, UpdateDefinition<T> update);
|
||||
|
||||
/// <summary>
|
||||
/// 异步批量修改数据
|
||||
/// </summary>
|
||||
/// <param name="dic">要修改的字段</param>
|
||||
/// <param name="filter">更新条件</param>
|
||||
/// <returns></returns>
|
||||
Task<UpdateResult> UpdateManayAsync(Dictionary<string, string> dic, FilterDefinition<T> filter);
|
||||
|
||||
#endregion
|
||||
|
||||
#region 查询统计相关操作
|
||||
|
||||
/// <summary>
|
||||
/// 通过ID主键获取数据
|
||||
/// </summary>
|
||||
/// <param name="id">objectId</param>
|
||||
/// <returns></returns>
|
||||
Task<T> GetByIdAsync(string id);
|
||||
/// <summary>
|
||||
/// 获取所有数据
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
Task<IEnumerable<T>> GetAllAsync();
|
||||
|
||||
/// <summary>
|
||||
/// 获取记录数
|
||||
/// </summary>
|
||||
/// <param name="expression">筛选条件</param>
|
||||
/// <returns></returns>
|
||||
Task<long> CountAsync(Expression<Func<T, bool>> expression);
|
||||
|
||||
/// <summary>
|
||||
/// 获取记录数
|
||||
/// </summary>
|
||||
/// <param name="filter">过滤器</param>
|
||||
/// <returns></returns>
|
||||
Task<long> CountAsync(FilterDefinition<T> filter);
|
||||
|
||||
/// <summary>
|
||||
/// 判断是否存在
|
||||
/// </summary>
|
||||
/// <param name="predicate">条件</param>
|
||||
/// <returns></returns>
|
||||
Task<bool> ExistsAsync(Expression<Func<T, bool>> predicate);
|
||||
|
||||
/// <summary>
|
||||
/// 异步查询集合
|
||||
/// </summary>
|
||||
/// <param name="filter">查询条件</param>
|
||||
/// <param name="field">要查询的字段,不写时查询全部</param>
|
||||
/// <param name="sort">要排序的字段</param>
|
||||
/// <returns></returns>
|
||||
Task<List<T>> FindListAsync(FilterDefinition<T> filter, string[]? field = null, SortDefinition<T>? sort = null);
|
||||
|
||||
/// <summary>
|
||||
/// 异步分页查询集合
|
||||
/// </summary>
|
||||
/// <param name="filter">查询条件</param>
|
||||
/// <param name="pageIndex">当前页</param>
|
||||
/// <param name="pageSize">页容量</param>
|
||||
/// <param name="field">要查询的字段,不写时查询全部</param>
|
||||
/// <param name="sort">要排序的字段</param>
|
||||
/// <returns></returns>
|
||||
Task<List<T>> FindListByPageAsync(FilterDefinition<T> filter, int pageIndex, int pageSize, string[]? field = null, SortDefinition<T>? sort = null);
|
||||
|
||||
#endregion
|
||||
}
|
||||
}
|
||||
25
JiShe.CollectBus.MongoDB/IUnitOfWork.cs
Normal file
25
JiShe.CollectBus.MongoDB/IUnitOfWork.cs
Normal file
@ -0,0 +1,25 @@
|
||||
using MongoDB.Driver;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace JiShe.CollectBus.MongoDB
|
||||
{
|
||||
public interface IUnitOfWork : IDisposable
|
||||
{
|
||||
/// <summary>
|
||||
/// 提交保存更改
|
||||
/// </summary>
|
||||
/// <param name="session">MongoDB 会话(session)对象</param>
|
||||
/// <returns></returns>
|
||||
Task<bool> Commit(IClientSessionHandle session);
|
||||
|
||||
/// <summary>
|
||||
/// 初始化MongoDB会话对象session
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
Task<IClientSessionHandle> InitTransaction();
|
||||
}
|
||||
}
|
||||
18
JiShe.CollectBus.MongoDB/JiShe.CollectBus.MongoDB.csproj
Normal file
18
JiShe.CollectBus.MongoDB/JiShe.CollectBus.MongoDB.csproj
Normal file
@ -0,0 +1,18 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
|
||||
<PackageReference Include="MongoDB.Driver" Version="3.0.0" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
17
JiShe.CollectBus.MongoDB/JiSheCollectBusMongoDBModule.cs
Normal file
17
JiShe.CollectBus.MongoDB/JiSheCollectBusMongoDBModule.cs
Normal file
@ -0,0 +1,17 @@
|
||||
using JiShe.CollectBus.Common.Interfaces;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using MongoDB.Driver;
|
||||
|
||||
namespace JiShe.CollectBus.MongoDB
|
||||
{
|
||||
public class JiSheCollectBusMongoDbModule: IJiSheModule
|
||||
{
|
||||
public void ConfigureServices(IServiceCollection services, HostBuilderContext hostContext)
|
||||
{
|
||||
services.AddSingleton<IMongoContext, MongoContext>();
|
||||
services.AddSingleton<IUnitOfWork, UnitOfWork>();
|
||||
services.AddSingleton(typeof(IMongoRepository<>), typeof(MongoBaseRepository<>));
|
||||
}
|
||||
}
|
||||
}
|
||||
344
JiShe.CollectBus.MongoDB/MongoBaseRepository.cs
Normal file
344
JiShe.CollectBus.MongoDB/MongoBaseRepository.cs
Normal file
@ -0,0 +1,344 @@
|
||||
using JiShe.CollectBus.Common.Extensions.DependencyInjections;
|
||||
using MongoDB.Bson;
|
||||
using MongoDB.Driver;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.ComponentModel.DataAnnotations.Schema;
|
||||
using System.Linq;
|
||||
using System.Linq.Expressions;
|
||||
using System.Reflection;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace JiShe.CollectBus.MongoDB
|
||||
{
|
||||
public class MongoBaseRepository<T> : IMongoRepository<T> where T : class, new()
|
||||
{
|
||||
private readonly IMongoContext _context;
|
||||
private readonly IMongoCollection<T> _dbSet;
|
||||
|
||||
public MongoBaseRepository(IMongoContext context)
|
||||
{
|
||||
_context = context;
|
||||
var collectionName = typeof(T).GetCustomAttribute<TableAttribute>()?.Name ?? typeof(T).Name;
|
||||
_dbSet = _context.GetCollection<T>(collectionName);
|
||||
}
|
||||
|
||||
#region 事务操作示例
|
||||
|
||||
/// <summary>
|
||||
/// 事务添加数据
|
||||
/// </summary>
|
||||
/// <param name="session">MongoDB 会话(session)对象</param>
|
||||
/// <param name="objData">添加数据</param>
|
||||
/// <returns></returns>
|
||||
public async Task AddTransactionsAsync(IClientSessionHandle session, T objData)
|
||||
{
|
||||
await _context.AddCommandAsync(async (session) => await _dbSet.InsertOneAsync(objData));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 事务数据删除
|
||||
/// </summary>
|
||||
/// <param name="session">MongoDB 会话(session)对象</param>
|
||||
/// <param name="id">objectId</param>
|
||||
/// <returns></returns>
|
||||
public async Task DeleteTransactionsAsync(IClientSessionHandle session, string id)
|
||||
{
|
||||
await _context.AddCommandAsync((session) => _dbSet.DeleteOneAsync(Builders<T>.Filter.Eq(" _id ", id)));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 事务异步局部更新(仅更新一条记录)
|
||||
/// </summary>
|
||||
/// <param name="session">MongoDB 会话(session)对象</param>
|
||||
/// <param name="filter">过滤器</param>
|
||||
/// <param name="update">更新条件</param>
|
||||
/// <returns></returns>
|
||||
public async Task UpdateTransactionsAsync(IClientSessionHandle session, FilterDefinition<T> filter, UpdateDefinition<T> update)
|
||||
{
|
||||
await _context.AddCommandAsync((session) => _dbSet.UpdateOneAsync(filter, update));
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region 添加相关操作
|
||||
|
||||
/// <summary>
|
||||
/// 添加数据
|
||||
/// </summary>
|
||||
/// <param name="objData">添加数据</param>
|
||||
/// <returns></returns>
|
||||
public async Task AddAsync(T objData)
|
||||
{
|
||||
await _dbSet.InsertOneAsync(objData);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 批量插入
|
||||
/// </summary>
|
||||
/// <param name="objDatas">实体集合</param>
|
||||
/// <returns></returns>
|
||||
public async Task InsertManyAsync(List<T> objDatas)
|
||||
{
|
||||
await _dbSet.InsertManyAsync(objDatas);
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region 删除相关操作
|
||||
|
||||
/// <summary>
|
||||
/// 数据删除
|
||||
/// </summary>
|
||||
/// <param name="id">objectId</param>
|
||||
/// <returns></returns>
|
||||
public async Task DeleteAsync(string id)
|
||||
{
|
||||
await _dbSet.DeleteOneAsync(Builders<T>.Filter.Eq("_id", new ObjectId(id)));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 异步删除多条数据
|
||||
/// </summary>
|
||||
/// <param name="filter">删除的条件</param>
|
||||
/// <returns></returns>
|
||||
public async Task<DeleteResult> DeleteManyAsync(FilterDefinition<T> filter)
|
||||
{
|
||||
return await _dbSet.DeleteManyAsync(filter);
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region 修改相关操作
|
||||
|
||||
/// <summary>
|
||||
/// 指定对象异步修改一条数据
|
||||
/// </summary>
|
||||
/// <param name="obj">要修改的对象</param>
|
||||
/// <param name="id">修改条件</param>
|
||||
/// <returns></returns>
|
||||
public async Task UpdateAsync(T obj, string id)
|
||||
{
|
||||
//修改条件
|
||||
FilterDefinition<T> filter = Builders<T>.Filter.Eq("_id", new ObjectId(id));
|
||||
//要修改的字段
|
||||
var list = new List<UpdateDefinition<T>>();
|
||||
foreach (var item in obj.GetType().GetProperties())
|
||||
{
|
||||
if (item.Name.ToLower() == "id") continue;
|
||||
list.Add(Builders<T>.Update.Set(item.Name, item.GetValue(obj)));
|
||||
}
|
||||
var updatefilter = Builders<T>.Update.Combine(list);
|
||||
await _dbSet.UpdateOneAsync(filter, updatefilter);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 局部更新(仅更新一条记录)
|
||||
/// <para><![CDATA[expression 参数示例:x => x.Id == 1 && x.Age > 18 && x.Gender == 0]]></para>
|
||||
/// <para><![CDATA[entity 参数示例:y => new T{ RealName = "Ray", Gender = 1}]]></para>
|
||||
/// </summary>
|
||||
/// <param name="expression">筛选条件</param>
|
||||
/// <param name="entity">更新条件</param>
|
||||
/// <returns></returns>
|
||||
public async Task UpdateAsync(Expression<Func<T, bool>> expression, Expression<Action<T>> entity)
|
||||
{
|
||||
var fieldList = new List<UpdateDefinition<T>>();
|
||||
|
||||
if (entity.Body is MemberInitExpression param)
|
||||
{
|
||||
foreach (var item in param.Bindings)
|
||||
{
|
||||
var propertyName = item.Member.Name;
|
||||
object propertyValue = null;
|
||||
|
||||
if (item is not MemberAssignment memberAssignment) continue;
|
||||
|
||||
if (memberAssignment.Expression.NodeType == ExpressionType.Constant)
|
||||
{
|
||||
if (memberAssignment.Expression is ConstantExpression constantExpression)
|
||||
propertyValue = constantExpression.Value;
|
||||
}
|
||||
else
|
||||
{
|
||||
propertyValue = Expression.Lambda(memberAssignment.Expression, null).Compile().DynamicInvoke();
|
||||
}
|
||||
|
||||
if (propertyName != "_id") //实体键_id不允许更新
|
||||
{
|
||||
fieldList.Add(Builders<T>.Update.Set(propertyName, propertyValue));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
await _dbSet.UpdateOneAsync(expression, Builders<T>.Update.Combine(fieldList));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 异步局部更新(仅更新一条记录)
|
||||
/// </summary>
|
||||
/// <param name="filter">过滤器</param>
|
||||
/// <param name="update">更新条件</param>
|
||||
/// <returns></returns>
|
||||
public async Task UpdateAsync(FilterDefinition<T> filter, UpdateDefinition<T> update)
|
||||
{
|
||||
await _dbSet.UpdateOneAsync(filter, update);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 异步局部更新(仅更新多条记录)
|
||||
/// </summary>
|
||||
/// <param name="expression">筛选条件</param>
|
||||
/// <param name="update">更新条件</param>
|
||||
/// <returns></returns>
|
||||
public async Task UpdateManyAsync(Expression<Func<T, bool>> expression, UpdateDefinition<T> update)
|
||||
{
|
||||
await _dbSet.UpdateManyAsync(expression, update);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 异步批量修改数据
|
||||
/// </summary>
|
||||
/// <param name="dic">要修改的字段</param>
|
||||
/// <param name="filter">更新条件</param>
|
||||
/// <returns></returns>
|
||||
public async Task<UpdateResult> UpdateManayAsync(Dictionary<string, string> dic, FilterDefinition<T> filter)
|
||||
{
|
||||
T t = new T();
|
||||
//要修改的字段
|
||||
var list = new List<UpdateDefinition<T>>();
|
||||
foreach (var item in t.GetType().GetProperties())
|
||||
{
|
||||
if (!dic.ContainsKey(item.Name)) continue;
|
||||
var value = dic[item.Name];
|
||||
list.Add(Builders<T>.Update.Set(item.Name, value));
|
||||
}
|
||||
var updatefilter = Builders<T>.Update.Combine(list);
|
||||
return await _dbSet.UpdateManyAsync(filter, updatefilter);
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region 查询统计相关操作
|
||||
|
||||
/// <summary>
|
||||
/// 通过ID主键获取数据
|
||||
/// </summary>
|
||||
/// <param name="id">objectId</param>
|
||||
/// <returns></returns>
|
||||
public async Task<T> GetByIdAsync(string id)
|
||||
{
|
||||
var queryData = await _dbSet.FindAsync(Builders<T>.Filter.Eq("_id", new ObjectId(id)));
|
||||
return queryData.FirstOrDefault();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 获取所有数据
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public async Task<IEnumerable<T>> GetAllAsync()
|
||||
{
|
||||
var queryAllData = await _dbSet.FindAsync(Builders<T>.Filter.Empty);
|
||||
return queryAllData.ToList();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 获取记录数
|
||||
/// </summary>
|
||||
/// <param name="expression">筛选条件</param>
|
||||
/// <returns></returns>
|
||||
public async Task<long> CountAsync(Expression<Func<T, bool>> expression)
|
||||
{
|
||||
return await _dbSet.CountDocumentsAsync(expression);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 获取记录数
|
||||
/// </summary>
|
||||
/// <param name="filter">过滤器</param>
|
||||
/// <returns></returns>
|
||||
public async Task<long> CountAsync(FilterDefinition<T> filter)
|
||||
{
|
||||
return await _dbSet.CountDocumentsAsync(filter);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 判断是否存在
|
||||
/// </summary>
|
||||
/// <param name="predicate">条件</param>
|
||||
/// <returns></returns>
|
||||
public async Task<bool> ExistsAsync(Expression<Func<T, bool>> predicate)
|
||||
{
|
||||
return await Task.FromResult(_dbSet.AsQueryable().Any(predicate));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 异步查询集合
|
||||
/// </summary>
|
||||
/// <param name="filter">查询条件</param>
|
||||
/// <param name="field">要查询的字段,不写时查询全部</param>
|
||||
/// <param name="sort">要排序的字段</param>
|
||||
/// <returns></returns>
|
||||
public async Task<List<T>> FindListAsync(FilterDefinition<T> filter, string[]? field = null, SortDefinition<T>? sort = null)
|
||||
{
|
||||
//不指定查询字段
|
||||
if (field == null || field.Length == 0)
|
||||
{
|
||||
if (sort == null) return await _dbSet.Find(filter).ToListAsync();
|
||||
return await _dbSet.Find(filter).Sort(sort).ToListAsync();
|
||||
}
|
||||
|
||||
//指定查询字段
|
||||
var fieldList = new List<ProjectionDefinition<T>>();
|
||||
for (int i = 0; i < field.Length; i++)
|
||||
{
|
||||
fieldList.Add(Builders<T>.Projection.Include(field[i].ToString()));
|
||||
}
|
||||
var projection = Builders<T>.Projection.Combine(fieldList);
|
||||
fieldList?.Clear();
|
||||
|
||||
//不排序
|
||||
if (sort == null) return await _dbSet.Find(filter).Project<T>(projection).ToListAsync();
|
||||
|
||||
//排序查询
|
||||
return await _dbSet.Find(filter).Sort(sort).Project<T>(projection).ToListAsync();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 异步分页查询集合
|
||||
/// </summary>
|
||||
/// <param name="filter">查询条件</param>
|
||||
/// <param name="pageIndex">当前页</param>
|
||||
/// <param name="pageSize">页容量</param>
|
||||
/// <param name="field">要查询的字段,不写时查询全部</param>
|
||||
/// <param name="sort">要排序的字段</param>
|
||||
/// <returns></returns>
|
||||
public async Task<List<T>> FindListByPageAsync(FilterDefinition<T> filter, int pageIndex, int pageSize, string[]? field = null, SortDefinition<T>? sort = null)
|
||||
{
|
||||
//不指定查询字段
|
||||
if (field == null || field.Length == 0)
|
||||
{
|
||||
if (sort == null) return await _dbSet.Find(filter).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync();
|
||||
//进行排序
|
||||
return await _dbSet.Find(filter).Sort(sort).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync();
|
||||
}
|
||||
|
||||
//指定查询字段
|
||||
var fieldList = new List<ProjectionDefinition<T>>();
|
||||
for (int i = 0; i < field.Length; i++)
|
||||
{
|
||||
fieldList.Add(Builders<T>.Projection.Include(field[i].ToString()));
|
||||
}
|
||||
var projection = Builders<T>.Projection.Combine(fieldList);
|
||||
fieldList?.Clear();
|
||||
|
||||
//不排序
|
||||
if (sort == null) return await _dbSet.Find(filter).Project<T>(projection).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync();
|
||||
|
||||
//排序查询
|
||||
return await _dbSet.Find(filter).Sort(sort).Project<T>(projection).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync();
|
||||
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
}
|
||||
99
JiShe.CollectBus.MongoDB/MongoContext.cs
Normal file
99
JiShe.CollectBus.MongoDB/MongoContext.cs
Normal file
@ -0,0 +1,99 @@
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using MongoDB.Driver;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using JiShe.CollectBus.Common.Extensions.DependencyInjections;
|
||||
|
||||
namespace JiShe.CollectBus.MongoDB
|
||||
{
|
||||
public class MongoContext : IMongoContext
|
||||
{
|
||||
private readonly IMongoDatabase _database;
|
||||
private readonly MongoClient _mongoClient;
|
||||
|
||||
//这里将 _commands 中的每个元素都定义为一个 Func<IClientSessionHandle, Task> 委托,此委托表示一个需要 IClientSessionHandle 对象作为参数并返回一个异步任务的方法
|
||||
//每个委托都表示一个MongoDB 会话(session)对象和要执行的命令
|
||||
private readonly List<Func<IClientSessionHandle, Task>> _commands = [];
|
||||
|
||||
public MongoContext(IConfiguration configuration)
|
||||
{
|
||||
_mongoClient = new MongoClient(configuration["MongoSettings:Connection"]);
|
||||
_database = _mongoClient.GetDatabase(configuration["MongoSettings:DatabaseName"]);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 添加命令操作
|
||||
/// </summary>
|
||||
/// <param name="func">方法接受一个 Func<IClientSessionHandle, Task> 委托作为参数,该委托表示一个需要 IClientSessionHandle 对象作为参数并返回一个异步任务的方法</param>
|
||||
/// <returns></returns>
|
||||
public async Task AddCommandAsync(Func<IClientSessionHandle, Task> func)
|
||||
{
|
||||
_commands.Add(func);
|
||||
await Task.CompletedTask;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 提交更改并返回受影响的行数
|
||||
/// TODO:MongoDB单机服务器不支持事务【使用MongoDB事务会报错:Standalone servers do not support transactions】,只有在集群情况下才支持事务
|
||||
/// 原因:MongoDB在使用分布式事务时需要进行多节点之间的协调和通信,而单机环境下无法实现这样的分布式协调和通信机制。但是,在MongoDB部署为一个集群(cluster)后,将多个计算机连接为一个整体,通过协调和通信机制实现了分布式事务的正常使用。从数据一致性和可靠性的角度来看,在分布式系统中实现事务处理是至关重要的。而在单机环境下不支持事务,只有在集群情况下才支持事务的设计方式是为了保证数据一致性和可靠性,并且也符合分布式系统的设计思想。
|
||||
/// </summary>
|
||||
/// <param name="session">MongoDB 会话(session)对象</param>
|
||||
/// <returns></returns>
|
||||
public async Task<int> SaveChangesAsync(IClientSessionHandle session)
|
||||
{
|
||||
try
|
||||
{
|
||||
session.StartTransaction();//开始事务
|
||||
|
||||
foreach (var command in _commands)
|
||||
{
|
||||
await command(session);
|
||||
}
|
||||
|
||||
await session.CommitTransactionAsync();//提交事务
|
||||
return _commands.Count;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
await session.AbortTransactionAsync();//回滚事务
|
||||
return 0;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_commands.Clear();//清空_commands列表中的元素
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 初始化Mongodb会话对象session
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public async Task<IClientSessionHandle> StartSessionAsync()
|
||||
{
|
||||
var session = await _mongoClient.StartSessionAsync();
|
||||
return session;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 获取MongoDB集合
|
||||
/// </summary>
|
||||
/// <typeparam name="T"></typeparam>
|
||||
/// <param name="name">集合名称</param>
|
||||
/// <returns></returns>
|
||||
public IMongoCollection<T> GetCollection<T>(string name)
|
||||
{
|
||||
return _database.GetCollection<T>(name);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 释放上下文
|
||||
/// </summary>
|
||||
public void Dispose()
|
||||
{
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
44
JiShe.CollectBus.MongoDB/UnitOfWork.cs
Normal file
44
JiShe.CollectBus.MongoDB/UnitOfWork.cs
Normal file
@ -0,0 +1,44 @@
|
||||
using JiShe.CollectBus.Common.Extensions.DependencyInjections;
|
||||
using MongoDB.Driver;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace JiShe.CollectBus.MongoDB
|
||||
{
|
||||
public class UnitOfWork : IUnitOfWork
|
||||
{
|
||||
private readonly IMongoContext _context;
|
||||
|
||||
public UnitOfWork(IMongoContext context)
|
||||
{
|
||||
_context = context;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 提交保存更改
|
||||
/// </summary>
|
||||
/// <param name="session">MongoDB 会话(session)对象</param>
|
||||
/// <returns></returns>
|
||||
public async Task<bool> Commit(IClientSessionHandle session)
|
||||
{
|
||||
return await _context.SaveChangesAsync(session) > 0;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 初始化MongoDB会话对象session
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public async Task<IClientSessionHandle> InitTransaction()
|
||||
{
|
||||
return await _context.StartSessionAsync();
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_context.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,9 +1,4 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
using JiShe.CollectBus.Common;
|
||||
using JiShe.CollectBus.Common.Enums;
|
||||
using JiShe.CollectBus.Common.Enums;
|
||||
using JiShe.CollectBus.Common.Extensions;
|
||||
using JiShe.CollectBus.Common.Models;
|
||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
||||
@ -12,9 +7,8 @@ using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
|
||||
{
|
||||
public abstract class BaseProtocolPlugin : IProtocolPlugin
|
||||
public abstract class BaseProtocolPlugin(ILogger<BaseProtocolPlugin> logger) : IProtocolPlugin
|
||||
{
|
||||
public readonly ILogger<BaseProtocolPlugin> _logger;
|
||||
//起始字符
|
||||
public const string stx = "68";
|
||||
//结束字符
|
||||
@ -27,11 +21,6 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
|
||||
public const int tPLen = 6;
|
||||
|
||||
|
||||
protected BaseProtocolPlugin(ILogger<BaseProtocolPlugin> logger)
|
||||
{
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public abstract Task<ProtocolInfo> GetAsync();
|
||||
|
||||
public abstract Task AnalyzeAsync(MessageReceivedEvent messageReceivedEvent, Action<byte[]>? sendAction = null);
|
||||
@ -42,7 +31,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
|
||||
/// <param name="messageReceivedEvent">报文</param>
|
||||
/// <param name="sendAction">发送委托</param>
|
||||
/// <returns></returns>
|
||||
public virtual Task LoginAsync(MessageReceivedLoginEvent messageReceivedEvent, Action<byte[]>? sendAction = null)
|
||||
public virtual async Task LoginAsync(MessageReceivedLoginEvent messageReceivedEvent, Action<byte[]>? sendAction = null)
|
||||
{
|
||||
var hexStringList = messageReceivedEvent.MessageHexString.StringToPairs();
|
||||
var aTuple = (Tuple<string, int>)hexStringList.GetAnalyzeValue(CommandChunkEnum.A);
|
||||
@ -69,7 +58,6 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
|
||||
{
|
||||
sendAction(bytes);
|
||||
}
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -78,7 +66,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
|
||||
/// <param name="messageReceivedEvent">报文</param>
|
||||
/// <param name="sendAction">发送委托</param>
|
||||
/// <returns></returns>
|
||||
public virtual Task HeartbeatAsync(MessageReceivedHeartbeatEvent messageReceivedEvent, Action<byte[]>? sendAction = null)
|
||||
public virtual async Task HeartbeatAsync(MessageReceivedHeartbeatEvent messageReceivedEvent, Action<byte[]>? sendAction = null)
|
||||
{
|
||||
var hexStringList = messageReceivedEvent.MessageHexString.StringToPairs();
|
||||
var aTuple = (Tuple<string,int>)hexStringList.GetAnalyzeValue(CommandChunkEnum.A);
|
||||
@ -113,7 +101,6 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
|
||||
sendAction(bytes);
|
||||
}
|
||||
}
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
|
||||
@ -129,7 +116,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
|
||||
cmdStrList.AddRange(userDatas);
|
||||
cmdStrList.Add(cs);
|
||||
cmdStrList.Add(end);
|
||||
_logger.LogInformation($"回复:{string.Join(" ", cmdStrList)}");
|
||||
logger.LogInformation($"回复:{string.Join(" ", cmdStrList)}");
|
||||
var bytes = cmdStrList.Select(x => Convert.ToByte(x, 16)).ToArray();
|
||||
return bytes;
|
||||
}
|
||||
|
||||
@ -1,9 +1,9 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>netstandard2.1</TargetFramework>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
<LangVersion>preview</LangVersion>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
@ -14,6 +14,7 @@
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
||||
@ -1,9 +1,13 @@
|
||||
namespace JiShe.CollectBus.Protocol.Contracts.Models
|
||||
using JiShe.CollectBus.Common.Enums;
|
||||
|
||||
namespace JiShe.CollectBus.Protocol.Contracts.Models
|
||||
{
|
||||
public class MessageIssuedEvent
|
||||
{
|
||||
public string ClientId { get; set; }
|
||||
public byte[] Message { get; set; }
|
||||
public string DeviceNo { get; set; }
|
||||
public IssuedEventType Type { get; set; }
|
||||
public string MessageId { get; set; }
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,53 +1,52 @@
|
||||
namespace JiShe.CollectBus.Protocol.Contracts.Models
|
||||
using JetBrains.Annotations;
|
||||
|
||||
namespace JiShe.CollectBus.Protocol.Contracts.Models
|
||||
{
|
||||
public class MessageReceivedEvent
|
||||
{
|
||||
public MessageReceivedEvent(string clientId, string clientIp,int port, string messageHexString, string deviceNo)
|
||||
{
|
||||
ClientId = clientId;
|
||||
ClientIP = clientIp;
|
||||
Port = port;
|
||||
MessageHexString = messageHexString;
|
||||
DeviceNo = deviceNo;
|
||||
}
|
||||
|
||||
public string MessageId { get; set; } = string.Empty;
|
||||
/// <summary>
|
||||
/// 客服端标识
|
||||
/// </summary>
|
||||
public string ClientId { get; set; }
|
||||
public string ClientId { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>
|
||||
/// 客服端IP
|
||||
/// </summary>
|
||||
public string ClientIP { get; set; }
|
||||
public string ClientIp { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>
|
||||
/// 客服端端口
|
||||
/// </summary>
|
||||
public int Port { get; set; }
|
||||
public int ClientPort { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 客服端报文
|
||||
/// </summary>
|
||||
public string MessageHexString { get; set; }
|
||||
public string MessageHexString { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>
|
||||
/// 设备号
|
||||
/// </summary>
|
||||
public string DeviceNo { get; set; }
|
||||
public string DeviceNo { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>
|
||||
/// 接收指令时间
|
||||
/// </summary>
|
||||
public DateTime ReceivedTime { get; set; }
|
||||
}
|
||||
|
||||
public class MessageReceivedLoginEvent: MessageReceivedEvent
|
||||
{
|
||||
public MessageReceivedLoginEvent(string clientId, string clientIp, int port, string messageHexString, string deviceNo) : base(clientId, clientIp, port, messageHexString, deviceNo)
|
||||
{
|
||||
}
|
||||
public bool IsAck { get; set; } = false;
|
||||
|
||||
public DateTime? AckTime { get; set; }
|
||||
}
|
||||
|
||||
public class MessageReceivedHeartbeatEvent : MessageReceivedEvent
|
||||
{
|
||||
public MessageReceivedHeartbeatEvent(string clientId, string clientIp, int port, string messageHexString, string deviceNo) : base(clientId, clientIp, port, messageHexString, deviceNo)
|
||||
{
|
||||
}
|
||||
public bool IsAck { get; set; } = false;
|
||||
|
||||
public DateTime? AckTime { get; set; }
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
using JiShe.CollectBus.Common.Extensions.DependencyInjections;
|
||||
using JiShe.CollectBus.MongoDB;
|
||||
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
|
||||
using JiShe.CollectBus.Protocol.Contracts.Attributes;
|
||||
using JiShe.CollectBus.Protocol.Contracts.Models;
|
||||
|
||||
@ -1,5 +1,4 @@
|
||||
using JiShe.CollectBus.Common;
|
||||
using JiShe.CollectBus.Common.Enums;
|
||||
using JiShe.CollectBus.Common.Enums;
|
||||
using JiShe.CollectBus.Common.Extensions;
|
||||
using JiShe.CollectBus.Common.Extensions.DependencyInjections;
|
||||
using JiShe.CollectBus.Common.Models;
|
||||
@ -7,9 +6,7 @@ using JiShe.CollectBus.Protocol.Contracts.Abstracts;
|
||||
using JiShe.CollectBus.Protocol.Contracts.Attributes;
|
||||
using JiShe.CollectBus.Protocol.Contracts.Models;
|
||||
using JiShe.CollectBus.RabbitMQ.Senders;
|
||||
using Microsoft.Extensions.Caching.Distributed;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using TouchSocket.Sockets;
|
||||
|
||||
namespace JiShe.CollectBus.Protocol
|
||||
{
|
||||
@ -21,7 +18,6 @@ namespace JiShe.CollectBus.Protocol
|
||||
var info = new ProtocolInfo("Standard", "376.1", "TCP", "376.1协议", "DTS1980");
|
||||
return await Task.FromResult(info);
|
||||
}
|
||||
|
||||
public override async Task AnalyzeAsync(MessageReceivedEvent messageReceivedEvent, Action<byte[]>? sendAction = null)
|
||||
{
|
||||
var cmdResult = AnalysisCmd(messageReceivedEvent.MessageHexString);
|
||||
@ -37,28 +33,21 @@ namespace JiShe.CollectBus.Protocol
|
||||
{
|
||||
async void SendAction(byte[] bytes)
|
||||
{
|
||||
await nSender.SendToIssuedAsync(new MessageIssuedEvent { ClientId = messageReceivedEvent.ClientId, DeviceNo = messageReceivedEvent.DeviceNo, Message = bytes });
|
||||
await nSender.SendToIssuedAsync(new MessageIssuedEvent { ClientId = messageReceivedEvent.ClientId, DeviceNo = messageReceivedEvent.DeviceNo, Message = bytes, Type = IssuedEventType.Login,MessageId = messageReceivedEvent.MessageId});
|
||||
}
|
||||
|
||||
await base.LoginAsync(messageReceivedEvent, SendAction);
|
||||
}
|
||||
|
||||
|
||||
public override async Task HeartbeatAsync(MessageReceivedHeartbeatEvent messageReceivedEvent, Action<byte[]>? sendAction = null)
|
||||
{
|
||||
async void SendAction(byte[] bytes)
|
||||
{
|
||||
await nSender.SendToIssuedAsync(new MessageIssuedEvent { ClientId = messageReceivedEvent.ClientId, DeviceNo = messageReceivedEvent.DeviceNo, Message = bytes });
|
||||
await nSender.SendToIssuedAsync(new MessageIssuedEvent { ClientId = messageReceivedEvent.ClientId, DeviceNo = messageReceivedEvent.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceivedEvent.MessageId });
|
||||
}
|
||||
await base.HeartbeatAsync(messageReceivedEvent, SendAction);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
#region 上行命令
|
||||
|
||||
//68
|
||||
|
||||
@ -1,4 +1,7 @@
|
||||
using JiShe.CollectBus.Protocol.Contracts.Models;
|
||||
using System.Security.Cryptography.X509Certificates;
|
||||
using JiShe.CollectBus.Common.Enums;
|
||||
using JiShe.CollectBus.MongoDB;
|
||||
using JiShe.CollectBus.Protocol.Contracts.Models;
|
||||
using MassTransit;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using TouchSocket.Sockets;
|
||||
@ -9,15 +12,33 @@ namespace JiShe.CollectBus.RabbitMQ.Consumers
|
||||
{
|
||||
private readonly ILogger<MessageIssuedEvent> _logger;
|
||||
private readonly ITcpService _tcpService;
|
||||
public readonly IMongoRepository<MessageReceivedHeartbeatEvent> _mongoHeartbeatRepository;
|
||||
public readonly IMongoRepository<MessageReceivedLoginEvent> _mongoLoginRepository;
|
||||
|
||||
public MessageIssuedConsumer(ILogger<MessageIssuedEvent> logger, ITcpService tcpService)
|
||||
|
||||
public MessageIssuedConsumer(ILogger<MessageIssuedEvent> logger, ITcpService tcpService, IMongoRepository<MessageReceivedHeartbeatEvent> mongoHeartbeatRepository, IMongoRepository<MessageReceivedLoginEvent> mongoLoginRepository)
|
||||
{
|
||||
_logger = logger;
|
||||
_tcpService = tcpService;
|
||||
_tcpService = tcpService;
|
||||
_mongoHeartbeatRepository = mongoHeartbeatRepository;
|
||||
_mongoLoginRepository = mongoLoginRepository;
|
||||
}
|
||||
|
||||
public async Task Consume(ConsumeContext<MessageIssuedEvent> context)
|
||||
{
|
||||
switch (context.Message.Type)
|
||||
{
|
||||
case IssuedEventType.Heartbeat:
|
||||
await _mongoHeartbeatRepository.UpdateAsync(a => a.MessageId == context.Message.MessageId, b =>new MessageReceivedHeartbeatEvent { IsAck = true,AckTime=DateTime.Now});
|
||||
break;
|
||||
case IssuedEventType.Login:
|
||||
await _mongoLoginRepository.UpdateAsync(a => a.MessageId == context.Message.MessageId, b => new MessageReceivedLoginEvent { IsAck = true, AckTime = DateTime.Now });
|
||||
break;
|
||||
case IssuedEventType.Data:
|
||||
break;
|
||||
default:
|
||||
throw new ArgumentOutOfRangeException();
|
||||
}
|
||||
await _tcpService.SendAsync(context.Message.ClientId, context.Message.Message);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
||||
using JiShe.CollectBus.MongoDB;
|
||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
||||
using JiShe.CollectBus.Protocol.Contracts.Models;
|
||||
using MassTransit;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
|
||||
@ -30,24 +30,28 @@ namespace JiShe.CollectBus.RabbitMQ
|
||||
{
|
||||
configurator.ConfigureConsumeTopology = false;
|
||||
configurator.Consumer<MessageReceivedConsumer>(context);
|
||||
configurator.Durable = true;
|
||||
});
|
||||
// 登录
|
||||
cfg.ReceiveEndpoint($"{configuration["MQ:Queue:Received"]}_Login" ?? string.Empty, configurator =>
|
||||
cfg.ReceiveEndpoint($"{configuration["MQ:Queue:Received"]}_Login", configurator =>
|
||||
{
|
||||
configurator.ConfigureConsumeTopology = false;
|
||||
configurator.Consumer<MessageReceivedLoginConsumer>(context);
|
||||
configurator.Durable = true;
|
||||
});
|
||||
// 心跳
|
||||
cfg.ReceiveEndpoint($"{configuration["MQ:Queue:Received"]}_Heartbeat" ?? string.Empty, configurator =>
|
||||
cfg.ReceiveEndpoint($"{configuration["MQ:Queue:Received"]}_Heartbeat", configurator =>
|
||||
{
|
||||
configurator.ConfigureConsumeTopology = false;
|
||||
configurator.Consumer<MessageReceivedHeartbeatConsumer>(context);
|
||||
configurator.Durable = true;
|
||||
});
|
||||
// 消息下发队列
|
||||
cfg.ReceiveEndpoint(configuration["MQ:Queue:Issued"] ?? string.Empty, configurator =>
|
||||
{
|
||||
configurator.ConfigureConsumeTopology = false;
|
||||
configurator.Consumer<MessageIssuedConsumer>(context);
|
||||
configurator.Durable = true;
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@ -1,10 +1,5 @@
|
||||
using MassTransit;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using JiShe.CollectBus.Common.Extensions.DependencyInjections;
|
||||
|
||||
namespace JiShe.CollectBus.RabbitMQ.Senders
|
||||
|
||||
@ -25,6 +25,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.ClickHouse
|
||||
EndProject
|
||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.RabbitMQ", "JiShe.CollectBus.RabbitMQ\JiShe.CollectBus.RabbitMQ.csproj", "{DB46D90E-304D-48B7-9ED6-F4DCC95D3824}"
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.MongoDB", "JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj", "{223DBDB1-6CD3-4D4E-8795-42550BC0A871}"
|
||||
EndProject
|
||||
Global
|
||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||
Debug|Any CPU = Debug|Any CPU
|
||||
@ -67,6 +69,10 @@ Global
|
||||
{DB46D90E-304D-48B7-9ED6-F4DCC95D3824}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{DB46D90E-304D-48B7-9ED6-F4DCC95D3824}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{DB46D90E-304D-48B7-9ED6-F4DCC95D3824}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{223DBDB1-6CD3-4D4E-8795-42550BC0A871}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{223DBDB1-6CD3-4D4E-8795-42550BC0A871}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{223DBDB1-6CD3-4D4E-8795-42550BC0A871}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{223DBDB1-6CD3-4D4E-8795-42550BC0A871}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(SolutionProperties) = preSolution
|
||||
HideSolutionNode = FALSE
|
||||
@ -81,6 +87,7 @@ Global
|
||||
{16D42BCF-EDB8-4153-B37D-0B10FB6DF36C} = {C7DEC9FB-3F75-4584-85B0-16EA3CB222E5}
|
||||
{65A2837C-A5EE-475B-8079-EE5A1BCD2E8F} = {C7DEC9FB-3F75-4584-85B0-16EA3CB222E5}
|
||||
{DB46D90E-304D-48B7-9ED6-F4DCC95D3824} = {C7DEC9FB-3F75-4584-85B0-16EA3CB222E5}
|
||||
{223DBDB1-6CD3-4D4E-8795-42550BC0A871} = {C7DEC9FB-3F75-4584-85B0-16EA3CB222E5}
|
||||
EndGlobalSection
|
||||
GlobalSection(ExtensibilityGlobals) = postSolution
|
||||
SolutionGuid = {33261859-9CD1-4A43-B181-AB75C247D1CD}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user