From 4494c92e7c699f2af491b3684c466d2b2157268a Mon Sep 17 00:00:00 2001 From: cli <377476583@qq.com> Date: Wed, 16 Apr 2025 16:12:38 +0800 Subject: [PATCH 1/2] =?UTF-8?q?Cassandra=20=E6=80=A7=E8=83=BD=E5=B1=80?= =?UTF-8?q?=E9=83=A8=E8=B0=83=E4=BC=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CollectBusApplicationModule.cs | 2 +- .../Samples/TestAppService.cs | 102 +++++++++++++++--- .../CassandraProvider.cs | 7 ++ .../CassandraRepository.cs | 9 +- .../CollectBusCassandraModule.cs | 9 +- .../Extensions/ServiceCollectionExtensions.cs | 12 +-- .../Extensions/SessionExtension.cs | 7 +- .../ICassandraProvider.cs | 2 + .../Attributes/CassandraTableAttribute.cs | 12 +-- .../MessageIssueds/MessageIssued.cs | 11 +- src/JiShe.CollectBus.Host/appsettings.json | 2 +- .../MongoDB/CollectBusMongoDbContext.cs | 7 +- .../MongoDB/CollectBusMongoDbModule.cs | 2 +- 13 files changed, 141 insertions(+), 43 deletions(-) diff --git a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index f3ed978..0f7a5be 100644 --- a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -63,7 +63,7 @@ public class CollectBusApplicationModule : AbpModule //默认初始化表计信息 var dbContext = context.ServiceProvider.GetRequiredService(); - await dbContext.InitAmmeterCacheData(); + //await dbContext.InitAmmeterCacheData(); //await dbContext.InitWatermeterCacheData(); //初始化主题信息 diff --git a/src/JiShe.CollectBus.Application/Samples/TestAppService.cs b/src/JiShe.CollectBus.Application/Samples/TestAppService.cs index 9c53c3a..53f4b9e 100644 --- a/src/JiShe.CollectBus.Application/Samples/TestAppService.cs +++ b/src/JiShe.CollectBus.Application/Samples/TestAppService.cs @@ -19,33 +19,109 @@ using JiShe.CollectBus.Protocol.Contracts.Interfaces; using Microsoft.Extensions.DependencyInjection; using JiShe.CollectBus.Cassandra; using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.IotSystems.MessageIssueds; using Volo.Abp.Application.Services; +using JiShe.CollectBus.IotSystems.MessageReceiveds; +using Volo.Abp.Domain.Repositories; +using System.Diagnostics; +using System.Linq; +using Cassandra; namespace JiShe.CollectBus.Samples; [AllowAnonymous] -public class TestAppService : CollectBusAppService, IApplicationService +public class TestAppService : CollectBusAppService { private readonly ILogger _logger; - private readonly ICassandraRepository _messageIssuedCassandraRepository; + private readonly ICassandraRepository _messageReceivedCassandraRepository; + private readonly ICassandraProvider _cassandraProvider; + + + public TestAppService( ILogger logger, - ICassandraRepository messageIssuedCassandraRepository - ) + ICassandraRepository messageReceivedCassandraRepository, ICassandraProvider cassandraProvider) { _logger = logger; - _messageIssuedCassandraRepository = messageIssuedCassandraRepository; + _messageReceivedCassandraRepository = messageReceivedCassandraRepository; + _cassandraProvider = cassandraProvider; } - public async Task AddMessage() + public async Task AddMessageOfCassandra() { - await _messageIssuedCassandraRepository.InsertAsync(new MessageIssued + var stopwatch = Stopwatch.StartNew(); + for (int i = 1; i <= 10000; i++) { - ClientId = Guid.NewGuid().ToString(), - Message = Array.Empty(), - DeviceNo = "123321312", - MessageId = Guid.NewGuid().ToString(), - Type = IssuedEventType.Data - }); + var str = Guid.NewGuid().ToString(); + await _messageReceivedCassandraRepository.InsertAsync(new MessageIssued + { + ClientId = str, + DeviceNo = i.ToString(), + MessageId = str, + Type = IssuedEventType.Data, + Id = str, + Message = str.GetBytes() + }); + } + stopwatch.Stop(); + _logger.LogWarning($"插入 {10000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒"); + } + + public async Task AddMessageOfBulkInsertCassandra() + { + var records = new List(); + var prepared = await _cassandraProvider.Session.PrepareAsync( + $"INSERT INTO {_cassandraProvider.CassandraConfig.Keyspace}.{nameof(MessageIssued)} (id, clientid, message, deviceno,type,messageid) VALUES (?, ?, ?, ?, ?, ?)"); + + for (int i = 1; i <= 100000; i++) + { + var str = Guid.NewGuid().ToString(); + records.Add(new MessageIssued + { + ClientId = str, + DeviceNo = i.ToString(), + MessageId = str, + Type = IssuedEventType.Data, + Id = str, + Message = str.GetBytes() + }); + } + var stopwatch = Stopwatch.StartNew(); + await BulkInsertAsync(_cassandraProvider.Session, prepared, records); + stopwatch.Stop(); + _logger.LogWarning($"插入 {100000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒"); + } + + private static async Task BulkInsertAsync(ISession session, PreparedStatement prepared, List records) + { + var tasks = new List(); + var batch = new BatchStatement(); + + for (int i = 0; i < records.Count; i++) + { + var record = records[i]; + var boundStatement = prepared.Bind( + record.Id, + record.ClientId, + record.Message, + record.DeviceNo, + (int)record.Type, + record.MessageId); + + // 设置一致性级别为ONE以提高性能 + boundStatement.SetConsistencyLevel(ConsistencyLevel.One); + + batch.Add(boundStatement); + + // 当达到批处理大小时执行 + if (batch.Statements.Count() >= 1000 || i == records.Count - 1) + { + tasks.Add(session.ExecuteAsync(batch)); + batch = new BatchStatement(); + } + } + + // 等待所有批处理完成 + await Task.WhenAll(tasks); } } diff --git a/src/JiShe.CollectBus.Cassandra/CassandraProvider.cs b/src/JiShe.CollectBus.Cassandra/CassandraProvider.cs index a5cd9c8..dc3e0ee 100644 --- a/src/JiShe.CollectBus.Cassandra/CassandraProvider.cs +++ b/src/JiShe.CollectBus.Cassandra/CassandraProvider.cs @@ -6,6 +6,7 @@ using Cassandra; using Cassandra.Mapping; using Cassandra.Data.Linq; using System.ComponentModel.DataAnnotations; +using System.Diagnostics; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Volo.Abp.DependencyInjection; @@ -35,6 +36,12 @@ namespace JiShe.CollectBus.Cassandra _logger = logger; } + public Task InitClusterAndSessionAsync() + { + InitClusterAndSession(); + return Task.CompletedTask; + } + public void InitClusterAndSession() { GetCluster((keyspace) => diff --git a/src/JiShe.CollectBus.Cassandra/CassandraRepository.cs b/src/JiShe.CollectBus.Cassandra/CassandraRepository.cs index 8381450..66ac6d7 100644 --- a/src/JiShe.CollectBus.Cassandra/CassandraRepository.cs +++ b/src/JiShe.CollectBus.Cassandra/CassandraRepository.cs @@ -1,6 +1,12 @@ using Cassandra; +using Cassandra.Data.Linq; using Cassandra.Mapping; using JiShe.CollectBus.Cassandra.Extensions; +using JiShe.CollectBus.Common.Attributes; +using JiShe.CollectBus.IoTDBProvider; +using Microsoft.AspNetCore.Http; +using System.Reflection; +using Thrift.Protocol.Entities; using Volo.Abp.Domain.Entities; using Volo.Abp.Domain.Repositories; @@ -10,9 +16,10 @@ namespace JiShe.CollectBus.Cassandra : ICassandraRepository where TEntity : class { - + private readonly ICassandraProvider _cassandraProvider; public CassandraRepository(ICassandraProvider cassandraProvider, MappingConfiguration mappingConfig) { + _cassandraProvider = cassandraProvider; Mapper = new Mapper(cassandraProvider.Session, mappingConfig); cassandraProvider.Session.CreateTable(cassandraProvider.CassandraConfig.Keyspace); } diff --git a/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs b/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs index 2502420..b5274f7 100644 --- a/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs +++ b/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs @@ -13,17 +13,16 @@ namespace JiShe.CollectBus.Cassandra )] public class CollectBusCassandraModule : AbpModule { - public override void ConfigureServices(ServiceConfigurationContext context) + public override Task ConfigureServicesAsync(ServiceConfigurationContext context) { Configure(context.Services.GetConfiguration().GetSection("Cassandra")); - context.AddCassandra(); - + return Task.CompletedTask; } - public override void OnApplicationInitialization(ApplicationInitializationContext context) + public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context) { - context.UseCassandra(); + await context.UseCassandra(); } } } diff --git a/src/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs b/src/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs index 9c6a044..fe69268 100644 --- a/src/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs +++ b/src/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs @@ -13,11 +13,11 @@ namespace Microsoft.Extensions.DependencyInjection { public static class ApplicationInitializationContextExtensions { - public static void UseCassandra(this ApplicationInitializationContext context) + public static async Task UseCassandra(this ApplicationInitializationContext context) { var service = context.ServiceProvider; var cassandraProvider = service.GetRequiredService(); - cassandraProvider.InitClusterAndSession(); + await cassandraProvider.InitClusterAndSessionAsync(); } } @@ -25,11 +25,9 @@ namespace Microsoft.Extensions.DependencyInjection { public static void AddCassandra(this ServiceConfigurationContext context) { - context.Services.AddSingleton(typeof(ICassandraRepository<,>), typeof(CassandraRepository<,>)); - - var mappingConfig = new MappingConfiguration() - .Define(new CollectBusMapping()); - context.Services.AddSingleton(mappingConfig); + context.Services.AddTransient(typeof(ICassandraRepository<,>), typeof(CassandraRepository<,>)); + context.Services.AddSingleton(new MappingConfiguration() + .Define(new CollectBusMapping())); } } } diff --git a/src/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs b/src/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs index f515416..c313d0c 100644 --- a/src/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs +++ b/src/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs @@ -4,6 +4,8 @@ using Cassandra; using System.ComponentModel.DataAnnotations; using JiShe.CollectBus.Common.Attributes; using Cassandra.Mapping; +using Cassandra.Data.Linq; +using Thrift.Protocol.Entities; namespace JiShe.CollectBus.Cassandra.Extensions { @@ -14,7 +16,8 @@ namespace JiShe.CollectBus.Cassandra.Extensions var type = typeof(TEntity); var tableAttribute = type.GetCustomAttribute(); var tableName = tableAttribute?.Name ?? type.Name.ToLower(); - var tableKeyspace = tableAttribute?.Keyspace ?? defaultKeyspace; + //var tableKeyspace = tableAttribute?.Keyspace ?? defaultKeyspace; + var tableKeyspace = session.Keyspace; var properties = type.GetProperties(); var primaryKey = properties.FirstOrDefault(p => p.GetCustomAttribute() != null); @@ -79,5 +82,7 @@ namespace JiShe.CollectBus.Cassandra.Extensions throw new NotSupportedException($"不支持的类型: {type.Name}"); } + + } } diff --git a/src/JiShe.CollectBus.Cassandra/ICassandraProvider.cs b/src/JiShe.CollectBus.Cassandra/ICassandraProvider.cs index 45c6ca6..8b1f87a 100644 --- a/src/JiShe.CollectBus.Cassandra/ICassandraProvider.cs +++ b/src/JiShe.CollectBus.Cassandra/ICassandraProvider.cs @@ -20,5 +20,7 @@ namespace JiShe.CollectBus.Cassandra Cluster GetCluster(Action? callback = null); void InitClusterAndSession(); + + Task InitClusterAndSessionAsync(); } } diff --git a/src/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs b/src/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs index bdd16d5..5ee6d8f 100644 --- a/src/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs +++ b/src/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs @@ -9,17 +9,9 @@ using Volo.Abp; namespace JiShe.CollectBus.Common.Attributes { [AttributeUsage(AttributeTargets.Class, Inherited = false)] - public class CassandraTableAttribute : Attribute + public class CassandraTableAttribute(string? name = null) : Attribute { - public CassandraTableAttribute(string? name = null,string? keyspace =null) - { - Name = name; - Keyspace = keyspace; - } - - public virtual string? Name { get; } - - public virtual string? Keyspace { get; } + public virtual string? Name { get; } = name; } [AttributeUsage(AttributeTargets.Property | AttributeTargets.Field)] diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs b/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs index 5977d5e..072abcb 100644 --- a/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs +++ b/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs @@ -6,17 +6,24 @@ using System.Text; using System.Threading.Tasks; using JiShe.CollectBus.Common.Attributes; using JiShe.CollectBus.Common.Enums; +using Volo.Abp.Domain.Entities; namespace JiShe.CollectBus.IotSystems.MessageIssueds { [CassandraTable] - public class MessageIssued + public class MessageIssued:IEntity { - [Key] public string ClientId { get; set; } public byte[] Message { get; set; } public string DeviceNo { get; set; } public IssuedEventType Type { get; set; } public string MessageId { get; set; } + [Key] + public string Id { get; set; } + + public object?[] GetKeys() + { + return new object[] { Id }; + } } } diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index a837636..6ca53cc 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -34,7 +34,7 @@ "CorsOrigins": "http://localhost:4200,http://localhost:3100" }, "ConnectionStrings": { - "Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000", + "Default": "mongodb://mongo_PmEeF3:lixiao1980@192.168.1.9:27017/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000", "Kafka": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092", "PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False", "EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False" diff --git a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs b/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs index addb7c0..ebc5ad1 100644 --- a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs +++ b/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbContext.cs @@ -9,6 +9,7 @@ using MongoDB.Driver; using System; using System.Collections.Concurrent; using System.Collections.Generic; +using JiShe.CollectBus.IotSystems.MessageIssueds; using Volo.Abp.Data; using Volo.Abp.MongoDB; using Volo.Abp.MultiTenancy; @@ -28,7 +29,11 @@ public class CollectBusMongoDbContext : AbpMongoDbContext, ICollectBusMongoDbCon public IMongoCollection MessageReceivedHeartbeats => Collection(); public IMongoCollection Devices => Collection(); public IMongoCollection ProtocolInfos => Collection(); - + + public IMongoCollection MessageIssueds => Collection(); + + + protected override void CreateModel(IMongoModelBuilder modelBuilder) { //modelBuilder.Entity(builder => diff --git a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbModule.cs b/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbModule.cs index 9a5a0f0..f427d19 100644 --- a/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbModule.cs +++ b/src/JiShe.CollectBus.MongoDB/MongoDB/CollectBusMongoDbModule.cs @@ -28,7 +28,7 @@ public class CollectBusMongoDbModule : AbpModule { context.Services.AddMongoDbContext(options => { - options.AddDefaultRepositories(); + options.AddDefaultRepositories(includeAllEntities: true); // 注册分表策略 context.Services.AddTransient( From e91db2d0d4f5183d858ec51a180b307fbf2780c9 Mon Sep 17 00:00:00 2001 From: cli <377476583@qq.com> Date: Thu, 17 Apr 2025 16:10:47 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/JiShe.CollectBus.Host/appsettings.json | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index 6ca53cc..654e945 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -153,6 +153,12 @@ "Port": 9043, "DataCenter": "dc1", "Rack": "RAC2" + }, + { + "Host": "192.168.1.9", + "Port": 9044, + "DataCenter": "dc1", + "Rack": "RAC2" } ], "Username": "admin",