// File: JiShe.CollectBus/src/JiShe.CollectBus.Cassandra/CassandraQueryOptimizer.cs
// Revision timestamp (as shown by the repository viewer): 2025-04-15 17:57:47 +08:00
using System.Collections.Concurrent;
using Cassandra;
using Cassandra.Mapping;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.Cassandra
{
/// <summary>
/// Convenience layer over a Cassandra <see cref="ISession"/>: caches prepared statements
/// per CQL string, executes bound statements in batches of a configurable size, memoizes
/// arbitrary results in an <see cref="IMemoryCache"/>, and provides paged-query and
/// bulk-insert helpers.
/// </summary>
public class CassandraQueryOptimizer
{
    private readonly ISession _session;
    private readonly ILogger<CassandraQueryOptimizer> _logger;
    private readonly IMemoryCache _cache;

    // Prepared statements keyed by their exact CQL text.
    private readonly ConcurrentDictionary<string, PreparedStatement> _preparedStatements;

    // Maximum number of bound statements placed in one BatchStatement.
    private readonly int _batchSize;

    // Absolute lifetime (relative to insertion) of entries written by GetOrSetFromCacheAsync.
    private readonly TimeSpan _cacheExpiration;

    /// <summary>
    /// Creates the optimizer.
    /// </summary>
    /// <param name="session">Cassandra session used for every prepare/execute call.</param>
    /// <param name="logger">Logger for debug and error messages.</param>
    /// <param name="cache">Memory cache backing <see cref="GetOrSetFromCacheAsync{T}"/>.</param>
    /// <param name="batchSize">Maximum statements per batch; defaults to 100.</param>
    /// <param name="cacheExpiration">Cache entry lifetime; defaults to five minutes when null.</param>
    public CassandraQueryOptimizer(
        ISession session,
        ILogger<CassandraQueryOptimizer> logger,
        IMemoryCache cache,
        int batchSize = 100,
        TimeSpan? cacheExpiration = null)
    {
        _session = session;
        _logger = logger;
        _cache = cache;
        _preparedStatements = new ConcurrentDictionary<string, PreparedStatement>();
        _batchSize = batchSize;
        _cacheExpiration = cacheExpiration ?? TimeSpan.FromMinutes(5);
    }

    /// <summary>
    /// Returns the cached prepared statement for <paramref name="cql"/>, preparing and
    /// caching it on first use.
    /// </summary>
    /// <param name="cql">CQL text to prepare; also the cache key.</param>
    /// <returns>The prepared statement for the given CQL.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="cql"/> is null.</exception>
    /// <remarks>
    /// Preparation failures are logged and rethrown. Concurrent first calls for the same
    /// CQL may each prepare it; only one result is kept in the cache.
    /// </remarks>
    public async Task<PreparedStatement> GetOrPrepareStatementAsync(string cql)
    {
        ArgumentNullException.ThrowIfNull(cql);

        if (_preparedStatements.TryGetValue(cql, out var cached))
        {
            return cached;
        }

        try
        {
            // Prepare asynchronously instead of blocking the calling thread on a network round trip.
            var prepared = await _session.PrepareAsync(cql);
            _logger.LogDebug("Prepared statement for CQL: {Cql}", cql);

            // GetOrAdd keeps whichever statement won the race so all callers share one instance.
            return _preparedStatements.GetOrAdd(cql, prepared);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to prepare statement for CQL: {Cql}", cql);
            throw;
        }
    }

    /// <summary>
    /// Executes the given bound statements in consecutive batches of at most the configured
    /// batch size, awaiting each batch before building the next.
    /// </summary>
    /// <param name="statements">Statements to execute; enumerated exactly once.</param>
    /// <exception cref="ArgumentNullException"><paramref name="statements"/> is null.</exception>
    public async Task ExecuteBatchAsync(IEnumerable<BoundStatement> statements)
    {
        ArgumentNullException.ThrowIfNull(statements);

        var batch = new BatchStatement();
        var count = 0;
        foreach (var statement in statements)
        {
            batch.Add(statement);
            count++;
            if (count >= _batchSize)
            {
                await SendBatchAsync(batch);
                batch = new BatchStatement();
                count = 0;
            }
        }

        // Flush the final, partially filled batch.
        if (count > 0)
        {
            await SendBatchAsync(batch);
        }
    }

    /// <summary>
    /// Executes one batch on the session, logging and rethrowing any failure.
    /// </summary>
    private async Task SendBatchAsync(BatchStatement batch)
    {
        try
        {
            await _session.ExecuteAsync(batch);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to execute batch statement");
            throw;
        }
    }

    /// <summary>
    /// Returns the cached value for <paramref name="cacheKey"/>, or invokes
    /// <paramref name="getData"/>, stores its result for the configured expiration,
    /// and returns it.
    /// </summary>
    /// <typeparam name="T">Type of the cached value.</typeparam>
    /// <param name="cacheKey">Key identifying the entry in the memory cache.</param>
    /// <param name="getData">Factory invoked on a cache miss.</param>
    /// <returns>The cached or freshly produced value.</returns>
    /// <remarks>
    /// No locking is performed: concurrent misses for the same key each invoke
    /// <paramref name="getData"/>.
    /// </remarks>
    public async Task<T> GetOrSetFromCacheAsync<T>(string cacheKey, Func<Task<T>> getData)
    {
        if (_cache.TryGetValue(cacheKey, out T cachedValue))
        {
            _logger.LogDebug("Cache hit for key: {CacheKey}", cacheKey);
            return cachedValue;
        }

        var data = await getData();
        _cache.Set(cacheKey, data, _cacheExpiration);
        _logger.LogDebug("Cache miss for key: {CacheKey}, data cached", cacheKey);
        return data;
    }

    /// <summary>
    /// Executes <paramref name="cql"/> and returns a single page of results mapped to
    /// <typeparamref name="T"/> by the driver's <see cref="Mapper"/>.
    /// </summary>
    /// <typeparam name="T">POCO type each row is mapped to.</typeparam>
    /// <param name="cql">Query text with positional <c>?</c> markers.</param>
    /// <param name="parameters">Values for the markers; null is treated as no parameters.</param>
    /// <param name="pageSize">Maximum number of rows in the returned page.</param>
    /// <param name="pagingState">
    /// Base64-encoded paging state of the previous page, or null/empty to start from the beginning.
    /// </param>
    /// <returns>
    /// The rows of the requested page. The returned object is the driver's
    /// <see cref="IPage{T}"/>; callers that need the state for the next page can cast to it.
    /// </returns>
    /// <exception cref="FormatException"><paramref name="pagingState"/> is not valid Base64.</exception>
    public async Task<IEnumerable<T>> ExecutePagedQueryAsync<T>(
        string cql,
        object[] parameters,
        int pageSize = 100,
        string pagingState = null) where T : class
    {
        byte[] resumeFrom = string.IsNullOrEmpty(pagingState)
            ? null
            : Convert.FromBase64String(pagingState);

        try
        {
            // The mapper handles statement preparation, manual paging and row-to-object mapping.
            var mapper = new Mapper(_session);
            IPage<T> page = await mapper.FetchPageAsync<T>(
                pageSize,
                resumeFrom,
                cql,
                parameters ?? Array.Empty<object>());
            return page;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to execute paged query: {Cql}", cql);
            throw;
        }
    }

    /// <summary>
    /// Inserts every item into <paramref name="tableName"/>, using the readable public
    /// properties of <typeparamref name="T"/> as column names, in batches of the
    /// configured batch size.
    /// </summary>
    /// <typeparam name="T">Item type whose property names must match the table's columns.</typeparam>
    /// <param name="items">Items to insert; an empty sequence is a no-op.</param>
    /// <param name="tableName">
    /// Target table. It is concatenated into the CQL text (identifiers cannot be bound as
    /// parameters), so it must come from trusted code, never from user input.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="items"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="tableName"/> is null or blank.</exception>
    public async Task BulkInsertAsync<T>(IEnumerable<T> items, string tableName)
    {
        ArgumentNullException.ThrowIfNull(items);
        if (string.IsNullOrWhiteSpace(tableName))
        {
            throw new ArgumentException("Table name must not be null or blank.", nameof(tableName));
        }

        // Materialize once so the emptiness check and the insert enumerate the same data.
        var rows = items as IReadOnlyCollection<T> ?? items.ToList();
        if (rows.Count == 0)
        {
            return;
        }

        // Indexers and write-only properties cannot be read with GetValue(item); skip them.
        var props = typeof(T).GetProperties()
            .Where(p => p.CanRead && p.GetIndexParameters().Length == 0)
            .ToArray();
        var columns = string.Join(", ", props.Select(p => p.Name));
        var placeholders = string.Join(", ", props.Select(_ => "?"));
        var cql = $"INSERT INTO {tableName} ({columns}) VALUES ({placeholders})";

        // The CQL is identical for every item, so prepare it once (and reuse the cache).
        var statement = await GetOrPrepareStatementAsync(cql);

        var boundStatements = rows.Select(
            item => statement.Bind(props.Select(p => p.GetValue(item)).ToArray()));

        // ExecuteBatchAsync already splits the sequence into batches of _batchSize.
        await ExecuteBatchAsync(boundStatements);
    }
}
}