From aa55e476c2a0066331642f86ee4cbfeef9c59fc2 Mon Sep 17 00:00:00 2001 From: cli <377476583@qq.com> Date: Tue, 15 Apr 2025 17:57:47 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=81=E8=A3=85Cassandra=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=BA=93=E6=93=8D=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CollectBusApplicationModule.cs | 17 +- .../JiShe.CollectBus.Application.csproj | 3 + .../Samples/TestAppService.cs | 51 ++++++ .../Workers/CreateToBeIssueTaskWorker.cs | 2 +- .../Workers/SubscriberFifteenMinuteWorker.cs | 2 +- .../Workers/SubscriberFiveMinuteWorker.cs | 2 +- .../Workers/SubscriberOneMinuteWorker.cs | 2 +- .../BaseCassandraRepository.cs | 87 ---------- .../CassandraConfig.cs | 64 +++++++ .../CassandraConfiguration.cs | 45 ----- .../CassandraProvider.cs | 147 +++++++++++++++++ .../CassandraQueryOptimizer.cs | 156 ++++++++++++++++++ .../CassandraRepository.cs | 69 ++++++++ .../CassandraService.cs | 40 ----- .../CollectBusCassandraModule.cs | 20 ++- .../Extensions/ServiceCollectionExtensions.cs | 35 ++++ .../Extensions/SessionExtension.cs | 83 ++++++++++ .../ICassandraProvider.cs | 24 +++ .../ICassandraRepository.cs | 20 +++ .../ICassandraService.cs | 9 - .../JiShe.CollectBus.Cassandra.csproj | 5 +- .../Mappers/CollectBusMapping.cs | 20 +++ .../Attributes/CassandraTableAttribute.cs | 32 ++++ .../MessageIssueds/MessageIssued.cs | 4 + src/JiShe.CollectBus.Host/Program.cs | 40 ++++- src/JiShe.CollectBus.Host/Startup.cs | 9 +- src/JiShe.CollectBus.Host/appsettings.json | 45 ++++- .../JiSheCollectBusProtocolModule.cs | 4 +- 28 files changed, 830 insertions(+), 207 deletions(-) create mode 100644 src/JiShe.CollectBus.Application/Samples/TestAppService.cs delete mode 100644 src/JiShe.CollectBus.Cassandra/BaseCassandraRepository.cs create mode 100644 src/JiShe.CollectBus.Cassandra/CassandraConfig.cs delete mode 100644 src/JiShe.CollectBus.Cassandra/CassandraConfiguration.cs create mode 100644 src/JiShe.CollectBus.Cassandra/CassandraProvider.cs create mode 100644 src/JiShe.CollectBus.Cassandra/CassandraQueryOptimizer.cs create mode 100644 src/JiShe.CollectBus.Cassandra/CassandraRepository.cs delete mode 100644 src/JiShe.CollectBus.Cassandra/CassandraService.cs create mode 100644 src/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs create mode 100644 src/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs create mode 100644 src/JiShe.CollectBus.Cassandra/ICassandraProvider.cs create mode 100644 src/JiShe.CollectBus.Cassandra/ICassandraRepository.cs delete mode 100644 src/JiShe.CollectBus.Cassandra/ICassandraService.cs create mode 100644 src/JiShe.CollectBus.Cassandra/Mappers/CollectBusMapping.cs create mode 100644 src/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs diff --git a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index 1a8b326..5f80f51 100644 --- a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -12,8 +12,11 @@ using Microsoft.Extensions.DependencyInjection; using System.Collections.Generic; using System.Linq; using System.Reflection; +using System.Threading.Tasks; +using JiShe.CollectBus.Cassandra; using Volo.Abp; using Volo.Abp.Application; +using Volo.Abp.Autofac; using Volo.Abp.AutoMapper; using Volo.Abp.BackgroundWorkers; using Volo.Abp.BackgroundWorkers.Hangfire; @@ -27,11 +30,13 @@ namespace JiShe.CollectBus; typeof(CollectBusApplicationContractsModule), typeof(AbpDddApplicationModule), typeof(AbpAutoMapperModule), + typeof(AbpAutofacModule), typeof(AbpBackgroundWorkersHangfireModule), typeof(CollectBusFreeRedisModule), typeof(CollectBusFreeSqlModule), typeof(CollectBusKafkaModule), - typeof(CollectBusIoTDBModule) + typeof(CollectBusIoTDBModule), + typeof(CollectBusCassandraModule) )] public class CollectBusApplicationModule : AbpModule { @@ -46,20 +51,20 @@ public class CollectBusApplicationModule : AbpModule }); } - public override void OnApplicationInitialization( + public override async Task OnApplicationInitializationAsync( ApplicationInitializationContext context) { var assembly = Assembly.GetExecutingAssembly(); var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface).ToList(); foreach (var type in types) { - context.AddBackgroundWorkerAsync(type); + await context.AddBackgroundWorkerAsync(type); } //默认初始化表计信息 var dbContext = context.ServiceProvider.GetRequiredService(); - dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); - dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); + await dbContext.InitAmmeterCacheData(); + await dbContext.InitWatermeterCacheData(); //初始化主题信息 var kafkaAdminClient = context.ServiceProvider.GetRequiredService(); @@ -70,7 +75,7 @@ public class CollectBusApplicationModule : AbpModule foreach (var item in topics) { - kafkaAdminClient.CreateTopicAsync(item, configuration.GetValue(CommonConst.NumPartitions), configuration.GetValue(CommonConst.KafkaReplicationFactor)); + await kafkaAdminClient.CreateTopicAsync(item, configuration.GetValue(CommonConst.NumPartitions), configuration.GetValue(CommonConst.KafkaReplicationFactor)); } } diff --git a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj index 1726f39..a2cc613 100644 --- a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj +++ b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj @@ -24,8 +24,11 @@ + + + diff --git a/src/JiShe.CollectBus.Application/Samples/TestAppService.cs b/src/JiShe.CollectBus.Application/Samples/TestAppService.cs new file mode 100644 index 0000000..9c53c3a --- /dev/null +++ b/src/JiShe.CollectBus.Application/Samples/TestAppService.cs @@ -0,0 +1,51 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Apache.IoTDB.DataStructure; +using Apache.IoTDB; +using Confluent.Kafka; +using JiShe.CollectBus.Ammeters; +using JiShe.CollectBus.FreeSql; +using JiShe.CollectBus.IoTDBProvider; +using JiShe.CollectBus.IotSystems.PrepayModel; +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Extensions.Options; +using JiShe.CollectBus.IoTDBProvider.Context; +using Microsoft.Extensions.Logging; +using JiShe.CollectBus.Common.Helpers; +using JiShe.CollectBus.IotSystems.AFNEntity; +using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using Microsoft.Extensions.DependencyInjection; +using JiShe.CollectBus.Cassandra; +using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.IotSystems.MessageIssueds; +using Volo.Abp.Application.Services; + +namespace JiShe.CollectBus.Samples; + +[AllowAnonymous] +public class TestAppService : CollectBusAppService, IApplicationService +{ + private readonly ILogger _logger; + private readonly ICassandraRepository _messageIssuedCassandraRepository; + public TestAppService( + ILogger logger, + ICassandraRepository messageIssuedCassandraRepository + ) + { + _logger = logger; + _messageIssuedCassandraRepository = messageIssuedCassandraRepository; + } + public async Task AddMessage() + { + await _messageIssuedCassandraRepository.InsertAsync(new MessageIssued + { + ClientId = Guid.NewGuid().ToString(), + Message = Array.Empty(), + DeviceNo = "123321312", + MessageId = Guid.NewGuid().ToString(), + Type = IssuedEventType.Data + }); + } +} diff --git a/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs b/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs index 05fd90d..e994530 100644 --- a/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs +++ b/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs @@ -27,7 +27,7 @@ namespace JiShe.CollectBus.Workers { _logger = logger; RecurringJobId = nameof(CreateToBeIssueTaskWorker); - CronExpression = $"{10}/* * * * *"; + CronExpression = "* 10 * * * *"; this._scheduledMeterReadingService = scheduledMeterReadingService; } diff --git a/src/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs b/src/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs index f1bf5a1..441b22a 100644 --- a/src/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs +++ b/src/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs @@ -26,7 +26,7 @@ namespace JiShe.CollectBus.Workers { _logger = logger; RecurringJobId = nameof(SubscriberFifteenMinuteWorker); - CronExpression = $"*/{15} * * * *"; + CronExpression = "* 15 * * * *"; this._scheduledMeterReadingService = scheduledMeterReadingService; } diff --git a/src/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs b/src/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs index 2e491d6..0a61c63 100644 --- a/src/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs +++ b/src/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs @@ -26,7 +26,7 @@ namespace JiShe.CollectBus.Workers { _logger = logger; RecurringJobId = nameof(SubscriberFiveMinuteWorker); - CronExpression = $"*/{5} * * * *"; + CronExpression = "* 5 * * * *"; this._scheduledMeterReadingService = scheduledMeterReadingService; } diff --git a/src/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs b/src/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs index 82b979b..8b7cbfd 100644 --- a/src/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs +++ b/src/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs @@ -26,7 +26,7 @@ namespace JiShe.CollectBus.Workers { _logger = logger; RecurringJobId = nameof(SubscriberOneMinuteWorker); - CronExpression = $"*/{1} * * * *"; + CronExpression = "* 1 * * * *"; this._scheduledMeterReadingService = scheduledMeterReadingService; } diff --git a/src/JiShe.CollectBus.Cassandra/BaseCassandraRepository.cs b/src/JiShe.CollectBus.Cassandra/BaseCassandraRepository.cs deleted file mode 100644 index 67252e4..0000000 --- a/src/JiShe.CollectBus.Cassandra/BaseCassandraRepository.cs +++ /dev/null @@ -1,87 +0,0 @@ -using Cassandra.Mapping; -using Cassandra; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace JiShe.CollectBus.Cassandra -{ - /// - /// Cassandra数据库的基础仓储类 - /// 提供了通用的CRUD操作方法,所有具体的仓储类都应该继承此类 - /// - /// 实体类型 - public abstract class BaseCassandraRepository where T : class - { - /// - /// Cassandra数据库会话 - /// 用于执行CQL查询和操作 - /// - protected readonly ISession Session; - - /// - /// Cassandra映射器 - /// 用于对象关系映射(ORM),简化实体与数据库表之间的转换 - /// - protected readonly IMapper Mapper; - - /// - /// 构造函数 - /// 初始化数据库会话和映射器 - /// - /// Cassandra连接工厂 - protected BaseCassandraRepository(ICassandraService cassandraService) - { - Session = cassandraService.GetSession(); - Mapper = new Mapper(Session); - } - - /// - /// 根据ID获取单个实体 - /// - /// 实体ID - /// 返回找到的实体,如果未找到则返回null - public async Task GetByIdAsync(string id) - { - return await Mapper.SingleOrDefaultAsync($"WHERE id = ?", id); - } - - /// - /// 获取所有实体 - /// - /// 返回实体集合 - public async Task> GetAllAsync() - { - return await Mapper.FetchAsync(); - } - - /// - /// 插入新实体 - /// - /// 要插入的实体 - public async Task InsertAsync(T entity) - { - await Mapper.InsertAsync(entity); - } - - /// - /// 更新现有实体 - /// - /// 要更新的实体 - public async Task UpdateAsync(T entity) - { - await Mapper.UpdateAsync(entity); - } - - /// - /// 根据ID删除实体 - /// - /// 要删除的实体ID - public async Task DeleteAsync(string id) - { - await Mapper.DeleteAsync($"WHERE id = ?", id); - } - } -} diff --git a/src/JiShe.CollectBus.Cassandra/CassandraConfig.cs b/src/JiShe.CollectBus.Cassandra/CassandraConfig.cs new file mode 100644 index 0000000..c14171e --- /dev/null +++ b/src/JiShe.CollectBus.Cassandra/CassandraConfig.cs @@ -0,0 +1,64 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Cassandra +{ + public class CassandraConfig + { + public Node[] Nodes { get; set; } + public string Username { get; set; } + public string Password { get; set; } + public string Keyspace { get; set; } + public string ConsistencyLevel { get; set; } + public Pooling PoolingOptions { get; set; } + public Socket SocketOptions { get; set; } + public Query QueryOptions { get; set; } + + public ReplicationStrategy ReplicationStrategy { get; set; } + } + + public class Pooling + { + public int CoreConnectionsPerHost { get; set; } + public int MaxConnectionsPerHost { get; set; } + public int MaxRequestsPerConnection { get; set; } + } + + public class Socket + { + public int ConnectTimeoutMillis { get; set; } + public int ReadTimeoutMillis { get; set; } + } + + public class Query + { + public string ConsistencyLevel { get; set; } + public string SerialConsistencyLevel { get; set; } + public bool DefaultIdempotence { get; set; } + } + + public class ReplicationStrategy + { + public string Class { get; set; } + public DataCenter[] DataCenters { get; set; } + } + + public class DataCenter + { + public string Name { get; set; } + public int ReplicationFactor { get; set; } + public string Strategy { get; set; } + } + + public class Node + { + public string Host { get; set; } + public int Port { get; set; } + public string DataCenter { get; set; } + public string Rack { get; set; } + } + +} diff --git a/src/JiShe.CollectBus.Cassandra/CassandraConfiguration.cs b/src/JiShe.CollectBus.Cassandra/CassandraConfiguration.cs deleted file mode 100644 index 8fea3c1..0000000 --- a/src/JiShe.CollectBus.Cassandra/CassandraConfiguration.cs +++ /dev/null @@ -1,45 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace JiShe.CollectBus.Cassandra -{ - /// - /// Cassandra数据库配置类 - /// 用于存储和管理Cassandra数据库的连接配置信息 - /// - public class CassandraConfiguration - { - /// - /// Cassandra集群的节点地址列表 - /// 可以配置多个节点地址,用于实现高可用和负载均衡 - /// - public string[] ContactPoints { get; set; } - - /// - /// Cassandra的键空间名称 - /// 键空间是Cassandra中数据组织的最高级别容器 - /// - public string Keyspace { get; set; } - - /// - /// Cassandra数据库的用户名 - /// 用于身份验证 - /// - public string Username { get; set; } - - /// - /// Cassandra数据库的密码 - /// 用于身份验证 - /// - public string Password { get; set; } - - /// - /// Cassandra数据库的端口号 - /// 默认值为9042,这是Cassandra的默认CQL端口 - /// - public int Port { get; set; } = 9042; - } -} diff --git a/src/JiShe.CollectBus.Cassandra/CassandraProvider.cs b/src/JiShe.CollectBus.Cassandra/CassandraProvider.cs new file mode 100644 index 0000000..a5cd9c8 --- /dev/null +++ b/src/JiShe.CollectBus.Cassandra/CassandraProvider.cs @@ -0,0 +1,147 @@ +using System; +using System.Linq; +using System.Reflection; +using System.Text; +using Cassandra; +using Cassandra.Mapping; +using Cassandra.Data.Linq; +using System.ComponentModel.DataAnnotations; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Volo.Abp.DependencyInjection; +using JiShe.CollectBus.Common.Attributes; + +namespace JiShe.CollectBus.Cassandra +{ + public class CassandraProvider : IDisposable, ICassandraProvider, ISingletonDependency + { + private readonly ILogger _logger; + + public Cluster Instance { get; set; } + + public ISession Session { get; set; } + + public CassandraConfig CassandraConfig { get; set; } + /// + /// + /// + /// + /// + public CassandraProvider( + IOptions options, + ILogger logger) + { + CassandraConfig = options.Value; + _logger = logger; + } + + public void InitClusterAndSession() + { + GetCluster((keyspace) => + { + GetSession(keyspace); + }); + } + + public Cluster GetCluster(Action? callback=null) + { + var clusterBuilder = Cluster.Builder(); + + // 添加多个节点 + foreach (var node in CassandraConfig.Nodes) + { + clusterBuilder.AddContactPoint(node.Host) + .WithPort(node.Port); + } + + clusterBuilder.WithCredentials(CassandraConfig.Username, CassandraConfig.Password); + + // 优化连接池配置 + var poolingOptions = new PoolingOptions() + .SetCoreConnectionsPerHost(HostDistance.Local, CassandraConfig.PoolingOptions.CoreConnectionsPerHost) + .SetMaxConnectionsPerHost(HostDistance.Local, CassandraConfig.PoolingOptions.MaxConnectionsPerHost) + .SetMaxRequestsPerConnection(CassandraConfig.PoolingOptions.MaxRequestsPerConnection) + .SetHeartBeatInterval(30000); // 30秒心跳 + + clusterBuilder.WithPoolingOptions(poolingOptions); + + // 优化Socket配置 + var socketOptions = new SocketOptions() + .SetConnectTimeoutMillis(CassandraConfig.SocketOptions.ConnectTimeoutMillis) + .SetReadTimeoutMillis(CassandraConfig.SocketOptions.ReadTimeoutMillis) + .SetTcpNoDelay(true) // 启用Nagle算法 + .SetKeepAlive(true) // 启用TCP保活 + .SetReceiveBufferSize(32768) // 32KB接收缓冲区 + .SetSendBufferSize(32768); // 32KB发送缓冲区 + + clusterBuilder.WithSocketOptions(socketOptions); + + // 优化查询选项 + var queryOptions = new QueryOptions() + .SetConsistencyLevel((ConsistencyLevel)Enum.Parse(typeof(ConsistencyLevel), CassandraConfig.QueryOptions.ConsistencyLevel)) + .SetSerialConsistencyLevel((ConsistencyLevel)Enum.Parse(typeof(ConsistencyLevel), CassandraConfig.QueryOptions.SerialConsistencyLevel)) + .SetDefaultIdempotence(CassandraConfig.QueryOptions.DefaultIdempotence) + .SetPageSize(5000); // 增加页面大小 + + clusterBuilder.WithQueryOptions(queryOptions); + + // 启用压缩 + clusterBuilder.WithCompression(CompressionType.LZ4); + + // 配置重连策略 + clusterBuilder.WithReconnectionPolicy(new ExponentialReconnectionPolicy(1000, 10 * 60 * 1000)); + Instance = clusterBuilder.Build(); + callback?.Invoke(null); + return Instance; + } + + public ISession GetSession(string? keyspace = null) + { + if (string.IsNullOrEmpty(keyspace)) + { + keyspace = CassandraConfig.Keyspace; + } + Session = Instance.Connect(); + var replication = GetReplicationStrategy(); + Session.CreateKeyspaceIfNotExists(keyspace, replication); + Session.ChangeKeyspace(keyspace); + return Session; + } + + private Dictionary GetReplicationStrategy() + { + var strategy = CassandraConfig.ReplicationStrategy.Class; + var dataCenters = CassandraConfig.ReplicationStrategy.DataCenters; + + switch (strategy) + { + case "NetworkTopologyStrategy": + var networkDic = new Dictionary { { "class", "NetworkTopologyStrategy" } }; + foreach (var dataCenter in dataCenters) + { + networkDic.Add(dataCenter.Name, dataCenter.ReplicationFactor.ToString()); + } + return networkDic; + case "SimpleStrategy": + var dic = new Dictionary { { "class", "SimpleStrategy" } }; + if (dataCenters.Length >= 1) + { + dic.Add("replication_factor", dataCenters[0].ReplicationFactor.ToString()); + } + else + { + _logger.LogError("SimpleStrategy 不支持多个数据中心!"); + } + return dic; + default: + throw new ArgumentNullException($"Strategy", "Strategy配置错误!"); + } + } + + public void Dispose() + { + Instance.Dispose(); + Session.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/JiShe.CollectBus.Cassandra/CassandraQueryOptimizer.cs b/src/JiShe.CollectBus.Cassandra/CassandraQueryOptimizer.cs new file mode 100644 index 0000000..0ea1b56 --- /dev/null +++ b/src/JiShe.CollectBus.Cassandra/CassandraQueryOptimizer.cs @@ -0,0 +1,156 @@ +using System.Collections.Concurrent; +using Cassandra; +using Cassandra.Mapping; +using Microsoft.Extensions.Caching.Memory; +using Microsoft.Extensions.Logging; + +namespace JiShe.CollectBus.Cassandra +{ + public class CassandraQueryOptimizer + { + private readonly ISession _session; + private readonly ILogger _logger; + private readonly IMemoryCache _cache; + private readonly ConcurrentDictionary _preparedStatements; + private readonly int _batchSize; + private readonly TimeSpan _cacheExpiration; + + public CassandraQueryOptimizer( + ISession session, + ILogger logger, + IMemoryCache cache, + int batchSize = 100, + TimeSpan? cacheExpiration = null) + { + _session = session; + _logger = logger; + _cache = cache; + _preparedStatements = new ConcurrentDictionary(); + _batchSize = batchSize; + _cacheExpiration = cacheExpiration ?? TimeSpan.FromMinutes(5); + } + + public async Task GetOrPrepareStatementAsync(string cql) + { + return _preparedStatements.GetOrAdd(cql, key => + { + try + { + var statement = _session.Prepare(key); + _logger.LogDebug($"Prepared statement for CQL: {key}"); + return statement; + } + catch (Exception ex) + { + _logger.LogError(ex, $"Failed to prepare statement for CQL: {key}"); + throw; + } + }); + } + + public async Task ExecuteBatchAsync(IEnumerable statements) + { + var batch = new BatchStatement(); + var count = 0; + + foreach (var statement in statements) + { + batch.Add(statement); + count++; + + if (count >= _batchSize) + { + await ExecuteBatchAsync(batch); + batch = new BatchStatement(); + count = 0; + } + } + + if (count > 0) + { + await ExecuteBatchAsync(batch); + } + } + + private async Task ExecuteBatchAsync(BatchStatement batch) + { + try + { + await _session.ExecuteAsync(batch); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to execute batch statement"); + throw; + } + } + + public async Task GetOrSetFromCacheAsync(string cacheKey, Func> getData) + { + if (_cache.TryGetValue(cacheKey, out T cachedValue)) + { + _logger.LogDebug($"Cache hit for key: {cacheKey}"); + return cachedValue; + } + + var data = await getData(); + _cache.Set(cacheKey, data, _cacheExpiration); + _logger.LogDebug($"Cache miss for key: {cacheKey}, data cached"); + return data; + } + + public async Task> ExecutePagedQueryAsync( + string cql, + object[] parameters, + int pageSize = 100, + string pagingState = null) where T : class + { + var statement = await GetOrPrepareStatementAsync(cql); + var boundStatement = statement.Bind(parameters); + + if (!string.IsNullOrEmpty(pagingState)) + { + boundStatement.SetPagingState(Convert.FromBase64String(pagingState)); + } + + boundStatement.SetPageSize(pageSize); + + try + { + var result = await _session.ExecuteAsync(boundStatement); + //TODO: RETURN OBJECT + throw new NotImplementedException(); + //result.GetRows() + //return result.Select(row => row); + } + catch (Exception ex) + { + _logger.LogError(ex, $"Failed to execute paged query: {cql}"); + throw; + } + } + + public async Task BulkInsertAsync(IEnumerable items, string tableName) + { + var mapper = new Mapper(_session); + var batch = new List(); + var cql = $"INSERT INTO {tableName} ({{0}}) VALUES ({{1}})"; + + foreach (var chunk in items.Chunk(_batchSize)) + { + var statements = chunk.Select(item => + { + var props = typeof(T).GetProperties(); + var columns = string.Join(", ", props.Select(p => p.Name)); + var values = string.Join(", ", props.Select(p => "?")); + var statement = _session.Prepare(string.Format(cql, columns, values)); + return statement.Bind(props.Select(p => p.GetValue(item)).ToArray()); + }); + + batch.AddRange(statements); + } + + await ExecuteBatchAsync(batch); + } + } +} \ No newline at end of file diff --git a/src/JiShe.CollectBus.Cassandra/CassandraRepository.cs b/src/JiShe.CollectBus.Cassandra/CassandraRepository.cs new file mode 100644 index 0000000..8381450 --- /dev/null +++ b/src/JiShe.CollectBus.Cassandra/CassandraRepository.cs @@ -0,0 +1,69 @@ +using Cassandra; +using Cassandra.Mapping; +using JiShe.CollectBus.Cassandra.Extensions; +using Volo.Abp.Domain.Entities; +using Volo.Abp.Domain.Repositories; + +namespace JiShe.CollectBus.Cassandra +{ + public class CassandraRepository + : ICassandraRepository + where TEntity : class + { + + public CassandraRepository(ICassandraProvider cassandraProvider, MappingConfiguration mappingConfig) + { + Mapper = new Mapper(cassandraProvider.Session, mappingConfig); + cassandraProvider.Session.CreateTable(cassandraProvider.CassandraConfig.Keyspace); + } + + public readonly IMapper Mapper; + + public virtual async Task GetAsync(TKey id) + { + return await Mapper.SingleOrDefaultAsync("WHERE id = ?", id); + } + + public virtual async Task> GetListAsync() + { + return (await Mapper.FetchAsync()).ToList(); + } + + public virtual async Task InsertAsync(TEntity entity) + { + await Mapper.InsertAsync(entity); + return entity; + } + + public virtual async Task UpdateAsync(TEntity entity) + { + await Mapper.UpdateAsync(entity); + return entity; + } + + public virtual async Task DeleteAsync(TEntity entity) + { + await Mapper.DeleteAsync(entity); + } + + public virtual async Task DeleteAsync(TKey id) + { + await Mapper.DeleteAsync("WHERE id = ?", id); + } + + public virtual async Task> GetPagedListAsync( + int skipCount, + int maxResultCount, + string sorting) + { + var cql = $"SELECT * FROM {typeof(TEntity).Name.ToLower()}"; + if (!string.IsNullOrWhiteSpace(sorting)) + { + cql += $" ORDER BY {sorting}"; + } + cql += $" LIMIT {maxResultCount} OFFSET {skipCount}"; + + return (await Mapper.FetchAsync(cql)).ToList(); + } + } +} \ No newline at end of file diff --git a/src/JiShe.CollectBus.Cassandra/CassandraService.cs b/src/JiShe.CollectBus.Cassandra/CassandraService.cs deleted file mode 100644 index 8836235..0000000 --- a/src/JiShe.CollectBus.Cassandra/CassandraService.cs +++ /dev/null @@ -1,40 +0,0 @@ -using System.Diagnostics.Metrics; -using Cassandra; -using Microsoft.Extensions.Options; -using Volo.Abp.DependencyInjection; - -namespace JiShe.CollectBus.Cassandra -{ - public class CassandraService : ICassandraService, ISingletonDependency - { - private readonly CassandraConfiguration _configuration; - public ISession Instance { get; set; } = default; - - /// - /// CassandraService - /// - /// - public CassandraService(IOptions configuration) - { - _configuration = configuration.Value; - GetSession(); - } - - /// - /// 获取Cassandra数据库会话 - /// - /// - public ISession GetSession() - { - var cluster = Cluster.Builder() - .AddContactPoints(_configuration.ContactPoints) - .WithPort(_configuration.Port) - .WithCredentials(_configuration.Username, _configuration.Password) - .Build(); - - Instance = cluster.Connect(_configuration.Keyspace); - - return Instance; - } - } -} diff --git a/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs b/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs index e7c2c69..2502420 100644 --- a/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs +++ b/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs @@ -1,13 +1,29 @@ -using Microsoft.Extensions.DependencyInjection; +using Cassandra; +using Cassandra.Mapping; +using JiShe.CollectBus.Cassandra.Mappers; +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp; +using Volo.Abp.Autofac; using Volo.Abp.Modularity; namespace JiShe.CollectBus.Cassandra { + [DependsOn( + typeof(AbpAutofacModule) + )] public class CollectBusCassandraModule : AbpModule { public override void ConfigureServices(ServiceConfigurationContext context) { - context.Services.Configure(context.Services.GetConfiguration().GetSection("Cassandra")); + Configure(context.Services.GetConfiguration().GetSection("Cassandra")); + + context.AddCassandra(); + + } + + public override void OnApplicationInitialization(ApplicationInitializationContext context) + { + context.UseCassandra(); } } } diff --git a/src/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs b/src/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..9c6a044 --- /dev/null +++ b/src/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs @@ -0,0 +1,35 @@ +using Autofac.Core; +using Cassandra; +using Cassandra.Mapping; +using JiShe.CollectBus.Cassandra; +using JiShe.CollectBus.Cassandra.Mappers; +using Microsoft.Extensions.Options; +using System.Reflection; +using Volo.Abp; +using Volo.Abp.Modularity; + +// ReSharper disable once CheckNamespace +namespace Microsoft.Extensions.DependencyInjection +{ + public static class ApplicationInitializationContextExtensions + { + public static void UseCassandra(this ApplicationInitializationContext context) + { + var service = context.ServiceProvider; + var cassandraProvider = service.GetRequiredService(); + cassandraProvider.InitClusterAndSession(); + } + } + + public static class ServiceCollectionExtensions + { + public static void AddCassandra(this ServiceConfigurationContext context) + { + context.Services.AddSingleton(typeof(ICassandraRepository<,>), typeof(CassandraRepository<,>)); + + var mappingConfig = new MappingConfiguration() + .Define(new CollectBusMapping()); + context.Services.AddSingleton(mappingConfig); + } + } +} diff --git a/src/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs b/src/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs new file mode 100644 index 0000000..f515416 --- /dev/null +++ b/src/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs @@ -0,0 +1,83 @@ +using System.Reflection; +using System.Text; +using Cassandra; +using System.ComponentModel.DataAnnotations; +using JiShe.CollectBus.Common.Attributes; +using Cassandra.Mapping; + +namespace JiShe.CollectBus.Cassandra.Extensions +{ + public static class SessionExtension + { + public static void CreateTable(this ISession session,string? defaultKeyspace=null) where TEntity : class + { + var type = typeof(TEntity); + var tableAttribute = type.GetCustomAttribute(); + var tableName = tableAttribute?.Name ?? type.Name.ToLower(); + var tableKeyspace = tableAttribute?.Keyspace ?? defaultKeyspace; + + var properties = type.GetProperties(); + var primaryKey = properties.FirstOrDefault(p => p.GetCustomAttribute() != null); + + if (primaryKey == null) + { + throw new InvalidOperationException($"No primary key defined for type {type.Name}"); + } + + var cql = new StringBuilder(); + cql.Append($"CREATE TABLE IF NOT EXISTS {tableKeyspace}.{tableName} ("); + + foreach (var prop in properties) + { + var ignoreAttribute = prop.GetCustomAttribute(); + if (ignoreAttribute != null) continue; + var columnName = prop.Name.ToLower(); + var cqlType = GetCassandraType(prop.PropertyType); + + cql.Append($"{columnName} {cqlType}, "); + } + cql.Length -= 2; // Remove last comma and space + cql.Append($", PRIMARY KEY ({primaryKey.Name.ToLower()}))"); + + session.Execute(cql.ToString()); + } + + private static string GetCassandraType(Type type) + { + // 基础类型处理 + switch (Type.GetTypeCode(type)) + { + case TypeCode.String: return "text"; + case TypeCode.Int32: return "int"; + case TypeCode.Int64: return "bigint"; + case TypeCode.Boolean: return "boolean"; + case TypeCode.DateTime: return "timestamp"; + case TypeCode.Byte: return "tinyint"; + } + + if (type == typeof(Guid)) return "uuid"; + if (type == typeof(DateTimeOffset)) return "timestamp"; + if (type == typeof(Byte[])) return "blob"; + + // 处理集合类型 + if (type.IsGenericType) + { + var genericType = type.GetGenericTypeDefinition(); + var elementType = type.GetGenericArguments()[0]; + + if (genericType == typeof(List<>)) + return $"list<{GetCassandraType(elementType)}>"; + if (genericType == typeof(HashSet<>)) + return $"set<{GetCassandraType(elementType)}>"; + if (genericType == typeof(Dictionary<,>)) + { + var keyType = type.GetGenericArguments()[0]; + var valueType = type.GetGenericArguments()[1]; + return $"map<{GetCassandraType(keyType)}, {GetCassandraType(valueType)}>"; + } + } + + throw new NotSupportedException($"不支持的类型: {type.Name}"); + } + } +} diff --git a/src/JiShe.CollectBus.Cassandra/ICassandraProvider.cs b/src/JiShe.CollectBus.Cassandra/ICassandraProvider.cs new file mode 100644 index 0000000..45c6ca6 --- /dev/null +++ b/src/JiShe.CollectBus.Cassandra/ICassandraProvider.cs @@ -0,0 +1,24 @@ +using Cassandra; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Cassandra +{ + public interface ICassandraProvider + { + Cluster Instance { get;} + + ISession Session { get;} + + CassandraConfig CassandraConfig { get; } + + ISession GetSession(string? keyspace = null); + + Cluster GetCluster(Action? callback = null); + + void InitClusterAndSession(); + } +} diff --git a/src/JiShe.CollectBus.Cassandra/ICassandraRepository.cs b/src/JiShe.CollectBus.Cassandra/ICassandraRepository.cs new file mode 100644 index 0000000..6dd474f --- /dev/null +++ b/src/JiShe.CollectBus.Cassandra/ICassandraRepository.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Volo.Abp.Domain.Entities; + +namespace JiShe.CollectBus.Cassandra +{ + public interface ICassandraRepository where TEntity : class + { + Task GetAsync(TKey id); + Task> GetListAsync(); + Task InsertAsync(TEntity entity); + Task UpdateAsync(TEntity entity); + Task DeleteAsync(TEntity entity); + Task DeleteAsync(TKey id); + Task> GetPagedListAsync(int skipCount, int maxResultCount, string sorting); + } +} diff --git a/src/JiShe.CollectBus.Cassandra/ICassandraService.cs b/src/JiShe.CollectBus.Cassandra/ICassandraService.cs deleted file mode 100644 index 70eb5d6..0000000 --- a/src/JiShe.CollectBus.Cassandra/ICassandraService.cs +++ /dev/null @@ -1,9 +0,0 @@ -using Cassandra; - -namespace JiShe.CollectBus.Cassandra -{ - public interface ICassandraService - { - ISession GetSession(); - } -} diff --git a/src/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj b/src/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj index 5a8b22d..db7ccf3 100644 --- a/src/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj +++ b/src/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj @@ -9,11 +9,14 @@ + + - + + diff --git a/src/JiShe.CollectBus.Cassandra/Mappers/CollectBusMapping.cs b/src/JiShe.CollectBus.Cassandra/Mappers/CollectBusMapping.cs new file mode 100644 index 0000000..d07f8ea --- /dev/null +++ b/src/JiShe.CollectBus.Cassandra/Mappers/CollectBusMapping.cs @@ -0,0 +1,20 @@ +using Cassandra.Mapping; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using JiShe.CollectBus.IotSystems.MessageIssueds; +using static Cassandra.QueryTrace; + +namespace JiShe.CollectBus.Cassandra.Mappers +{ + public class CollectBusMapping: Mappings + { + public CollectBusMapping() + { + For() + .Column(e => e.Type, cm => cm.WithName("type").WithDbType()); + } + } +} diff --git a/src/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs b/src/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs new file mode 100644 index 0000000..bdd16d5 --- /dev/null +++ b/src/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs @@ -0,0 +1,32 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Volo.Abp.EventBus; +using Volo.Abp; + +namespace JiShe.CollectBus.Common.Attributes +{ + [AttributeUsage(AttributeTargets.Class, Inherited = false)] + public class CassandraTableAttribute : Attribute + { + public CassandraTableAttribute(string? name = null,string? keyspace =null) + { + Name = name; + Keyspace = keyspace; + } + + public virtual string? Name { get; } + + public virtual string? Keyspace { get; } + } + + [AttributeUsage(AttributeTargets.Property | AttributeTargets.Field)] + public class CassandraIgnoreAttribute : Attribute + { + public CassandraIgnoreAttribute() + { + } + } +} diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs b/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs index 42d740c..5977d5e 100644 --- a/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs +++ b/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs @@ -1,14 +1,18 @@ using System; using System.Collections.Generic; +using System.ComponentModel.DataAnnotations; using System.Linq; using System.Text; using System.Threading.Tasks; +using JiShe.CollectBus.Common.Attributes; using JiShe.CollectBus.Common.Enums; namespace JiShe.CollectBus.IotSystems.MessageIssueds { + [CassandraTable] public class MessageIssued { + [Key] public string ClientId { get; set; } public byte[] Message { get; set; } public string DeviceNo { get; set; } diff --git a/src/JiShe.CollectBus.Host/Program.cs b/src/JiShe.CollectBus.Host/Program.cs index 40a13cc..d75a227 100644 --- a/src/JiShe.CollectBus.Host/Program.cs +++ b/src/JiShe.CollectBus.Host/Program.cs @@ -1,12 +1,35 @@ using JiShe.CollectBus.Host; using Microsoft.AspNetCore.Hosting; using Serilog; +using Volo.Abp.Modularity.PlugIns; public class Program { - public static void Main(string[] args) + /// + /// + /// + /// + /// + public static async Task Main(string[] args) { - CreateHostBuilder(args).Build().Run(); + var builder = WebApplication.CreateBuilder(args); + builder.Host.UseContentRoot(Directory.GetCurrentDirectory()) + .UseSerilog((context, loggerConfiguration) => + { + loggerConfiguration.ReadFrom.Configuration(context.Configuration); + }) + .UseAutofac(); + await builder.AddApplicationAsync(options => + { + // ز̶ģʽȲ + options.PlugInSources.AddFolder(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Plugins")); + }); + var app = builder.Build(); + await app.InitializeApplicationAsync(); + await app.RunAsync(); + + + //await CreateHostBuilder(args).Build().RunAsync(); } private static IHostBuilder CreateHostBuilder(string[] args) => @@ -16,13 +39,14 @@ public class Program { loggerConfiguration.ReadFrom.Configuration(context.Configuration); }) - .UseAutofac() .ConfigureWebHostDefaults(webBuilder => { webBuilder.UseStartup(); - }); - - + }) + .UseAutofac(); + + + private static IHostBuilder CreateConsoleHostBuilder(string[] args) => Host.CreateDefaultBuilder(args) @@ -34,8 +58,8 @@ public class Program }); - private static void ConfigureServices(IServiceCollection services, HostBuilderContext hostContext) + private static async Task ConfigureServices(IServiceCollection services, HostBuilderContext hostContext) { - services.AddApplication(); + await services.AddApplicationAsync(); } } \ No newline at end of file diff --git a/src/JiShe.CollectBus.Host/Startup.cs b/src/JiShe.CollectBus.Host/Startup.cs index dd6453e..32740ee 100644 --- a/src/JiShe.CollectBus.Host/Startup.cs +++ b/src/JiShe.CollectBus.Host/Startup.cs @@ -39,10 +39,15 @@ namespace JiShe.CollectBus.Host /// The lifetime. public void Configure(IApplicationBuilder app, IHostApplicationLifetime lifetime) { - app.InitializeApplication(); - //await app.InitializeApplicationAsync(); + app.Use(async (context, next) => + { + // 在请求处理之前调用 InitializeApplicationAsync + await app.InitializeApplicationAsync(); + // 继续请求管道中的下一个中间件 + await next(); + }); } } } diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index a5b2e15..a837636 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -130,5 +130,48 @@ }, "ServerTagName": "JiSheCollectBus", "KafkaReplicationFactor": 3, - "NumPartitions": 30 + "NumPartitions": 30, + "Cassandra": { + "ReplicationStrategy": { + "Class": "NetworkTopologyStrategy", //策略为NetworkTopologyStrategy时才会有多个数据中心,SimpleStrategy用在只有一个数据中心的情况下 + "DataCenters": [ + { + "Name": "dc1", + "ReplicationFactor": 3 + } + ] + }, + "Nodes": [ + { + "Host": "192.168.1.9", + "Port": 9042, + "DataCenter": "dc1", + "Rack": "RAC1" + }, + { + "Host": "192.168.1.9", + "Port": 9043, + "DataCenter": "dc1", + "Rack": "RAC2" + } + ], + "Username": "admin", + "Password": "lixiao1980", + "Keyspace": "jishecollectbus", + "ConsistencyLevel": "Quorum", + "PoolingOptions": { + "CoreConnectionsPerHost": 4, + "MaxConnectionsPerHost": 8, + "MaxRequestsPerConnection": 2000 + }, + "SocketOptions": { + "ConnectTimeoutMillis": 10000, + "ReadTimeoutMillis": 20000 + }, + "QueryOptions": { + "ConsistencyLevel": "Quorum", + "SerialConsistencyLevel": "Serial", + "DefaultIdempotence": true + } + } } \ No newline at end of file diff --git a/src/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs b/src/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs index 64661d1..5cda0c8 100644 --- a/src/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs +++ b/src/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs @@ -12,10 +12,10 @@ namespace JiShe.CollectBus.Protocol context.Services.AddKeyedSingleton(nameof(StandardProtocolPlugin)); } - public override void OnApplicationInitialization(ApplicationInitializationContext context) + public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context) { var standardProtocol = context.ServiceProvider.GetRequiredKeyedService(nameof(StandardProtocolPlugin)); - standardProtocol.AddAsync(); + await standardProtocol.AddAsync(); } } }