using System; using System.Linq; using System.Reflection; using System.Text; using Cassandra; using Cassandra.Mapping; using Cassandra.Data.Linq; using System.ComponentModel.DataAnnotations; using System.Diagnostics; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Volo.Abp.DependencyInjection; using JiShe.CollectBus.Common.Attributes; namespace JiShe.CollectBus.Cassandra { public class CassandraProvider : IDisposable, ICassandraProvider, ISingletonDependency { private readonly ILogger _logger; public Cluster Instance { get; set; } public ISession Session { get; set; } public CassandraConfig CassandraConfig { get; set; } /// /// /// /// /// public CassandraProvider( IOptions options, ILogger logger) { CassandraConfig = options.Value; _logger = logger; } public Task InitClusterAndSessionAsync() { InitClusterAndSession(); return Task.CompletedTask; } public void InitClusterAndSession() { GetCluster((keyspace) => { GetSession(keyspace); }); } public Cluster GetCluster(Action? callback=null) { var clusterBuilder = Cluster.Builder(); // 添加多个节点 foreach (var node in CassandraConfig.Nodes) { clusterBuilder.AddContactPoint(node.Host) .WithPort(node.Port); } clusterBuilder.WithCredentials(CassandraConfig.Username, CassandraConfig.Password); // 优化连接池配置 var poolingOptions = new PoolingOptions() .SetCoreConnectionsPerHost(HostDistance.Local, CassandraConfig.PoolingOptions.CoreConnectionsPerHost) .SetMaxConnectionsPerHost(HostDistance.Local, CassandraConfig.PoolingOptions.MaxConnectionsPerHost) .SetMaxRequestsPerConnection(CassandraConfig.PoolingOptions.MaxRequestsPerConnection) .SetHeartBeatInterval(30000); // 30秒心跳 clusterBuilder.WithPoolingOptions(poolingOptions); // 优化Socket配置 var socketOptions = new SocketOptions() .SetConnectTimeoutMillis(CassandraConfig.SocketOptions.ConnectTimeoutMillis) .SetReadTimeoutMillis(CassandraConfig.SocketOptions.ReadTimeoutMillis) .SetTcpNoDelay(true) // 启用Nagle算法 .SetKeepAlive(true) // 启用TCP保活 .SetReceiveBufferSize(32768) // 32KB接收缓冲区 .SetSendBufferSize(32768); // 32KB发送缓冲区 clusterBuilder.WithSocketOptions(socketOptions); // 优化查询选项 var queryOptions = new QueryOptions() .SetConsistencyLevel((ConsistencyLevel)Enum.Parse(typeof(ConsistencyLevel), CassandraConfig.QueryOptions.ConsistencyLevel)) .SetSerialConsistencyLevel((ConsistencyLevel)Enum.Parse(typeof(ConsistencyLevel), CassandraConfig.QueryOptions.SerialConsistencyLevel)) .SetDefaultIdempotence(CassandraConfig.QueryOptions.DefaultIdempotence) .SetPageSize(5000); // 增加页面大小 clusterBuilder.WithQueryOptions(queryOptions); // 启用压缩 clusterBuilder.WithCompression(CompressionType.LZ4); // 配置重连策略 clusterBuilder.WithReconnectionPolicy(new ExponentialReconnectionPolicy(1000, 10 * 60 * 1000)); Instance = clusterBuilder.Build(); callback?.Invoke(null); return Instance; } public ISession GetSession(string? keyspace = null) { if (string.IsNullOrEmpty(keyspace)) { keyspace = CassandraConfig.Keyspace; } Session = Instance.Connect(); var replication = GetReplicationStrategy(); Session.CreateKeyspaceIfNotExists(keyspace, replication); Session.ChangeKeyspace(keyspace); return Session; } private Dictionary GetReplicationStrategy() { var strategy = CassandraConfig.ReplicationStrategy.Class; var dataCenters = CassandraConfig.ReplicationStrategy.DataCenters; switch (strategy) { case "NetworkTopologyStrategy": var networkDic = new Dictionary { { "class", "NetworkTopologyStrategy" } }; foreach (var dataCenter in dataCenters) { networkDic.Add(dataCenter.Name, dataCenter.ReplicationFactor.ToString()); } return networkDic; case "SimpleStrategy": var dic = new Dictionary { { "class", "SimpleStrategy" } }; if (dataCenters.Length >= 1) { dic.Add("replication_factor", dataCenters[0].ReplicationFactor.ToString()); } else { _logger.LogError("SimpleStrategy 不支持多个数据中心!"); } return dic; default: throw new ArgumentNullException($"Strategy", "Strategy配置错误!"); } } public void Dispose() { Instance.Dispose(); Session.Dispose(); } } }