From c0716d9667f0048365c793408b565fab483b2fb8 Mon Sep 17 00:00:00 2001 From: cli <377476583@qq.com> Date: Wed, 30 Oct 2024 17:49:05 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- JiShe.CollectBus.Common/Enums/376Enums.cs | 7 + .../JiShe.CollectBus.Common.csproj | 4 +- .../DependencyInjectionExtensions.cs | 9 +- .../ServiceCollectionExtensions.cs | 4 +- JiShe.CollectBus.Console/Program.cs | 3 +- JiShe.CollectBus.Console/appsettings.json | 5 +- .../Plugins/TcpServiceReceivedPlugin.cs | 73 +++- JiShe.CollectBus.MongoDB/IMongoContext.cs | 37 ++ JiShe.CollectBus.MongoDB/IMongoRepository.cs | 176 +++++++++ JiShe.CollectBus.MongoDB/IUnitOfWork.cs | 25 ++ .../JiShe.CollectBus.MongoDB.csproj | 18 + .../JiSheCollectBusMongoDBModule.cs | 17 + .../MongoBaseRepository.cs | 344 ++++++++++++++++++ JiShe.CollectBus.MongoDB/MongoContext.cs | 99 +++++ JiShe.CollectBus.MongoDB/UnitOfWork.cs | 44 +++ .../Abstracts/BaseProtocolPlugin.cs | 23 +- ...JiShe.CollectBus.Protocol.Contracts.csproj | 5 +- .../Models/MessageIssuedEvent.cs | 6 +- .../Models/MessageReceivedEvent.cs | 41 +-- .../TestProtocolPlugin.cs | 1 + .../StandardProtocolPlugin.cs | 17 +- .../Consumers/MessageIssuedConsumer.cs | 27 +- .../MessageReceivedHeartbeatConsumer.cs | 3 +- .../JiShe.CollectBus.RabbitMQ.csproj | 1 + .../JiSheCollectBusRabbitMqModule.cs | 8 +- JiShe.CollectBus.RabbitMQ/Senders/NSender.cs | 5 - JiShe.CollectBus.sln | 7 + 27 files changed, 912 insertions(+), 97 deletions(-) create mode 100644 JiShe.CollectBus.MongoDB/IMongoContext.cs create mode 100644 JiShe.CollectBus.MongoDB/IMongoRepository.cs create mode 100644 JiShe.CollectBus.MongoDB/IUnitOfWork.cs create mode 100644 JiShe.CollectBus.MongoDB/JiShe.CollectBus.MongoDB.csproj create mode 100644 JiShe.CollectBus.MongoDB/JiSheCollectBusMongoDBModule.cs create mode 100644 JiShe.CollectBus.MongoDB/MongoBaseRepository.cs create mode 100644 JiShe.CollectBus.MongoDB/MongoContext.cs create mode 100644 JiShe.CollectBus.MongoDB/UnitOfWork.cs diff --git a/JiShe.CollectBus.Common/Enums/376Enums.cs b/JiShe.CollectBus.Common/Enums/376Enums.cs index 0f10f0d..dd8e4bd 100644 --- a/JiShe.CollectBus.Common/Enums/376Enums.cs +++ b/JiShe.CollectBus.Common/Enums/376Enums.cs @@ -160,4 +160,11 @@ D7, D8 } + + public enum IssuedEventType + { + Heartbeat, + Login, + Data + } } diff --git a/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj b/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj index b6ff8a7..09985a2 100644 --- a/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj +++ b/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj @@ -1,9 +1,9 @@  - netstandard2.1 + net8.0 + enable enable - preview diff --git a/JiShe.CollectBus.Console/Extensions/DependencyInjections/DependencyInjectionExtensions.cs b/JiShe.CollectBus.Console/Extensions/DependencyInjections/DependencyInjectionExtensions.cs index a2f7992..84e5fa5 100644 --- a/JiShe.CollectBus.Console/Extensions/DependencyInjections/DependencyInjectionExtensions.cs +++ b/JiShe.CollectBus.Console/Extensions/DependencyInjections/DependencyInjectionExtensions.cs @@ -43,27 +43,30 @@ namespace Microsoft.Extensions.DependencyInjection if (type is not { IsClass: true, IsAbstract: false }) continue; if (typeof(ISingletonDependency).IsAssignableFrom(type)) { - var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ISingletonDependency")); + var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ISingletonDependency") && !p.FullName.Contains("IDisposable")); foreach (var interfaceType in interfaceTypes) { + Log.Logger.Information($"正在IOC注入ISingletonDependency {type.Name}..."); services.AddSingleton(interfaceType, type); } } if (typeof(ITransientDependency).IsAssignableFrom(type)) { - var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ITransientDependency")); + var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ITransientDependency") && !p.FullName.Contains("IDisposable")); foreach (var interfaceType in interfaceTypes) { + Log.Logger.Information($"正在IOC注入ITransientDependency {type.Name}..."); services.AddTransient(interfaceType, type); } } if (typeof(IScopedDependency).IsAssignableFrom(type)) { - var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("IScopedDependency")); + var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("IScopedDependency") && !p.FullName.Contains("IDisposable")); foreach (var interfaceType in interfaceTypes) { + Log.Logger.Information($"正在IOC注入IScopedDependency {type.Name}..."); services.AddScoped(interfaceType, type); } } diff --git a/JiShe.CollectBus.Console/Extensions/ServiceCollections/ServiceCollectionExtensions.cs b/JiShe.CollectBus.Console/Extensions/ServiceCollections/ServiceCollectionExtensions.cs index fd52635..98f3eaf 100644 --- a/JiShe.CollectBus.Console/Extensions/ServiceCollections/ServiceCollectionExtensions.cs +++ b/JiShe.CollectBus.Console/Extensions/ServiceCollections/ServiceCollectionExtensions.cs @@ -16,7 +16,7 @@ namespace Microsoft.Extensions.DependencyInjection { services.AddTcpService(config => { - config.SetListenIPHosts(int.Parse(configuration["TCP:Port"] ?? "10500")) + config.SetListenIPHosts(int.Parse(configuration["TCP:ClientPort"] ?? "10500")) .ConfigureContainer(a => //容器的配置顺序应该在最前面 { a.AddConsoleLogger(); @@ -39,7 +39,7 @@ namespace Microsoft.Extensions.DependencyInjection { services.AddUdpSession(config => { - config.SetBindIPHost(int.Parse(configuration["UDP:Port"] ?? "10500")) + config.SetBindIPHost(int.Parse(configuration["UDP:ClientPort"] ?? "10500")) .ConfigureContainer(a => //容器的配置顺序应该在最前面 { //a.AddConsoleLogger(); diff --git a/JiShe.CollectBus.Console/Program.cs b/JiShe.CollectBus.Console/Program.cs index 5c6c9d5..c9a59f2 100644 --- a/JiShe.CollectBus.Console/Program.cs +++ b/JiShe.CollectBus.Console/Program.cs @@ -52,10 +52,9 @@ namespace JiShe.CollectBus.Console lc.ReadFrom.Configuration(configuration) .ReadFrom.Services(context); }); - services.ModuleRegister(hostContext); services.ServiceRegister(); services.PluginServiceRegister(); - + services.ModuleRegister(hostContext); services.AddTcp(configuration); //services.AddUdp(configuration); services.AddStackExchangeRedisCache(options => diff --git a/JiShe.CollectBus.Console/appsettings.json b/JiShe.CollectBus.Console/appsettings.json index 72e8a02..509fd46 100644 --- a/JiShe.CollectBus.Console/appsettings.json +++ b/JiShe.CollectBus.Console/appsettings.json @@ -30,7 +30,10 @@ "Default": "Data Source=192.168.111.248;Port=3306;Database=JiSheCollectBus;uid=root;pwd=123456abcD;charset=utf8mb4;Allow User Variables=true;AllowLoadLocalInfile=true", "ClickHouse": "host=localhost;port=8123;user=default;password=;database=default" }, - + "MongoSettings": { + "Connection": "mongodb://backups_admin:jishe_mongodb_backups@118.190.144.92:27037", + "DatabaseName": "JiSheCollectBus" + }, "TCP": { "Port": 10500 }, diff --git a/JiShe.CollectBus.Core/Plugins/TcpServiceReceivedPlugin.cs b/JiShe.CollectBus.Core/Plugins/TcpServiceReceivedPlugin.cs index 9cb8fc3..ceab344 100644 --- a/JiShe.CollectBus.Core/Plugins/TcpServiceReceivedPlugin.cs +++ b/JiShe.CollectBus.Core/Plugins/TcpServiceReceivedPlugin.cs @@ -5,6 +5,7 @@ using TouchSocket.Core; using TouchSocket.Sockets; using JiShe.CollectBus.RabbitMQ.Senders; using Microsoft.Extensions.Logging; +using JiShe.CollectBus.MongoDB; namespace JiShe.CollectBus.Core.Plugins { @@ -12,37 +13,27 @@ namespace JiShe.CollectBus.Core.Plugins { private readonly INSender _nSender; private readonly ILogger _logger; + public readonly IMongoRepository _mongoHeartbeatRepository; + public readonly IMongoRepository _mongoLoginRepository; - public TcpServiceReceivedPlugin(INSender nSender, ILogger logger) + public TcpServiceReceivedPlugin(INSender nSender, ILogger logger,IMongoRepository mongoHeartbeatRepository, IMongoRepository mongoLoginRepository) { _nSender = nSender; _logger = logger; + _mongoHeartbeatRepository = mongoHeartbeatRepository; + _mongoLoginRepository = mongoLoginRepository; } [GeneratorPlugin(typeof(ITcpReceivedPlugin))] public async Task OnTcpReceived(ITcpSessionClient client, ReceivedDataEventArgs e) { - ////TODO: 电表主站到集中器的协议都是376.1协议,集中器下发到电表协议分为645-07和modbus - ////TODO: 水表主站到集中器的协议分为118和645-97协议 - ////TODO: 连接成功时获取档案信息,根据档案信息匹配协议正则获取协议服务进行监听发送 - - //const string protocolType = "StandardProtocol"; - - //var protocolPlugin = _serviceProvider.GetKeyedService(protocolType); - //var protocolPluginInfo = await protocolPlugin.GetAsync(); - //client.Logger.Info($"{protocolPluginInfo.Name},{protocolPluginInfo.RegularExpression}"); - ////从客户端收到信息 - //var messageHexString = Convert.ToHexString(e.ByteBlock.Span); - //client.Logger.Info($"[TCP] 已从{client.GetIPPort()}接收到信息:{messageHexString}"); - - //await protocolPlugin.ReceivedAsync(client,e); - var messageHexString = Convert.ToHexString(e.ByteBlock.Span); var hexStringList = messageHexString.StringToPairs(); var aFn = (int?)hexStringList.GetAnalyzeValue(CommandChunkEnum.AFN); var fn = (int?)hexStringList.GetAnalyzeValue(CommandChunkEnum.FN); + var aTuple = (Tuple)hexStringList.GetAnalyzeValue(CommandChunkEnum.A); if (aFn.HasValue && fn.HasValue) { @@ -51,19 +42,61 @@ namespace JiShe.CollectBus.Core.Plugins switch (fn) { case 1://登录 - await _nSender.SendToReceivedLoginAsync(new MessageReceivedLoginEvent(client.Id,client.IP, client.Port, messageHexString,"")); + + var messageReceivedLoginEvent = new MessageReceivedLoginEvent + { + ClientId = client.Id, + ClientIp = client.IP, + ClientPort = client.Port, + MessageHexString = messageHexString, + ReceivedTime = DateTime.Now, + DeviceNo = aTuple.Item1, + MessageId = Guid.NewGuid().ToString() + }; + await _mongoLoginRepository.AddAsync(messageReceivedLoginEvent); + await _nSender.SendToReceivedLoginAsync(messageReceivedLoginEvent); break; case 2://退出登录 - await _nSender.SendToReceivedLoginAsync(new MessageReceivedLoginEvent(client.Id, client.IP, client.Port, messageHexString, "")); + var messageReceivedExitLoginEvent = new MessageReceivedLoginEvent + { + ClientId = client.Id, + ClientIp = client.IP, + ClientPort = client.Port, + MessageHexString = messageHexString, + ReceivedTime = DateTime.Now, + DeviceNo = aTuple.Item1, + MessageId = Guid.NewGuid().ToString() + }; + await _mongoLoginRepository.AddAsync(messageReceivedExitLoginEvent); + await _nSender.SendToReceivedLoginAsync(messageReceivedExitLoginEvent); break; case 3://心跳 - await _nSender.SendToReceivedHeartbeatAsync(new MessageReceivedHeartbeatEvent(client.Id, client.IP, client.Port, messageHexString, "")); + var messageReceivedHeartbeatEvent = new MessageReceivedHeartbeatEvent + { + ClientId = client.Id, + ClientIp = client.IP, + ClientPort = client.Port, + MessageHexString = messageHexString, + ReceivedTime = DateTime.Now, + DeviceNo = aTuple.Item1, + MessageId = Guid.NewGuid().ToString() + }; + await _mongoHeartbeatRepository.AddAsync(messageReceivedHeartbeatEvent); + await _nSender.SendToReceivedHeartbeatAsync(messageReceivedHeartbeatEvent); break; } } else { - await _nSender.SendToReceivedAsync(new MessageReceivedEvent(client.Id, client.IP, client.Port, messageHexString, "")); + await _nSender.SendToReceivedAsync(new MessageReceivedEvent + { + ClientId = client.Id, + ClientIp = client.IP, + ClientPort = client.Port, + MessageHexString = messageHexString, + DeviceNo = aTuple.Item1, + MessageId = Guid.NewGuid().ToString() + }); } } else diff --git a/JiShe.CollectBus.MongoDB/IMongoContext.cs b/JiShe.CollectBus.MongoDB/IMongoContext.cs new file mode 100644 index 0000000..70dcaec --- /dev/null +++ b/JiShe.CollectBus.MongoDB/IMongoContext.cs @@ -0,0 +1,37 @@ +using MongoDB.Driver; + +namespace JiShe.CollectBus.MongoDB +{ + public interface IMongoContext : IDisposable + { + /// + /// 添加命令操作 + /// + /// 委托 方法接受一个 Func 委托作为参数,该委托表示一个需要 IClientSessionHandle 对象作为参数并返回一个异步任务的方法 + /// + Task AddCommandAsync(Func func); + + /// + /// 提交更改并返回受影响的行数 + /// TODO:MongoDB单机服务器不支持事务【使用MongoDB事务会报错:Standalone servers do not support transactions】,只有在集群情况下才支持事务 + /// 原因:MongoDB在使用分布式事务时需要进行多节点之间的协调和通信,而单机环境下无法实现这样的分布式协调和通信机制。但是,在MongoDB部署为一个集群(cluster)后,将多个计算机连接为一个整体,通过协调和通信机制实现了分布式事务的正常使用。从数据一致性和可靠性的角度来看,在分布式系统中实现事务处理是至关重要的。而在单机环境下不支持事务,只有在集群情况下才支持事务的设计方式是为了保证数据一致性和可靠性,并且也符合分布式系统的设计思想。 + /// + /// MongoDB 会话(session)对象 + /// + Task SaveChangesAsync(IClientSessionHandle session); + + /// + /// 初始化Mongodb会话对象session + /// + /// + Task StartSessionAsync(); + + /// + /// 获取集合数据 + /// + /// + /// + /// + IMongoCollection GetCollection(string name); + } +} diff --git a/JiShe.CollectBus.MongoDB/IMongoRepository.cs b/JiShe.CollectBus.MongoDB/IMongoRepository.cs new file mode 100644 index 0000000..10e25eb --- /dev/null +++ b/JiShe.CollectBus.MongoDB/IMongoRepository.cs @@ -0,0 +1,176 @@ +using MongoDB.Driver; +using System.Linq.Expressions; + +namespace JiShe.CollectBus.MongoDB +{ + public interface IMongoRepository where T : class, new() + { + #region 事务操作示例 + + /// + /// 事务添加数据 + /// + /// MongoDB 会话(session)对象 + /// 添加数据 + /// + Task AddTransactionsAsync(IClientSessionHandle session, T objData); + + /// + /// 事务数据删除 + /// + /// MongoDB 会话(session)对象 + /// objectId + /// + Task DeleteTransactionsAsync(IClientSessionHandle session, string id); + + /// + /// 事务异步局部更新(仅更新一条记录) + /// + /// MongoDB 会话(session)对象 + /// 过滤器 + /// 更新条件 + /// + Task UpdateTransactionsAsync(IClientSessionHandle session, FilterDefinition filter, UpdateDefinition update); + + #endregion + + #region 添加相关操作 + + /// + /// 添加数据 + /// + /// 添加数据 + /// + Task AddAsync(T objData); + + /// + /// 批量插入 + /// + /// 实体集合 + /// + Task InsertManyAsync(List objDatas); + + #endregion + + #region 删除相关操作 + + /// + /// 数据删除 + /// + /// objectId + /// + Task DeleteAsync(string id); + + /// + /// 异步删除多条数据 + /// + /// 删除的条件 + /// + Task DeleteManyAsync(FilterDefinition filter); + + #endregion + + #region 修改相关操作 + + /// + /// 指定对象异步修改一条数据 + /// + /// 要修改的对象 + /// 修改条件 + /// + Task UpdateAsync(T obj, string id); + + /// + /// 局部更新(仅更新一条记录) + /// x.Id == 1 && x.Age > 18 && x.Gender == 0]]> + /// new T{ RealName = "Ray", Gender = 1}]]> + /// + /// 筛选条件 + /// 更新条件 + /// + Task UpdateAsync(Expression> expression, Expression> entity); + + /// + /// 异步局部更新(仅更新一条记录) + /// + /// 过滤器 + /// 更新条件 + /// + Task UpdateAsync(FilterDefinition filter, UpdateDefinition update); + + /// + /// 异步局部更新(仅更新多条记录) + /// + /// 筛选条件 + /// 更新条件 + /// + Task UpdateManyAsync(Expression> expression, UpdateDefinition update); + + /// + /// 异步批量修改数据 + /// + /// 要修改的字段 + /// 更新条件 + /// + Task UpdateManayAsync(Dictionary dic, FilterDefinition filter); + + #endregion + + #region 查询统计相关操作 + + /// + /// 通过ID主键获取数据 + /// + /// objectId + /// + Task GetByIdAsync(string id); + /// + /// 获取所有数据 + /// + /// + Task> GetAllAsync(); + + /// + /// 获取记录数 + /// + /// 筛选条件 + /// + Task CountAsync(Expression> expression); + + /// + /// 获取记录数 + /// + /// 过滤器 + /// + Task CountAsync(FilterDefinition filter); + + /// + /// 判断是否存在 + /// + /// 条件 + /// + Task ExistsAsync(Expression> predicate); + + /// + /// 异步查询集合 + /// + /// 查询条件 + /// 要查询的字段,不写时查询全部 + /// 要排序的字段 + /// + Task> FindListAsync(FilterDefinition filter, string[]? field = null, SortDefinition? sort = null); + + /// + /// 异步分页查询集合 + /// + /// 查询条件 + /// 当前页 + /// 页容量 + /// 要查询的字段,不写时查询全部 + /// 要排序的字段 + /// + Task> FindListByPageAsync(FilterDefinition filter, int pageIndex, int pageSize, string[]? field = null, SortDefinition? sort = null); + + #endregion + } +} diff --git a/JiShe.CollectBus.MongoDB/IUnitOfWork.cs b/JiShe.CollectBus.MongoDB/IUnitOfWork.cs new file mode 100644 index 0000000..8ecadb0 --- /dev/null +++ b/JiShe.CollectBus.MongoDB/IUnitOfWork.cs @@ -0,0 +1,25 @@ +using MongoDB.Driver; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.MongoDB +{ + public interface IUnitOfWork : IDisposable + { + /// + /// 提交保存更改 + /// + /// MongoDB 会话(session)对象 + /// + Task Commit(IClientSessionHandle session); + + /// + /// 初始化MongoDB会话对象session + /// + /// + Task InitTransaction(); + } +} diff --git a/JiShe.CollectBus.MongoDB/JiShe.CollectBus.MongoDB.csproj b/JiShe.CollectBus.MongoDB/JiShe.CollectBus.MongoDB.csproj new file mode 100644 index 0000000..51b3d9c --- /dev/null +++ b/JiShe.CollectBus.MongoDB/JiShe.CollectBus.MongoDB.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + enable + enable + + + + + + + + + + + + diff --git a/JiShe.CollectBus.MongoDB/JiSheCollectBusMongoDBModule.cs b/JiShe.CollectBus.MongoDB/JiSheCollectBusMongoDBModule.cs new file mode 100644 index 0000000..6f36ac7 --- /dev/null +++ b/JiShe.CollectBus.MongoDB/JiSheCollectBusMongoDBModule.cs @@ -0,0 +1,17 @@ +using JiShe.CollectBus.Common.Interfaces; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using MongoDB.Driver; + +namespace JiShe.CollectBus.MongoDB +{ + public class JiSheCollectBusMongoDbModule: IJiSheModule + { + public void ConfigureServices(IServiceCollection services, HostBuilderContext hostContext) + { + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(typeof(IMongoRepository<>), typeof(MongoBaseRepository<>)); + } + } +} diff --git a/JiShe.CollectBus.MongoDB/MongoBaseRepository.cs b/JiShe.CollectBus.MongoDB/MongoBaseRepository.cs new file mode 100644 index 0000000..ccf2f54 --- /dev/null +++ b/JiShe.CollectBus.MongoDB/MongoBaseRepository.cs @@ -0,0 +1,344 @@ +using JiShe.CollectBus.Common.Extensions.DependencyInjections; +using MongoDB.Bson; +using MongoDB.Driver; +using System; +using System.Collections.Generic; +using System.ComponentModel.DataAnnotations.Schema; +using System.Linq; +using System.Linq.Expressions; +using System.Reflection; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.MongoDB +{ + public class MongoBaseRepository : IMongoRepository where T : class, new() + { + private readonly IMongoContext _context; + private readonly IMongoCollection _dbSet; + + public MongoBaseRepository(IMongoContext context) + { + _context = context; + var collectionName = typeof(T).GetCustomAttribute()?.Name ?? typeof(T).Name; + _dbSet = _context.GetCollection(collectionName); + } + + #region 事务操作示例 + + /// + /// 事务添加数据 + /// + /// MongoDB 会话(session)对象 + /// 添加数据 + /// + public async Task AddTransactionsAsync(IClientSessionHandle session, T objData) + { + await _context.AddCommandAsync(async (session) => await _dbSet.InsertOneAsync(objData)); + } + + /// + /// 事务数据删除 + /// + /// MongoDB 会话(session)对象 + /// objectId + /// + public async Task DeleteTransactionsAsync(IClientSessionHandle session, string id) + { + await _context.AddCommandAsync((session) => _dbSet.DeleteOneAsync(Builders.Filter.Eq(" _id ", id))); + } + + /// + /// 事务异步局部更新(仅更新一条记录) + /// + /// MongoDB 会话(session)对象 + /// 过滤器 + /// 更新条件 + /// + public async Task UpdateTransactionsAsync(IClientSessionHandle session, FilterDefinition filter, UpdateDefinition update) + { + await _context.AddCommandAsync((session) => _dbSet.UpdateOneAsync(filter, update)); + } + + #endregion + + #region 添加相关操作 + + /// + /// 添加数据 + /// + /// 添加数据 + /// + public async Task AddAsync(T objData) + { + await _dbSet.InsertOneAsync(objData); + } + + /// + /// 批量插入 + /// + /// 实体集合 + /// + public async Task InsertManyAsync(List objDatas) + { + await _dbSet.InsertManyAsync(objDatas); + } + + #endregion + + #region 删除相关操作 + + /// + /// 数据删除 + /// + /// objectId + /// + public async Task DeleteAsync(string id) + { + await _dbSet.DeleteOneAsync(Builders.Filter.Eq("_id", new ObjectId(id))); + } + + /// + /// 异步删除多条数据 + /// + /// 删除的条件 + /// + public async Task DeleteManyAsync(FilterDefinition filter) + { + return await _dbSet.DeleteManyAsync(filter); + } + + #endregion + + #region 修改相关操作 + + /// + /// 指定对象异步修改一条数据 + /// + /// 要修改的对象 + /// 修改条件 + /// + public async Task UpdateAsync(T obj, string id) + { + //修改条件 + FilterDefinition filter = Builders.Filter.Eq("_id", new ObjectId(id)); + //要修改的字段 + var list = new List>(); + foreach (var item in obj.GetType().GetProperties()) + { + if (item.Name.ToLower() == "id") continue; + list.Add(Builders.Update.Set(item.Name, item.GetValue(obj))); + } + var updatefilter = Builders.Update.Combine(list); + await _dbSet.UpdateOneAsync(filter, updatefilter); + } + + /// + /// 局部更新(仅更新一条记录) + /// x.Id == 1 && x.Age > 18 && x.Gender == 0]]> + /// new T{ RealName = "Ray", Gender = 1}]]> + /// + /// 筛选条件 + /// 更新条件 + /// + public async Task UpdateAsync(Expression> expression, Expression> entity) + { + var fieldList = new List>(); + + if (entity.Body is MemberInitExpression param) + { + foreach (var item in param.Bindings) + { + var propertyName = item.Member.Name; + object propertyValue = null; + + if (item is not MemberAssignment memberAssignment) continue; + + if (memberAssignment.Expression.NodeType == ExpressionType.Constant) + { + if (memberAssignment.Expression is ConstantExpression constantExpression) + propertyValue = constantExpression.Value; + } + else + { + propertyValue = Expression.Lambda(memberAssignment.Expression, null).Compile().DynamicInvoke(); + } + + if (propertyName != "_id") //实体键_id不允许更新 + { + fieldList.Add(Builders.Update.Set(propertyName, propertyValue)); + } + } + } + + await _dbSet.UpdateOneAsync(expression, Builders.Update.Combine(fieldList)); + } + + /// + /// 异步局部更新(仅更新一条记录) + /// + /// 过滤器 + /// 更新条件 + /// + public async Task UpdateAsync(FilterDefinition filter, UpdateDefinition update) + { + await _dbSet.UpdateOneAsync(filter, update); + } + + /// + /// 异步局部更新(仅更新多条记录) + /// + /// 筛选条件 + /// 更新条件 + /// + public async Task UpdateManyAsync(Expression> expression, UpdateDefinition update) + { + await _dbSet.UpdateManyAsync(expression, update); + } + + /// + /// 异步批量修改数据 + /// + /// 要修改的字段 + /// 更新条件 + /// + public async Task UpdateManayAsync(Dictionary dic, FilterDefinition filter) + { + T t = new T(); + //要修改的字段 + var list = new List>(); + foreach (var item in t.GetType().GetProperties()) + { + if (!dic.ContainsKey(item.Name)) continue; + var value = dic[item.Name]; + list.Add(Builders.Update.Set(item.Name, value)); + } + var updatefilter = Builders.Update.Combine(list); + return await _dbSet.UpdateManyAsync(filter, updatefilter); + } + + #endregion + + #region 查询统计相关操作 + + /// + /// 通过ID主键获取数据 + /// + /// objectId + /// + public async Task GetByIdAsync(string id) + { + var queryData = await _dbSet.FindAsync(Builders.Filter.Eq("_id", new ObjectId(id))); + return queryData.FirstOrDefault(); + } + + /// + /// 获取所有数据 + /// + /// + public async Task> GetAllAsync() + { + var queryAllData = await _dbSet.FindAsync(Builders.Filter.Empty); + return queryAllData.ToList(); + } + + /// + /// 获取记录数 + /// + /// 筛选条件 + /// + public async Task CountAsync(Expression> expression) + { + return await _dbSet.CountDocumentsAsync(expression); + } + + /// + /// 获取记录数 + /// + /// 过滤器 + /// + public async Task CountAsync(FilterDefinition filter) + { + return await _dbSet.CountDocumentsAsync(filter); + } + + /// + /// 判断是否存在 + /// + /// 条件 + /// + public async Task ExistsAsync(Expression> predicate) + { + return await Task.FromResult(_dbSet.AsQueryable().Any(predicate)); + } + + /// + /// 异步查询集合 + /// + /// 查询条件 + /// 要查询的字段,不写时查询全部 + /// 要排序的字段 + /// + public async Task> FindListAsync(FilterDefinition filter, string[]? field = null, SortDefinition? sort = null) + { + //不指定查询字段 + if (field == null || field.Length == 0) + { + if (sort == null) return await _dbSet.Find(filter).ToListAsync(); + return await _dbSet.Find(filter).Sort(sort).ToListAsync(); + } + + //指定查询字段 + var fieldList = new List>(); + for (int i = 0; i < field.Length; i++) + { + fieldList.Add(Builders.Projection.Include(field[i].ToString())); + } + var projection = Builders.Projection.Combine(fieldList); + fieldList?.Clear(); + + //不排序 + if (sort == null) return await _dbSet.Find(filter).Project(projection).ToListAsync(); + + //排序查询 + return await _dbSet.Find(filter).Sort(sort).Project(projection).ToListAsync(); + } + + /// + /// 异步分页查询集合 + /// + /// 查询条件 + /// 当前页 + /// 页容量 + /// 要查询的字段,不写时查询全部 + /// 要排序的字段 + /// + public async Task> FindListByPageAsync(FilterDefinition filter, int pageIndex, int pageSize, string[]? field = null, SortDefinition? sort = null) + { + //不指定查询字段 + if (field == null || field.Length == 0) + { + if (sort == null) return await _dbSet.Find(filter).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync(); + //进行排序 + return await _dbSet.Find(filter).Sort(sort).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync(); + } + + //指定查询字段 + var fieldList = new List>(); + for (int i = 0; i < field.Length; i++) + { + fieldList.Add(Builders.Projection.Include(field[i].ToString())); + } + var projection = Builders.Projection.Combine(fieldList); + fieldList?.Clear(); + + //不排序 + if (sort == null) return await _dbSet.Find(filter).Project(projection).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync(); + + //排序查询 + return await _dbSet.Find(filter).Sort(sort).Project(projection).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync(); + } + + #endregion + } +} diff --git a/JiShe.CollectBus.MongoDB/MongoContext.cs b/JiShe.CollectBus.MongoDB/MongoContext.cs new file mode 100644 index 0000000..f672e41 --- /dev/null +++ b/JiShe.CollectBus.MongoDB/MongoContext.cs @@ -0,0 +1,99 @@ +using Microsoft.Extensions.Configuration; +using MongoDB.Driver; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using JiShe.CollectBus.Common.Extensions.DependencyInjections; + +namespace JiShe.CollectBus.MongoDB +{ + public class MongoContext : IMongoContext + { + private readonly IMongoDatabase _database; + private readonly MongoClient _mongoClient; + + //这里将 _commands 中的每个元素都定义为一个 Func 委托,此委托表示一个需要 IClientSessionHandle 对象作为参数并返回一个异步任务的方法 + //每个委托都表示一个MongoDB 会话(session)对象和要执行的命令 + private readonly List> _commands = []; + + public MongoContext(IConfiguration configuration) + { + _mongoClient = new MongoClient(configuration["MongoSettings:Connection"]); + _database = _mongoClient.GetDatabase(configuration["MongoSettings:DatabaseName"]); + } + + /// + /// 添加命令操作 + /// + /// 方法接受一个 Func 委托作为参数,该委托表示一个需要 IClientSessionHandle 对象作为参数并返回一个异步任务的方法 + /// + public async Task AddCommandAsync(Func func) + { + _commands.Add(func); + await Task.CompletedTask; + } + + /// + /// 提交更改并返回受影响的行数 + /// TODO:MongoDB单机服务器不支持事务【使用MongoDB事务会报错:Standalone servers do not support transactions】,只有在集群情况下才支持事务 + /// 原因:MongoDB在使用分布式事务时需要进行多节点之间的协调和通信,而单机环境下无法实现这样的分布式协调和通信机制。但是,在MongoDB部署为一个集群(cluster)后,将多个计算机连接为一个整体,通过协调和通信机制实现了分布式事务的正常使用。从数据一致性和可靠性的角度来看,在分布式系统中实现事务处理是至关重要的。而在单机环境下不支持事务,只有在集群情况下才支持事务的设计方式是为了保证数据一致性和可靠性,并且也符合分布式系统的设计思想。 + /// + /// MongoDB 会话(session)对象 + /// + public async Task SaveChangesAsync(IClientSessionHandle session) + { + try + { + session.StartTransaction();//开始事务 + + foreach (var command in _commands) + { + await command(session); + } + + await session.CommitTransactionAsync();//提交事务 + return _commands.Count; + } + catch (Exception ex) + { + await session.AbortTransactionAsync();//回滚事务 + return 0; + } + finally + { + _commands.Clear();//清空_commands列表中的元素 + } + } + + /// + /// 初始化Mongodb会话对象session + /// + /// + public async Task StartSessionAsync() + { + var session = await _mongoClient.StartSessionAsync(); + return session; + } + + /// + /// 获取MongoDB集合 + /// + /// + /// 集合名称 + /// + public IMongoCollection GetCollection(string name) + { + return _database.GetCollection(name); + } + + /// + /// 释放上下文 + /// + public void Dispose() + { + GC.SuppressFinalize(this); + } + } +} diff --git a/JiShe.CollectBus.MongoDB/UnitOfWork.cs b/JiShe.CollectBus.MongoDB/UnitOfWork.cs new file mode 100644 index 0000000..9906f87 --- /dev/null +++ b/JiShe.CollectBus.MongoDB/UnitOfWork.cs @@ -0,0 +1,44 @@ +using JiShe.CollectBus.Common.Extensions.DependencyInjections; +using MongoDB.Driver; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.MongoDB +{ + public class UnitOfWork : IUnitOfWork + { + private readonly IMongoContext _context; + + public UnitOfWork(IMongoContext context) + { + _context = context; + } + + /// + /// 提交保存更改 + /// + /// MongoDB 会话(session)对象 + /// + public async Task Commit(IClientSessionHandle session) + { + return await _context.SaveChangesAsync(session) > 0; + } + + /// + /// 初始化MongoDB会话对象session + /// + /// + public async Task InitTransaction() + { + return await _context.StartSessionAsync(); + } + + public void Dispose() + { + _context.Dispose(); + } + } +} diff --git a/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs index 8427949..7df62a8 100644 --- a/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs +++ b/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs @@ -1,9 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using JiShe.CollectBus.Common; -using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Protocol.Contracts.Interfaces; @@ -12,9 +7,8 @@ using Microsoft.Extensions.Logging; namespace JiShe.CollectBus.Protocol.Contracts.Abstracts { - public abstract class BaseProtocolPlugin : IProtocolPlugin + public abstract class BaseProtocolPlugin(ILogger logger) : IProtocolPlugin { - public readonly ILogger _logger; //起始字符 public const string stx = "68"; //结束字符 @@ -27,11 +21,6 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts public const int tPLen = 6; - protected BaseProtocolPlugin(ILogger logger) - { - _logger = logger; - } - public abstract Task GetAsync(); public abstract Task AnalyzeAsync(MessageReceivedEvent messageReceivedEvent, Action? sendAction = null); @@ -42,7 +31,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts /// 报文 /// 发送委托 /// - public virtual Task LoginAsync(MessageReceivedLoginEvent messageReceivedEvent, Action? sendAction = null) + public virtual async Task LoginAsync(MessageReceivedLoginEvent messageReceivedEvent, Action? sendAction = null) { var hexStringList = messageReceivedEvent.MessageHexString.StringToPairs(); var aTuple = (Tuple)hexStringList.GetAnalyzeValue(CommandChunkEnum.A); @@ -69,7 +58,6 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts { sendAction(bytes); } - return Task.CompletedTask; } /// @@ -78,7 +66,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts /// 报文 /// 发送委托 /// - public virtual Task HeartbeatAsync(MessageReceivedHeartbeatEvent messageReceivedEvent, Action? sendAction = null) + public virtual async Task HeartbeatAsync(MessageReceivedHeartbeatEvent messageReceivedEvent, Action? sendAction = null) { var hexStringList = messageReceivedEvent.MessageHexString.StringToPairs(); var aTuple = (Tuple)hexStringList.GetAnalyzeValue(CommandChunkEnum.A); @@ -113,7 +101,6 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts sendAction(bytes); } } - return Task.CompletedTask; } @@ -129,7 +116,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts cmdStrList.AddRange(userDatas); cmdStrList.Add(cs); cmdStrList.Add(end); - _logger.LogInformation($"回复:{string.Join(" ", cmdStrList)}"); + logger.LogInformation($"回复:{string.Join(" ", cmdStrList)}"); var bytes = cmdStrList.Select(x => Convert.ToByte(x, 16)).ToArray(); return bytes; } diff --git a/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj b/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj index 2dba74a..2b18de3 100644 --- a/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj +++ b/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj @@ -1,9 +1,9 @@  - netstandard2.1 + net8.0 + enable enable - preview @@ -14,6 +14,7 @@ + diff --git a/JiShe.CollectBus.Protocol.Contracts/Models/MessageIssuedEvent.cs b/JiShe.CollectBus.Protocol.Contracts/Models/MessageIssuedEvent.cs index 2ba9153..58d4df7 100644 --- a/JiShe.CollectBus.Protocol.Contracts/Models/MessageIssuedEvent.cs +++ b/JiShe.CollectBus.Protocol.Contracts/Models/MessageIssuedEvent.cs @@ -1,9 +1,13 @@ -namespace JiShe.CollectBus.Protocol.Contracts.Models +using JiShe.CollectBus.Common.Enums; + +namespace JiShe.CollectBus.Protocol.Contracts.Models { public class MessageIssuedEvent { public string ClientId { get; set; } public byte[] Message { get; set; } public string DeviceNo { get; set; } + public IssuedEventType Type { get; set; } + public string MessageId { get; set; } } } diff --git a/JiShe.CollectBus.Protocol.Contracts/Models/MessageReceivedEvent.cs b/JiShe.CollectBus.Protocol.Contracts/Models/MessageReceivedEvent.cs index 5a6aa3c..f222b3b 100644 --- a/JiShe.CollectBus.Protocol.Contracts/Models/MessageReceivedEvent.cs +++ b/JiShe.CollectBus.Protocol.Contracts/Models/MessageReceivedEvent.cs @@ -1,53 +1,52 @@ -namespace JiShe.CollectBus.Protocol.Contracts.Models +using JetBrains.Annotations; + +namespace JiShe.CollectBus.Protocol.Contracts.Models { public class MessageReceivedEvent { - public MessageReceivedEvent(string clientId, string clientIp,int port, string messageHexString, string deviceNo) - { - ClientId = clientId; - ClientIP = clientIp; - Port = port; - MessageHexString = messageHexString; - DeviceNo = deviceNo; - } - + public string MessageId { get; set; } = string.Empty; /// /// 客服端标识 /// - public string ClientId { get; set; } + public string ClientId { get; set; } = string.Empty; /// /// 客服端IP /// - public string ClientIP { get; set; } + public string ClientIp { get; set; } = string.Empty; /// /// 客服端端口 /// - public int Port { get; set; } + public int ClientPort { get; set; } /// /// 客服端报文 /// - public string MessageHexString { get; set; } + public string MessageHexString { get; set; } = string.Empty; /// /// 设备号 /// - public string DeviceNo { get; set; } + public string DeviceNo { get; set; } = string.Empty; + + /// + /// 接收指令时间 + /// + public DateTime ReceivedTime { get; set; } } public class MessageReceivedLoginEvent: MessageReceivedEvent { - public MessageReceivedLoginEvent(string clientId, string clientIp, int port, string messageHexString, string deviceNo) : base(clientId, clientIp, port, messageHexString, deviceNo) - { - } + public bool IsAck { get; set; } = false; + + public DateTime? AckTime { get; set; } } public class MessageReceivedHeartbeatEvent : MessageReceivedEvent { - public MessageReceivedHeartbeatEvent(string clientId, string clientIp, int port, string messageHexString, string deviceNo) : base(clientId, clientIp, port, messageHexString, deviceNo) - { - } + public bool IsAck { get; set; } = false; + + public DateTime? AckTime { get; set; } } } diff --git a/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs b/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs index b1f43b8..dcff1d6 100644 --- a/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs +++ b/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs @@ -1,4 +1,5 @@ using JiShe.CollectBus.Common.Extensions.DependencyInjections; +using JiShe.CollectBus.MongoDB; using JiShe.CollectBus.Protocol.Contracts.Abstracts; using JiShe.CollectBus.Protocol.Contracts.Attributes; using JiShe.CollectBus.Protocol.Contracts.Models; diff --git a/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs b/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs index a47505a..a9c30b7 100644 --- a/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs +++ b/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs @@ -1,5 +1,4 @@ -using JiShe.CollectBus.Common; -using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions.DependencyInjections; using JiShe.CollectBus.Common.Models; @@ -7,9 +6,7 @@ using JiShe.CollectBus.Protocol.Contracts.Abstracts; using JiShe.CollectBus.Protocol.Contracts.Attributes; using JiShe.CollectBus.Protocol.Contracts.Models; using JiShe.CollectBus.RabbitMQ.Senders; -using Microsoft.Extensions.Caching.Distributed; using Microsoft.Extensions.Logging; -using TouchSocket.Sockets; namespace JiShe.CollectBus.Protocol { @@ -21,7 +18,6 @@ namespace JiShe.CollectBus.Protocol var info = new ProtocolInfo("Standard", "376.1", "TCP", "376.1协议", "DTS1980"); return await Task.FromResult(info); } - public override async Task AnalyzeAsync(MessageReceivedEvent messageReceivedEvent, Action? sendAction = null) { var cmdResult = AnalysisCmd(messageReceivedEvent.MessageHexString); @@ -37,28 +33,21 @@ namespace JiShe.CollectBus.Protocol { async void SendAction(byte[] bytes) { - await nSender.SendToIssuedAsync(new MessageIssuedEvent { ClientId = messageReceivedEvent.ClientId, DeviceNo = messageReceivedEvent.DeviceNo, Message = bytes }); + await nSender.SendToIssuedAsync(new MessageIssuedEvent { ClientId = messageReceivedEvent.ClientId, DeviceNo = messageReceivedEvent.DeviceNo, Message = bytes, Type = IssuedEventType.Login,MessageId = messageReceivedEvent.MessageId}); } await base.LoginAsync(messageReceivedEvent, SendAction); } - public override async Task HeartbeatAsync(MessageReceivedHeartbeatEvent messageReceivedEvent, Action? sendAction = null) { async void SendAction(byte[] bytes) { - await nSender.SendToIssuedAsync(new MessageIssuedEvent { ClientId = messageReceivedEvent.ClientId, DeviceNo = messageReceivedEvent.DeviceNo, Message = bytes }); + await nSender.SendToIssuedAsync(new MessageIssuedEvent { ClientId = messageReceivedEvent.ClientId, DeviceNo = messageReceivedEvent.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceivedEvent.MessageId }); } await base.HeartbeatAsync(messageReceivedEvent, SendAction); } - - - - - - #region 上行命令 //68 diff --git a/JiShe.CollectBus.RabbitMQ/Consumers/MessageIssuedConsumer.cs b/JiShe.CollectBus.RabbitMQ/Consumers/MessageIssuedConsumer.cs index 744b4a7..b518b67 100644 --- a/JiShe.CollectBus.RabbitMQ/Consumers/MessageIssuedConsumer.cs +++ b/JiShe.CollectBus.RabbitMQ/Consumers/MessageIssuedConsumer.cs @@ -1,4 +1,7 @@ -using JiShe.CollectBus.Protocol.Contracts.Models; +using System.Security.Cryptography.X509Certificates; +using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.MongoDB; +using JiShe.CollectBus.Protocol.Contracts.Models; using MassTransit; using Microsoft.Extensions.Logging; using TouchSocket.Sockets; @@ -9,15 +12,33 @@ namespace JiShe.CollectBus.RabbitMQ.Consumers { private readonly ILogger _logger; private readonly ITcpService _tcpService; + public readonly IMongoRepository _mongoHeartbeatRepository; + public readonly IMongoRepository _mongoLoginRepository; - public MessageIssuedConsumer(ILogger logger, ITcpService tcpService) + + public MessageIssuedConsumer(ILogger logger, ITcpService tcpService, IMongoRepository mongoHeartbeatRepository, IMongoRepository mongoLoginRepository) { _logger = logger; - _tcpService = tcpService; + _tcpService = tcpService; + _mongoHeartbeatRepository = mongoHeartbeatRepository; + _mongoLoginRepository = mongoLoginRepository; } public async Task Consume(ConsumeContext context) { + switch (context.Message.Type) + { + case IssuedEventType.Heartbeat: + await _mongoHeartbeatRepository.UpdateAsync(a => a.MessageId == context.Message.MessageId, b =>new MessageReceivedHeartbeatEvent { IsAck = true,AckTime=DateTime.Now}); + break; + case IssuedEventType.Login: + await _mongoLoginRepository.UpdateAsync(a => a.MessageId == context.Message.MessageId, b => new MessageReceivedLoginEvent { IsAck = true, AckTime = DateTime.Now }); + break; + case IssuedEventType.Data: + break; + default: + throw new ArgumentOutOfRangeException(); + } await _tcpService.SendAsync(context.Message.ClientId, context.Message.Message); } } diff --git a/JiShe.CollectBus.RabbitMQ/Consumers/MessageReceivedHeartbeatConsumer.cs b/JiShe.CollectBus.RabbitMQ/Consumers/MessageReceivedHeartbeatConsumer.cs index ceda2ae..50c35e5 100644 --- a/JiShe.CollectBus.RabbitMQ/Consumers/MessageReceivedHeartbeatConsumer.cs +++ b/JiShe.CollectBus.RabbitMQ/Consumers/MessageReceivedHeartbeatConsumer.cs @@ -1,4 +1,5 @@ -using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using JiShe.CollectBus.MongoDB; +using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Models; using MassTransit; using Microsoft.Extensions.DependencyInjection; diff --git a/JiShe.CollectBus.RabbitMQ/JiShe.CollectBus.RabbitMQ.csproj b/JiShe.CollectBus.RabbitMQ/JiShe.CollectBus.RabbitMQ.csproj index 254eb41..b2287a5 100644 --- a/JiShe.CollectBus.RabbitMQ/JiShe.CollectBus.RabbitMQ.csproj +++ b/JiShe.CollectBus.RabbitMQ/JiShe.CollectBus.RabbitMQ.csproj @@ -15,6 +15,7 @@ + diff --git a/JiShe.CollectBus.RabbitMQ/JiSheCollectBusRabbitMqModule.cs b/JiShe.CollectBus.RabbitMQ/JiSheCollectBusRabbitMqModule.cs index 7b9db9f..86c9d33 100644 --- a/JiShe.CollectBus.RabbitMQ/JiSheCollectBusRabbitMqModule.cs +++ b/JiShe.CollectBus.RabbitMQ/JiSheCollectBusRabbitMqModule.cs @@ -30,24 +30,28 @@ namespace JiShe.CollectBus.RabbitMQ { configurator.ConfigureConsumeTopology = false; configurator.Consumer(context); + configurator.Durable = true; }); // 登录 - cfg.ReceiveEndpoint($"{configuration["MQ:Queue:Received"]}_Login" ?? string.Empty, configurator => + cfg.ReceiveEndpoint($"{configuration["MQ:Queue:Received"]}_Login", configurator => { configurator.ConfigureConsumeTopology = false; configurator.Consumer(context); + configurator.Durable = true; }); // 心跳 - cfg.ReceiveEndpoint($"{configuration["MQ:Queue:Received"]}_Heartbeat" ?? string.Empty, configurator => + cfg.ReceiveEndpoint($"{configuration["MQ:Queue:Received"]}_Heartbeat", configurator => { configurator.ConfigureConsumeTopology = false; configurator.Consumer(context); + configurator.Durable = true; }); // 消息下发队列 cfg.ReceiveEndpoint(configuration["MQ:Queue:Issued"] ?? string.Empty, configurator => { configurator.ConfigureConsumeTopology = false; configurator.Consumer(context); + configurator.Durable = true; }); }); }); diff --git a/JiShe.CollectBus.RabbitMQ/Senders/NSender.cs b/JiShe.CollectBus.RabbitMQ/Senders/NSender.cs index b435e4c..62e15f9 100644 --- a/JiShe.CollectBus.RabbitMQ/Senders/NSender.cs +++ b/JiShe.CollectBus.RabbitMQ/Senders/NSender.cs @@ -1,10 +1,5 @@ using MassTransit; using Microsoft.Extensions.Configuration; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; using JiShe.CollectBus.Common.Extensions.DependencyInjections; namespace JiShe.CollectBus.RabbitMQ.Senders diff --git a/JiShe.CollectBus.sln b/JiShe.CollectBus.sln index 0832b98..9bb2eda 100644 --- a/JiShe.CollectBus.sln +++ b/JiShe.CollectBus.sln @@ -25,6 +25,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.ClickHouse EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.RabbitMQ", "JiShe.CollectBus.RabbitMQ\JiShe.CollectBus.RabbitMQ.csproj", "{DB46D90E-304D-48B7-9ED6-F4DCC95D3824}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.MongoDB", "JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj", "{223DBDB1-6CD3-4D4E-8795-42550BC0A871}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -67,6 +69,10 @@ Global {DB46D90E-304D-48B7-9ED6-F4DCC95D3824}.Debug|Any CPU.Build.0 = Debug|Any CPU {DB46D90E-304D-48B7-9ED6-F4DCC95D3824}.Release|Any CPU.ActiveCfg = Release|Any CPU {DB46D90E-304D-48B7-9ED6-F4DCC95D3824}.Release|Any CPU.Build.0 = Release|Any CPU + {223DBDB1-6CD3-4D4E-8795-42550BC0A871}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {223DBDB1-6CD3-4D4E-8795-42550BC0A871}.Debug|Any CPU.Build.0 = Debug|Any CPU + {223DBDB1-6CD3-4D4E-8795-42550BC0A871}.Release|Any CPU.ActiveCfg = Release|Any CPU + {223DBDB1-6CD3-4D4E-8795-42550BC0A871}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -81,6 +87,7 @@ Global {16D42BCF-EDB8-4153-B37D-0B10FB6DF36C} = {C7DEC9FB-3F75-4584-85B0-16EA3CB222E5} {65A2837C-A5EE-475B-8079-EE5A1BCD2E8F} = {C7DEC9FB-3F75-4584-85B0-16EA3CB222E5} {DB46D90E-304D-48B7-9ED6-F4DCC95D3824} = {C7DEC9FB-3F75-4584-85B0-16EA3CB222E5} + {223DBDB1-6CD3-4D4E-8795-42550BC0A871} = {C7DEC9FB-3F75-4584-85B0-16EA3CB222E5} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {33261859-9CD1-4A43-B181-AB75C247D1CD}