// Web-view residue (not program text): 154 lines, 5.7 KiB, C#, Raw / Normal View / History.
// History-view timestamp: 2025-04-15 17:57:47 +08:00
using System;
using System.Linq;
using System.Reflection;
using System.Text;
using Cassandra;
using Cassandra.Mapping;
using Cassandra.Data.Linq;
using System.ComponentModel.DataAnnotations;
// History-view timestamp: 2025-04-16 16:12:38 +08:00
using System.Diagnostics;
// History-view timestamp: 2025-04-15 17:57:47 +08:00
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using JiShe.CollectBus.Common.Attributes;
namespace JiShe.CollectBus.Cassandra
{
/// <summary>
/// Builds the Cassandra <see cref="Cluster"/> described by <see cref="CassandraConfig"/> and
/// opens an <see cref="ISession"/> bound to the configured keyspace, creating the keyspace
/// when it does not exist yet.
/// </summary>
public class CassandraProvider : IDisposable, ICassandraProvider, ISingletonDependency
{
    /// <summary>Heartbeat interval applied to pooled connections (30 seconds).</summary>
    private const int HeartBeatIntervalMillis = 30000;

    /// <summary>Socket send/receive buffer size (32 KiB).</summary>
    private const int SocketBufferSizeBytes = 32768;

    /// <summary>Number of rows fetched per result page.</summary>
    private const int QueryPageSize = 5000;

    /// <summary>Initial delay of the exponential reconnection policy (1 second).</summary>
    private const long ReconnectBaseDelayMillis = 1000;

    /// <summary>Upper bound of the exponential reconnection policy (10 minutes).</summary>
    private const long ReconnectMaxDelayMillis = 10 * 60 * 1000;

    private readonly ILogger<CassandraProvider> _logger;

    /// <summary>The cluster built by the most recent <see cref="GetCluster"/> call.</summary>
    public Cluster Instance { get; set; }

    /// <summary>The session opened by the most recent <see cref="GetSession"/> call.</summary>
    public ISession Session { get; set; }

    /// <summary>Configuration the cluster and session are built from.</summary>
    public CassandraConfig CassandraConfig { get; set; }

    /// <summary>
    /// Creates the provider. No connection is opened here; call
    /// <see cref="InitClusterAndSession"/> (or its async counterpart) to connect.
    /// </summary>
    /// <param name="options">Options wrapper whose value becomes <see cref="CassandraConfig"/>.</param>
    /// <param name="logger">Logger used for configuration diagnostics.</param>
    public CassandraProvider(
        IOptions<CassandraConfig> options,
        ILogger<CassandraProvider> logger)
    {
        CassandraConfig = options.Value;
        _logger = logger;
    }

    /// <summary>
    /// Task-returning wrapper around <see cref="InitClusterAndSession"/>. The work itself runs
    /// synchronously on the calling thread before the completed task is returned.
    /// </summary>
    /// <returns>An already-completed task.</returns>
    public Task InitClusterAndSessionAsync()
    {
        InitClusterAndSession();
        return Task.CompletedTask;
    }

    /// <summary>
    /// Builds the cluster and then opens the session on it.
    /// </summary>
    public void InitClusterAndSession()
    {
        GetCluster((keyspace) =>
        {
            GetSession(keyspace);
        });
    }

    /// <summary>
    /// Builds a new <see cref="Cluster"/> from <see cref="CassandraConfig"/>, stores it in
    /// <see cref="Instance"/> and returns it.
    /// </summary>
    /// <param name="callback">
    /// Optional continuation invoked after the cluster is built. It is always called with
    /// <c>null</c>, which makes <see cref="GetSession"/> fall back to the configured keyspace.
    /// </param>
    /// <returns>The newly built cluster.</returns>
    public Cluster GetCluster(Action<string?>? callback = null)
    {
        var clusterBuilder = Cluster.Builder();

        // Register every configured node as a contact point.
        // NOTE: the driver's port setting is builder-wide, not per contact point, so the
        // port of the LAST node wins for all of them. Surface that instead of hiding it.
        int? firstPort = null;
        var portsDiffer = false;
        foreach (var node in CassandraConfig.Nodes)
        {
            clusterBuilder.AddContactPoint(node.Host)
                .WithPort(node.Port);

            if (firstPort == null)
            {
                firstPort = node.Port;
            }
            else if (firstPort != node.Port)
            {
                portsDiffer = true;
            }
        }

        if (portsDiffer)
        {
            _logger.LogWarning("Cassandra nodes are configured with different ports; the driver applies a single port (the last configured one) to all contact points.");
        }

        clusterBuilder.WithCredentials(CassandraConfig.Username, CassandraConfig.Password);
        clusterBuilder.WithPoolingOptions(BuildPoolingOptions());
        clusterBuilder.WithSocketOptions(BuildSocketOptions());
        clusterBuilder.WithQueryOptions(BuildQueryOptions());

        // Compress traffic between driver and nodes.
        clusterBuilder.WithCompression(CompressionType.LZ4);

        // Back off exponentially when a node is unreachable.
        clusterBuilder.WithReconnectionPolicy(
            new ExponentialReconnectionPolicy(ReconnectBaseDelayMillis, ReconnectMaxDelayMillis));

        Instance = clusterBuilder.Build();
        callback?.Invoke(null);
        return Instance;
    }

    /// <summary>
    /// Returns a session bound to <paramref name="keyspace"/>, creating the keyspace first when
    /// it does not exist.
    /// </summary>
    /// <remarks>
    /// The current <see cref="Session"/> is reused when it is still open, belongs to the current
    /// <see cref="Instance"/> and is already bound to the requested keyspace. Otherwise a new
    /// session is opened, so repeated calls no longer open (and leak) a connection pool each time.
    /// </remarks>
    /// <param name="keyspace">
    /// Keyspace to bind to; <c>null</c> or empty selects <c>CassandraConfig.Keyspace</c>.
    /// </param>
    /// <returns>The session, which is also stored in <see cref="Session"/>.</returns>
    public ISession GetSession(string? keyspace = null)
    {
        if (string.IsNullOrEmpty(keyspace))
        {
            keyspace = CassandraConfig.Keyspace;
        }

        // Build the cluster on demand instead of failing with a NullReferenceException
        // when GetSession is called before GetCluster.
        var cluster = Instance ?? GetCluster();

        var current = Session;
        if (current is { IsDisposed: false }
            && ReferenceEquals(current.Cluster, cluster)
            && string.Equals(current.Keyspace, keyspace, StringComparison.Ordinal))
        {
            return current;
        }

        var session = cluster.Connect();
        try
        {
            var replication = GetReplicationStrategy();
            session.CreateKeyspaceIfNotExists(keyspace, replication);
            session.ChangeKeyspace(keyspace);
        }
        catch
        {
            // Do not leave a half-initialised session (and its connections) behind.
            session.Dispose();
            throw;
        }

        Session = session;
        return session;
    }

    /// <summary>Builds the connection-pool settings from configuration.</summary>
    private PoolingOptions BuildPoolingOptions()
    {
        return new PoolingOptions()
            .SetCoreConnectionsPerHost(HostDistance.Local, CassandraConfig.PoolingOptions.CoreConnectionsPerHost)
            .SetMaxConnectionsPerHost(HostDistance.Local, CassandraConfig.PoolingOptions.MaxConnectionsPerHost)
            .SetMaxRequestsPerConnection(CassandraConfig.PoolingOptions.MaxRequestsPerConnection)
            .SetHeartBeatInterval(HeartBeatIntervalMillis);
    }

    /// <summary>Builds the socket settings from configuration.</summary>
    private SocketOptions BuildSocketOptions()
    {
        return new SocketOptions()
            .SetConnectTimeoutMillis(CassandraConfig.SocketOptions.ConnectTimeoutMillis)
            .SetReadTimeoutMillis(CassandraConfig.SocketOptions.ReadTimeoutMillis)
            .SetTcpNoDelay(true) // TCP_NODELAY on: this DISABLES Nagle's algorithm
            .SetKeepAlive(true) // enable TCP keep-alive
            .SetReceiveBufferSize(SocketBufferSizeBytes)
            .SetSendBufferSize(SocketBufferSizeBytes);
    }

    /// <summary>Builds the default query settings from configuration.</summary>
    private QueryOptions BuildQueryOptions()
    {
        return new QueryOptions()
            .SetConsistencyLevel(ParseConsistencyLevel(CassandraConfig.QueryOptions.ConsistencyLevel))
            .SetSerialConsistencyLevel(ParseConsistencyLevel(CassandraConfig.QueryOptions.SerialConsistencyLevel))
            .SetDefaultIdempotence(CassandraConfig.QueryOptions.DefaultIdempotence)
            .SetPageSize(QueryPageSize);
    }

    /// <summary>
    /// Parses a configured consistency level name, ignoring case so that e.g.
    /// <c>"localquorum"</c> and <c>"LocalQuorum"</c> are both accepted.
    /// </summary>
    private static ConsistencyLevel ParseConsistencyLevel(string value)
    {
        return (ConsistencyLevel)Enum.Parse(typeof(ConsistencyLevel), value, ignoreCase: true);
    }

    /// <summary>
    /// Translates the configured replication strategy into the option map passed to
    /// <c>CreateKeyspaceIfNotExists</c>.
    /// </summary>
    /// <returns>A map containing <c>class</c> plus the strategy-specific replication settings.</returns>
    /// <exception cref="ArgumentNullException">The configured strategy class is not supported.</exception>
    private Dictionary<string, string> GetReplicationStrategy()
    {
        var strategy = CassandraConfig.ReplicationStrategy.Class;
        var dataCenters = CassandraConfig.ReplicationStrategy.DataCenters;
        var dataCenterCount = dataCenters?.Length ?? 0;

        switch (strategy)
        {
            case "NetworkTopologyStrategy":
                var networkDic = new Dictionary<string, string> { { "class", "NetworkTopologyStrategy" } };
                if (dataCenterCount == 0)
                {
                    _logger.LogError("NetworkTopologyStrategy 未配置任何数据中心!");
                    return networkDic;
                }

                // One entry per data center: name -> replication factor.
                foreach (var dataCenter in dataCenters)
                {
                    networkDic.Add(dataCenter.Name, dataCenter.ReplicationFactor.ToString());
                }
                return networkDic;

            case "SimpleStrategy":
                var dic = new Dictionary<string, string> { { "class", "SimpleStrategy" } };
                if (dataCenterCount == 0)
                {
                    // Nothing to take the factor from; the map is returned without
                    // replication_factor, exactly as before, but the log now says why.
                    _logger.LogError("SimpleStrategy 未配置数据中心,无法设置 replication_factor!");
                    return dic;
                }

                if (dataCenterCount > 1)
                {
                    // SimpleStrategy has a single factor; only the first entry is honoured.
                    _logger.LogWarning("SimpleStrategy 不支持多个数据中心,仅使用第一个数据中心的 ReplicationFactor!");
                }

                dic.Add("replication_factor", dataCenters[0].ReplicationFactor.ToString());
                return dic;

            default:
                throw new ArgumentNullException("Strategy", "Strategy配置错误");
        }
    }

    /// <summary>
    /// Releases the session and then the cluster. Safe to call when the provider was never
    /// initialised and safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        // Close the session before the cluster that owns its connections.
        Session?.Dispose();
        Instance?.Dispose();
        GC.SuppressFinalize(this);
    }
}
}