2025-04-17 20:28:50 +08:00

154 lines
5.7 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System;
using System.Linq;
using System.Reflection;
using System.Text;
using Cassandra;
using Cassandra.Mapping;
using Cassandra.Data.Linq;
using System.ComponentModel.DataAnnotations;
using System.Diagnostics;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using JiShe.CollectBus.Common.Attributes;
namespace JiShe.CollectBus.Cassandra
{
public class CassandraProvider : IDisposable, ICassandraProvider, ISingletonDependency
{
private readonly ILogger<CassandraProvider> _logger;
public Cluster Instance { get; set; }
public ISession Session { get; set; }
public CassandraConfig CassandraConfig { get; set; }
/// <summary>
///
/// </summary>
/// <param name="options"></param>
/// <param name="logger"></param>
public CassandraProvider(
IOptions<CassandraConfig> options,
ILogger<CassandraProvider> logger)
{
CassandraConfig = options.Value;
_logger = logger;
}
public Task InitClusterAndSessionAsync()
{
InitClusterAndSession();
return Task.CompletedTask;
}
public void InitClusterAndSession()
{
GetCluster((keyspace) =>
{
GetSession(keyspace);
});
}
public Cluster GetCluster(Action<string?>? callback=null)
{
var clusterBuilder = Cluster.Builder();
// 添加多个节点
foreach (var node in CassandraConfig.Nodes)
{
clusterBuilder.AddContactPoint(node.Host)
.WithPort(node.Port);
}
clusterBuilder.WithCredentials(CassandraConfig.Username, CassandraConfig.Password);
// 优化连接池配置
var poolingOptions = new PoolingOptions()
.SetCoreConnectionsPerHost(HostDistance.Local, CassandraConfig.PoolingOptions.CoreConnectionsPerHost)
.SetMaxConnectionsPerHost(HostDistance.Local, CassandraConfig.PoolingOptions.MaxConnectionsPerHost)
.SetMaxRequestsPerConnection(CassandraConfig.PoolingOptions.MaxRequestsPerConnection)
.SetHeartBeatInterval(30000); // 30秒心跳
clusterBuilder.WithPoolingOptions(poolingOptions);
// 优化Socket配置
var socketOptions = new SocketOptions()
.SetConnectTimeoutMillis(CassandraConfig.SocketOptions.ConnectTimeoutMillis)
.SetReadTimeoutMillis(CassandraConfig.SocketOptions.ReadTimeoutMillis)
.SetTcpNoDelay(true) // 启用Nagle算法
.SetKeepAlive(true) // 启用TCP保活
.SetReceiveBufferSize(32768) // 32KB接收缓冲区
.SetSendBufferSize(32768); // 32KB发送缓冲区
clusterBuilder.WithSocketOptions(socketOptions);
// 优化查询选项
var queryOptions = new QueryOptions()
.SetConsistencyLevel((ConsistencyLevel)Enum.Parse(typeof(ConsistencyLevel), CassandraConfig.QueryOptions.ConsistencyLevel))
.SetSerialConsistencyLevel((ConsistencyLevel)Enum.Parse(typeof(ConsistencyLevel), CassandraConfig.QueryOptions.SerialConsistencyLevel))
.SetDefaultIdempotence(CassandraConfig.QueryOptions.DefaultIdempotence)
.SetPageSize(5000); // 增加页面大小
clusterBuilder.WithQueryOptions(queryOptions);
// 启用压缩
clusterBuilder.WithCompression(CompressionType.LZ4);
// 配置重连策略
clusterBuilder.WithReconnectionPolicy(new ExponentialReconnectionPolicy(1000, 10 * 60 * 1000));
Instance = clusterBuilder.Build();
callback?.Invoke(null);
return Instance;
}
public ISession GetSession(string? keyspace = null)
{
if (string.IsNullOrEmpty(keyspace))
{
keyspace = CassandraConfig.Keyspace;
}
Session = Instance.Connect();
var replication = GetReplicationStrategy();
Session.CreateKeyspaceIfNotExists(keyspace, replication);
Session.ChangeKeyspace(keyspace);
return Session;
}
private Dictionary<string, string> GetReplicationStrategy()
{
var strategy = CassandraConfig.ReplicationStrategy.Class;
var dataCenters = CassandraConfig.ReplicationStrategy.DataCenters;
switch (strategy)
{
case "NetworkTopologyStrategy":
var networkDic = new Dictionary<string, string> { { "class", "NetworkTopologyStrategy" } };
foreach (var dataCenter in dataCenters)
{
networkDic.Add(dataCenter.Name, dataCenter.ReplicationFactor.ToString());
}
return networkDic;
case "SimpleStrategy":
var dic = new Dictionary<string, string> { { "class", "SimpleStrategy" } };
if (dataCenters.Length >= 1)
{
dic.Add("replication_factor", dataCenters[0].ReplicationFactor.ToString());
}
else
{
_logger.LogError("SimpleStrategy 不支持多个数据中心!");
}
return dic;
default:
throw new ArgumentNullException($"Strategy", "Strategy配置错误");
}
}
public void Dispose()
{
Instance.Dispose();
Session.Dispose();
}
}
}