using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Cassandra;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;

namespace JiShe.CollectBus.Cassandra
{
    /// <summary>
    /// Helpers for common Cassandra access patterns: prepared-statement reuse,
    /// size-limited batch execution, in-memory result caching, and paged queries.
    /// </summary>
    public class CassandraQueryOptimizer
    {
        private readonly ISession _session;
        private readonly ILogger<CassandraQueryOptimizer> _logger;
        private readonly IMemoryCache _cache;
        private readonly ConcurrentDictionary<string, PreparedStatement> _preparedStatements;
        private readonly int _batchSize;
        private readonly TimeSpan _cacheExpiration;

        public CassandraQueryOptimizer(
            ISession session,
            ILogger<CassandraQueryOptimizer> logger,
            IMemoryCache cache,
            int batchSize = 100,
            TimeSpan? cacheExpiration = null)
        {
            _session = session;
            _logger = logger;
            _cache = cache;
            _preparedStatements = new ConcurrentDictionary<string, PreparedStatement>();
            _batchSize = batchSize;
            _cacheExpiration = cacheExpiration ?? TimeSpan.FromMinutes(5);
        }

        /// <summary>
        /// Returns a cached prepared statement for the given CQL, preparing it on first use.
        /// </summary>
        public async Task<PreparedStatement> GetOrPrepareStatementAsync(string cql)
        {
            if (_preparedStatements.TryGetValue(cql, out var cached))
            {
                return cached;
            }

            try
            {
                var statement = await _session.PrepareAsync(cql);
                _preparedStatements.TryAdd(cql, statement);
                _logger.LogDebug("Prepared statement for CQL: {Cql}", cql);
                return statement;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to prepare statement for CQL: {Cql}", cql);
                throw;
            }
        }

        /// <summary>
        /// Executes the given statements in batches of at most the configured batch size.
        /// </summary>
        public async Task ExecuteBatchAsync(IEnumerable<Statement> statements)
        {
            var batch = new BatchStatement();
            var count = 0;

            foreach (var statement in statements)
            {
                batch.Add(statement);
                count++;

                if (count >= _batchSize)
                {
                    await ExecuteBatchAsync(batch);
                    batch = new BatchStatement();
                    count = 0;
                }
            }

            // Flush the final, partially filled batch.
            if (count > 0)
            {
                await ExecuteBatchAsync(batch);
            }
        }

        private async Task ExecuteBatchAsync(BatchStatement batch)
        {
            try
            {
                await _session.ExecuteAsync(batch);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to execute batch statement");
                throw;
            }
        }

        /// <summary>
        /// Returns the cached value for the given key, or loads it via
        /// <paramref name="getData"/> and caches it for the configured expiration.
        /// </summary>
        public async Task<T> GetOrSetFromCacheAsync<T>(string cacheKey, Func<Task<T>> getData)
        {
            if (_cache.TryGetValue(cacheKey, out T cachedValue))
            {
                _logger.LogDebug("Cache hit for key: {CacheKey}", cacheKey);
                return cachedValue;
            }

            var data = await getData();
            _cache.Set(cacheKey, data, _cacheExpiration);
            _logger.LogDebug("Cache miss for key: {CacheKey}, data cached", cacheKey);
            return data;
        }

        /// <summary>
        /// Executes a single page of a query, maps each row with <paramref name="mapRow"/>,
        /// and returns the rows together with the Base64-encoded paging state to pass back
        /// in for the next page (null when there are no further pages).
        /// </summary>
        public async Task<(IEnumerable<T> Rows, string NextPagingState)> ExecutePagedQueryAsync<T>(
            string cql,
            object[] parameters,
            Func<Row, T> mapRow,
            int pageSize = 100,
            string pagingState = null) where T : class
        {
            var statement = await GetOrPrepareStatementAsync(cql);
            var boundStatement = statement.Bind(parameters);

            if (!string.IsNullOrEmpty(pagingState))
            {
                boundStatement.SetPagingState(Convert.FromBase64String(pagingState));
            }

            boundStatement.SetPageSize(pageSize);

            try
            {
                var result = await _session.ExecuteAsync(boundStatement);

                // Only take the rows already buffered for this page; enumerating past them
                // would trigger automatic fetching of subsequent pages.
                var rows = result
                    .Take(result.GetAvailableWithoutFetching())
                    .Select(mapRow)
                    .ToList();

                var nextPagingState = result.PagingState != null
                    ? Convert.ToBase64String(result.PagingState)
                    : null;

                return (rows, nextPagingState);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to execute paged query: {Cql}", cql);
                throw;
            }
        }

        /// <summary>
        /// Inserts the given items into <paramref name="tableName"/> using a single prepared
        /// statement and batched execution. The column list is derived from the public
        /// properties of <typeparamref name="T"/>.
        /// </summary>
        public async Task BulkInsertAsync<T>(IEnumerable<T> items, string tableName)
        {
            var props = typeof(T).GetProperties();
            var columns = string.Join(", ", props.Select(p => p.Name));
            var placeholders = string.Join(", ", props.Select(_ => "?"));
            var cql = $"INSERT INTO {tableName} ({columns}) VALUES ({placeholders})";

            // Prepare the insert once and bind it per item; ExecuteBatchAsync splits the
            // resulting statements into batches of at most _batchSize.
            var prepared = await GetOrPrepareStatementAsync(cql);
            var statements = items.Select(item =>
                prepared.Bind(props.Select(p => p.GetValue(item)).ToArray()));

            await ExecuteBatchAsync(statements);
        }
    }
}
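
// Usage sketch (illustrative only, not part of the optimizer itself): the "telemetry"
// table and its columns (deviceid, readat, value) are hypothetical, and the optimizer
// instance is assumed to be constructed by the host application, e.g. from a DI
// container supplying ISession, ILogger<CassandraQueryOptimizer>, and IMemoryCache.
namespace JiShe.CollectBus.Cassandra
{
    public sealed class TelemetryReading
    {
        public string DeviceId { get; set; }
        public DateTimeOffset ReadAt { get; set; }
        public double Value { get; set; }
    }

    public static class CassandraQueryOptimizerUsageExample
    {
        public static async Task<IReadOnlyList<TelemetryReading>> LoadFirstPageAsync(
            CassandraQueryOptimizer optimizer,
            string deviceId)
        {
            // Cache the first page of readings per device for the configured expiration window;
            // the prepared statement is created once and reused across calls.
            return await optimizer.GetOrSetFromCacheAsync<IReadOnlyList<TelemetryReading>>(
                $"telemetry:first-page:{deviceId}",
                async () =>
                {
                    var (rows, _) = await optimizer.ExecutePagedQueryAsync(
                        "SELECT deviceid, readat, value FROM telemetry WHERE deviceid = ?",
                        new object[] { deviceId },
                        row => new TelemetryReading
                        {
                            DeviceId = row.GetValue<string>("deviceid"),
                            ReadAt = row.GetValue<DateTimeOffset>("readat"),
                            Value = row.GetValue<double>("value")
                        },
                        pageSize: 50);
                    return rows.ToList();
                });
        }

        public static Task SaveReadingsAsync(
            CassandraQueryOptimizer optimizer,
            IEnumerable<TelemetryReading> readings)
        {
            // BulkInsertAsync derives the column list from the POCO's property names,
            // so the table columns must match them (case-insensitively here).
            return optimizer.BulkInsertAsync(readings, "telemetry");
        }
    }
}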