Compare commits

...

5 Commits

Author SHA1 Message Date
cli
19bc05c601 Merge branch 'cassandra' into dev 2025-04-17 18:45:15 +08:00
cli
76ae008e01 合并冲突 2025-04-17 16:21:42 +08:00
cli
e91db2d0d4 修改配置文件 2025-04-17 16:10:47 +08:00
cli
73c12e5160 Merge remote-tracking branch 'origin/dev' into cassandra 2025-04-17 16:09:19 +08:00
cli
4494c92e7c Cassandra 性能局部调优 2025-04-16 16:12:38 +08:00
13 changed files with 148 additions and 44 deletions

View File

@ -64,7 +64,7 @@ public class CollectBusApplicationModule : AbpModule
//默认初始化表计信息 //默认初始化表计信息
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>(); var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
await dbContext.InitAmmeterCacheData(); //await dbContext.InitAmmeterCacheData();
//await dbContext.InitWatermeterCacheData(); //await dbContext.InitWatermeterCacheData();
//初始化主题信息 //初始化主题信息

View File

@ -19,33 +19,109 @@ using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using JiShe.CollectBus.Cassandra; using JiShe.CollectBus.Cassandra;
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;
using Volo.Abp.Application.Services; using Volo.Abp.Application.Services;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using Volo.Abp.Domain.Repositories;
using System.Diagnostics;
using System.Linq;
using Cassandra;
namespace JiShe.CollectBus.Samples; namespace JiShe.CollectBus.Samples;
[AllowAnonymous] [AllowAnonymous]
public class TestAppService : CollectBusAppService, IApplicationService public class TestAppService : CollectBusAppService
{ {
private readonly ILogger<TestAppService> _logger; private readonly ILogger<TestAppService> _logger;
private readonly ICassandraRepository<MessageIssued,string> _messageIssuedCassandraRepository; private readonly ICassandraRepository<MessageIssued, string> _messageReceivedCassandraRepository;
private readonly ICassandraProvider _cassandraProvider;
public TestAppService( public TestAppService(
ILogger<TestAppService> logger, ILogger<TestAppService> logger,
ICassandraRepository<MessageIssued, string> messageIssuedCassandraRepository ICassandraRepository<MessageIssued, string> messageReceivedCassandraRepository, ICassandraProvider cassandraProvider)
)
{ {
_logger = logger; _logger = logger;
_messageIssuedCassandraRepository = messageIssuedCassandraRepository; _messageReceivedCassandraRepository = messageReceivedCassandraRepository;
_cassandraProvider = cassandraProvider;
} }
public async Task AddMessage() public async Task AddMessageOfCassandra()
{ {
await _messageIssuedCassandraRepository.InsertAsync(new MessageIssued var stopwatch = Stopwatch.StartNew();
for (int i = 1; i <= 10000; i++)
{ {
ClientId = Guid.NewGuid().ToString(), var str = Guid.NewGuid().ToString();
Message = Array.Empty<byte>(), await _messageReceivedCassandraRepository.InsertAsync(new MessageIssued
DeviceNo = "123321312", {
MessageId = Guid.NewGuid().ToString(), ClientId = str,
Type = IssuedEventType.Data DeviceNo = i.ToString(),
}); MessageId = str,
Type = IssuedEventType.Data,
Id = str,
Message = str.GetBytes()
});
}
stopwatch.Stop();
_logger.LogWarning($"插入 {10000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒");
}
public async Task AddMessageOfBulkInsertCassandra()
{
var records = new List<MessageIssued>();
var prepared = await _cassandraProvider.Session.PrepareAsync(
$"INSERT INTO {_cassandraProvider.CassandraConfig.Keyspace}.{nameof(MessageIssued)} (id, clientid, message, deviceno,type,messageid) VALUES (?, ?, ?, ?, ?, ?)");
for (int i = 1; i <= 100000; i++)
{
var str = Guid.NewGuid().ToString();
records.Add(new MessageIssued
{
ClientId = str,
DeviceNo = i.ToString(),
MessageId = str,
Type = IssuedEventType.Data,
Id = str,
Message = str.GetBytes()
});
}
var stopwatch = Stopwatch.StartNew();
await BulkInsertAsync(_cassandraProvider.Session, prepared, records);
stopwatch.Stop();
_logger.LogWarning($"插入 {100000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒");
}
private static async Task BulkInsertAsync(ISession session, PreparedStatement prepared, List<MessageIssued> records)
{
var tasks = new List<Task>();
var batch = new BatchStatement();
for (int i = 0; i < records.Count; i++)
{
var record = records[i];
var boundStatement = prepared.Bind(
record.Id,
record.ClientId,
record.Message,
record.DeviceNo,
(int)record.Type,
record.MessageId);
// 设置一致性级别为ONE以提高性能
boundStatement.SetConsistencyLevel(ConsistencyLevel.One);
batch.Add(boundStatement);
// 当达到批处理大小时执行
if (batch.Statements.Count() >= 1000 || i == records.Count - 1)
{
tasks.Add(session.ExecuteAsync(batch));
batch = new BatchStatement();
}
}
// 等待所有批处理完成
await Task.WhenAll(tasks);
} }
} }

View File

@ -6,6 +6,7 @@ using Cassandra;
using Cassandra.Mapping; using Cassandra.Mapping;
using Cassandra.Data.Linq; using Cassandra.Data.Linq;
using System.ComponentModel.DataAnnotations; using System.ComponentModel.DataAnnotations;
using System.Diagnostics;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
@ -35,6 +36,12 @@ namespace JiShe.CollectBus.Cassandra
_logger = logger; _logger = logger;
} }
public Task InitClusterAndSessionAsync()
{
InitClusterAndSession();
return Task.CompletedTask;
}
public void InitClusterAndSession() public void InitClusterAndSession()
{ {
GetCluster((keyspace) => GetCluster((keyspace) =>

View File

@ -1,6 +1,12 @@
using Cassandra; using Cassandra;
using Cassandra.Data.Linq;
using Cassandra.Mapping; using Cassandra.Mapping;
using JiShe.CollectBus.Cassandra.Extensions; using JiShe.CollectBus.Cassandra.Extensions;
using JiShe.CollectBus.Common.Attributes;
using JiShe.CollectBus.IoTDBProvider;
using Microsoft.AspNetCore.Http;
using System.Reflection;
using Thrift.Protocol.Entities;
using Volo.Abp.Domain.Entities; using Volo.Abp.Domain.Entities;
using Volo.Abp.Domain.Repositories; using Volo.Abp.Domain.Repositories;
@ -10,9 +16,10 @@ namespace JiShe.CollectBus.Cassandra
: ICassandraRepository<TEntity, TKey> : ICassandraRepository<TEntity, TKey>
where TEntity : class where TEntity : class
{ {
private readonly ICassandraProvider _cassandraProvider;
public CassandraRepository(ICassandraProvider cassandraProvider, MappingConfiguration mappingConfig) public CassandraRepository(ICassandraProvider cassandraProvider, MappingConfiguration mappingConfig)
{ {
_cassandraProvider = cassandraProvider;
Mapper = new Mapper(cassandraProvider.Session, mappingConfig); Mapper = new Mapper(cassandraProvider.Session, mappingConfig);
cassandraProvider.Session.CreateTable<TEntity>(cassandraProvider.CassandraConfig.Keyspace); cassandraProvider.Session.CreateTable<TEntity>(cassandraProvider.CassandraConfig.Keyspace);
} }

View File

@ -13,17 +13,16 @@ namespace JiShe.CollectBus.Cassandra
)] )]
public class CollectBusCassandraModule : AbpModule public class CollectBusCassandraModule : AbpModule
{ {
public override void ConfigureServices(ServiceConfigurationContext context) public override Task ConfigureServicesAsync(ServiceConfigurationContext context)
{ {
Configure<CassandraConfig>(context.Services.GetConfiguration().GetSection("Cassandra")); Configure<CassandraConfig>(context.Services.GetConfiguration().GetSection("Cassandra"));
context.AddCassandra();
// context.AddCassandra(); return Task.CompletedTask;
} }
public override void OnApplicationInitialization(ApplicationInitializationContext context) public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{ {
// context.UseCassandra(); await context.UseCassandra();
} }
} }
} }

View File

@ -13,11 +13,11 @@ namespace Microsoft.Extensions.DependencyInjection
{ {
public static class ApplicationInitializationContextExtensions public static class ApplicationInitializationContextExtensions
{ {
public static void UseCassandra(this ApplicationInitializationContext context) public static async Task UseCassandra(this ApplicationInitializationContext context)
{ {
var service = context.ServiceProvider; var service = context.ServiceProvider;
var cassandraProvider = service.GetRequiredService<ICassandraProvider>(); var cassandraProvider = service.GetRequiredService<ICassandraProvider>();
cassandraProvider.InitClusterAndSession(); await cassandraProvider.InitClusterAndSessionAsync();
} }
} }
@ -25,11 +25,9 @@ namespace Microsoft.Extensions.DependencyInjection
{ {
public static void AddCassandra(this ServiceConfigurationContext context) public static void AddCassandra(this ServiceConfigurationContext context)
{ {
context.Services.AddSingleton(typeof(ICassandraRepository<,>), typeof(CassandraRepository<,>)); context.Services.AddTransient(typeof(ICassandraRepository<,>), typeof(CassandraRepository<,>));
context.Services.AddSingleton(new MappingConfiguration()
var mappingConfig = new MappingConfiguration() .Define(new CollectBusMapping()));
.Define(new CollectBusMapping());
context.Services.AddSingleton(mappingConfig);
} }
} }
} }

View File

@ -4,6 +4,8 @@ using Cassandra;
using System.ComponentModel.DataAnnotations; using System.ComponentModel.DataAnnotations;
using JiShe.CollectBus.Common.Attributes; using JiShe.CollectBus.Common.Attributes;
using Cassandra.Mapping; using Cassandra.Mapping;
using Cassandra.Data.Linq;
using Thrift.Protocol.Entities;
namespace JiShe.CollectBus.Cassandra.Extensions namespace JiShe.CollectBus.Cassandra.Extensions
{ {
@ -14,7 +16,8 @@ namespace JiShe.CollectBus.Cassandra.Extensions
var type = typeof(TEntity); var type = typeof(TEntity);
var tableAttribute = type.GetCustomAttribute<CassandraTableAttribute>(); var tableAttribute = type.GetCustomAttribute<CassandraTableAttribute>();
var tableName = tableAttribute?.Name ?? type.Name.ToLower(); var tableName = tableAttribute?.Name ?? type.Name.ToLower();
var tableKeyspace = tableAttribute?.Keyspace ?? defaultKeyspace; //var tableKeyspace = tableAttribute?.Keyspace ?? defaultKeyspace;
var tableKeyspace = session.Keyspace;
var properties = type.GetProperties(); var properties = type.GetProperties();
var primaryKey = properties.FirstOrDefault(p => p.GetCustomAttribute<KeyAttribute>() != null); var primaryKey = properties.FirstOrDefault(p => p.GetCustomAttribute<KeyAttribute>() != null);
@ -79,5 +82,7 @@ namespace JiShe.CollectBus.Cassandra.Extensions
throw new NotSupportedException($"不支持的类型: {type.Name}"); throw new NotSupportedException($"不支持的类型: {type.Name}");
} }
} }
} }

View File

@ -20,5 +20,7 @@ namespace JiShe.CollectBus.Cassandra
Cluster GetCluster(Action<string?>? callback = null); Cluster GetCluster(Action<string?>? callback = null);
void InitClusterAndSession(); void InitClusterAndSession();
Task InitClusterAndSessionAsync();
} }
} }

View File

@ -9,17 +9,9 @@ using Volo.Abp;
namespace JiShe.CollectBus.Common.Attributes namespace JiShe.CollectBus.Common.Attributes
{ {
[AttributeUsage(AttributeTargets.Class, Inherited = false)] [AttributeUsage(AttributeTargets.Class, Inherited = false)]
public class CassandraTableAttribute : Attribute public class CassandraTableAttribute(string? name = null) : Attribute
{ {
public CassandraTableAttribute(string? name = null,string? keyspace =null) public virtual string? Name { get; } = name;
{
Name = name;
Keyspace = keyspace;
}
public virtual string? Name { get; }
public virtual string? Keyspace { get; }
} }
[AttributeUsage(AttributeTargets.Property | AttributeTargets.Field)] [AttributeUsage(AttributeTargets.Property | AttributeTargets.Field)]

View File

@ -6,17 +6,24 @@ using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using JiShe.CollectBus.Common.Attributes; using JiShe.CollectBus.Common.Attributes;
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Enums;
using Volo.Abp.Domain.Entities;
namespace JiShe.CollectBus.IotSystems.MessageIssueds namespace JiShe.CollectBus.IotSystems.MessageIssueds
{ {
[CassandraTable] [CassandraTable]
public class MessageIssued public class MessageIssued:IEntity<string>
{ {
[Key]
public string ClientId { get; set; } public string ClientId { get; set; }
public byte[] Message { get; set; } public byte[] Message { get; set; }
public string DeviceNo { get; set; } public string DeviceNo { get; set; }
public IssuedEventType Type { get; set; } public IssuedEventType Type { get; set; }
public string MessageId { get; set; } public string MessageId { get; set; }
[Key]
public string Id { get; set; }
public object?[] GetKeys()
{
return new object[] { Id };
}
} }
} }

View File

@ -34,7 +34,7 @@
"CorsOrigins": "http://localhost:4200,http://localhost:3100" "CorsOrigins": "http://localhost:4200,http://localhost:3100"
}, },
"ConnectionStrings": { "ConnectionStrings": {
"Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000", "Default": "mongodb://mongo_PmEeF3:lixiao1980@192.168.1.9:27017/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
"Kafka": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092", "Kafka": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092",
"PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False", "PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False" "EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
@ -156,6 +156,12 @@
"Port": 9043, "Port": 9043,
"DataCenter": "dc1", "DataCenter": "dc1",
"Rack": "RAC2" "Rack": "RAC2"
},
{
"Host": "192.168.1.9",
"Port": 9044,
"DataCenter": "dc1",
"Rack": "RAC2"
} }
], ],
"Username": "admin", "Username": "admin",

View File

@ -9,6 +9,7 @@ using MongoDB.Driver;
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using Volo.Abp.Data; using Volo.Abp.Data;
using Volo.Abp.MongoDB; using Volo.Abp.MongoDB;
using Volo.Abp.MultiTenancy; using Volo.Abp.MultiTenancy;
@ -28,7 +29,11 @@ public class CollectBusMongoDbContext : AbpMongoDbContext, ICollectBusMongoDbCon
public IMongoCollection<MessageReceivedHeartbeat> MessageReceivedHeartbeats => Collection<MessageReceivedHeartbeat>(); public IMongoCollection<MessageReceivedHeartbeat> MessageReceivedHeartbeats => Collection<MessageReceivedHeartbeat>();
public IMongoCollection<Device> Devices => Collection<Device>(); public IMongoCollection<Device> Devices => Collection<Device>();
public IMongoCollection<ProtocolInfo> ProtocolInfos => Collection<ProtocolInfo>(); public IMongoCollection<ProtocolInfo> ProtocolInfos => Collection<ProtocolInfo>();
public IMongoCollection<MessageIssued> MessageIssueds => Collection<MessageIssued>();
protected override void CreateModel(IMongoModelBuilder modelBuilder) protected override void CreateModel(IMongoModelBuilder modelBuilder)
{ {
//modelBuilder.Entity<MeterReadingRecords>(builder => //modelBuilder.Entity<MeterReadingRecords>(builder =>

View File

@ -28,7 +28,7 @@ public class CollectBusMongoDbModule : AbpModule
{ {
context.Services.AddMongoDbContext<CollectBusMongoDbContext>(options => context.Services.AddMongoDbContext<CollectBusMongoDbContext>(options =>
{ {
options.AddDefaultRepositories(); options.AddDefaultRepositories(includeAllEntities: true);
// 注册分表策略 // 注册分表策略
context.Services.AddTransient( context.Services.AddTransient(