using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.IO.Compression; using System.Linq; using System.Text.Json; using System.Text.Json.Serialization; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; namespace JiShe.CollectBus.PluginFileWatcher { /// /// 负责文件事件的存储、查询和回放 /// public class EventStorage : IDisposable { private readonly FileMonitorConfig _config; private readonly ILogger _logger; private readonly ConcurrentQueue _eventQueue; private readonly Timer _storageTimer; private readonly SemaphoreSlim _storageLock = new SemaphoreSlim(1, 1); private readonly string _storageDirectory; private readonly EventDatabaseManager _dbManager; private bool _disposed; /// /// 创建新的事件存储管理器实例 /// /// 文件监控配置 /// 日志记录器 public EventStorage(FileMonitorConfig config, ILogger logger) { _config = config ?? throw new ArgumentNullException(nameof(config)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _eventQueue = new ConcurrentQueue(); // 确保存储目录存在 _storageDirectory = !string.IsNullOrEmpty(_config.EventStorage.StorageDirectory) ? _config.EventStorage.StorageDirectory : Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "EventLogs"); if (!Directory.Exists(_storageDirectory)) { Directory.CreateDirectory(_storageDirectory); } // 创建数据库管理器(如果配置为SQLite存储类型) if (config.EventStorage.EnableEventStorage && config.EventStorage.StorageType.Equals("SQLite", StringComparison.OrdinalIgnoreCase)) { _dbManager = new EventDatabaseManager(config, logger); _logger.LogInformation("已初始化SQLite事件存储"); } // 初始化存储定时器(如果启用) if (_config.EventStorage.EnableEventStorage) { var intervalMs = _config.EventStorage.StorageIntervalSeconds * 1000; _storageTimer = new Timer(SaveEventsTimerCallback, null, intervalMs, intervalMs); _logger.LogInformation($"事件存储已初始化,存储目录:{_storageDirectory},存储间隔:{_config.EventStorage.StorageIntervalSeconds}秒"); } } /// /// 记录一个文件事件 /// /// 文件事件 public void RecordEvent(FileEvent fileEvent) { if (fileEvent == null || !_config.EventStorage.EnableEventStorage) return; _eventQueue.Enqueue(fileEvent); _logger.LogDebug($"文件事件已加入队列:{fileEvent.EventType} - {fileEvent.FullPath}"); } /// /// 从FileSystemEventArgs记录事件 /// /// 文件系统事件参数 public void RecordEvent(FileSystemEventArgs e) { if (e == null || !_config.EventStorage.EnableEventStorage) return; var fileEvent = FileEvent.FromFileSystemEventArgs(e); RecordEvent(fileEvent); } /// /// 定时将事件保存到文件 /// private async void SaveEventsTimerCallback(object state) { if (_disposed || _eventQueue.IsEmpty) return; try { // 防止多个定时器回调同时执行 if (!await _storageLock.WaitAsync(0)) { return; } try { // 从队列中取出事件 List eventsToSave = new List(); int batchSize = _config.EventStorage.BatchSize; while (eventsToSave.Count < batchSize && !_eventQueue.IsEmpty) { if (_eventQueue.TryDequeue(out var fileEvent)) { eventsToSave.Add(fileEvent); } } if (eventsToSave.Count > 0) { await SaveEventsToFileAsync(eventsToSave); _logger.LogInformation($"已成功保存 {eventsToSave.Count} 个事件"); } // 如果有配置,清理旧日志文件 if (_config.EventStorage.MaxLogFiles > 0) { await CleanupOldLogFilesAsync(); } // 如果有配置,清理旧数据库记录 if (_dbManager != null && _config.EventStorage.DataRetentionDays > 0) { await _dbManager.CleanupOldDataAsync(_config.EventStorage.DataRetentionDays); } } finally { _storageLock.Release(); } } catch (Exception ex) { _logger.LogError(ex, "保存事件时发生错误"); } } /// /// 将事件保存到文件 /// /// 要保存的事件列表 private async Task SaveEventsToFileAsync(List events) { if (events == null || events.Count == 0) return; try { // 根据存储类型选择保存方式 if (_config.EventStorage.StorageType.Equals("SQLite", StringComparison.OrdinalIgnoreCase) && _dbManager != null) { // 保存到SQLite数据库 await _dbManager.SaveEventsAsync(events); } else { // 保存到文件 string fileName = string.Format( _config.EventStorage.LogFileNameFormat, DateTime.Now); string filePath = Path.Combine(_storageDirectory, fileName); // 创建事件日志文件对象 var logFile = new EventLogFile { CreatedTime = DateTime.UtcNow, Events = events }; // 序列化为JSON string jsonContent = JsonSerializer.Serialize(logFile, new JsonSerializerOptions { WriteIndented = true }); // 是否启用压缩 if (_config.EventStorage.CompressLogFiles) { string gzFilePath = $"{filePath}.gz"; await CompressAndSaveStringAsync(jsonContent, gzFilePath); _logger.LogInformation($"已将事件保存到压缩文件:{gzFilePath}"); } else { await File.WriteAllTextAsync(filePath, jsonContent); _logger.LogInformation($"已将事件保存到文件:{filePath}"); } } } catch (Exception ex) { _logger.LogError(ex, "保存事件到文件时发生错误"); throw; } } /// /// 压缩并保存字符串到文件 /// /// 要保存的内容 /// 文件路径 private static async Task CompressAndSaveStringAsync(string content, string filePath) { using var fileStream = new FileStream(filePath, FileMode.Create); using var gzipStream = new GZipStream(fileStream, CompressionLevel.Optimal); using var writer = new StreamWriter(gzipStream); await writer.WriteAsync(content); } /// /// 清理过多的日志文件 /// private async Task CleanupOldLogFilesAsync() { try { // 检查是否需要清理 if (_config.EventStorage.MaxLogFiles <= 0) return; var directory = new DirectoryInfo(_storageDirectory); var logFiles = directory.GetFiles("*.*") .Where(f => f.Name.EndsWith(".json") || f.Name.EndsWith(".gz")) .OrderByDescending(f => f.CreationTime) .ToArray(); // 如果文件数量超过最大值,删除最旧的文件 if (logFiles.Length > _config.EventStorage.MaxLogFiles) { int filesToDelete = logFiles.Length - _config.EventStorage.MaxLogFiles; var filesToRemove = logFiles.Skip(logFiles.Length - filesToDelete).ToArray(); foreach (var file in filesToRemove) { try { file.Delete(); _logger.LogInformation($"已删除旧的事件日志文件:{file.Name}"); } catch (Exception ex) { _logger.LogWarning(ex, $"删除旧日志文件失败:{file.FullName}"); } } } } catch (Exception ex) { _logger.LogError(ex, "清理旧日志文件时发生错误"); } await Task.CompletedTask; } /// /// 查询历史事件 /// /// 查询参数 /// 查询结果 public async Task QueryEventsAsync(EventQueryParams queryParams) { if (queryParams == null) throw new ArgumentNullException(nameof(queryParams)); // 如果是SQLite存储且数据库管理器可用,使用数据库查询 if (_config.EventStorage.StorageType.Equals("SQLite", StringComparison.OrdinalIgnoreCase) && _dbManager != null) { return await _dbManager.QueryEventsAsync(queryParams); } var result = new EventQueryResult { StartTime = queryParams.StartTime ?? DateTime.MinValue, EndTime = queryParams.EndTime ?? DateTime.MaxValue }; try { await _storageLock.WaitAsync(); try { // 获取所有日志文件 var directory = new DirectoryInfo(_storageDirectory); if (!directory.Exists) { return result; } var logFiles = directory.GetFiles("*.*") .Where(f => f.Name.EndsWith(".json") || f.Name.EndsWith(".gz")) .OrderByDescending(f => f.CreationTime) .ToArray(); List allEvents = new List(); // 加载所有日志文件中的事件 foreach (var file in logFiles) { try { var events = await LoadEventsFromFileAsync(file.FullName); if (events != null && events.Count > 0) { allEvents.AddRange(events); } } catch (Exception ex) { _logger.LogWarning(ex, $"从文件加载事件失败:{file.FullName}"); } } // 内存中队列的事件也包含在查询中 FileEvent[] queuedEvents = _eventQueue.ToArray(); allEvents.AddRange(queuedEvents); // 应用查询过滤条件 var filteredEvents = allEvents .Where(e => (queryParams.StartTime == null || e.Timestamp >= queryParams.StartTime) && (queryParams.EndTime == null || e.Timestamp <= queryParams.EndTime) && (queryParams.EventType == null || e.EventType == queryParams.EventType.Value) && (string.IsNullOrEmpty(queryParams.PathFilter) || e.FullPath.Contains(queryParams.PathFilter, StringComparison.OrdinalIgnoreCase)) && (string.IsNullOrEmpty(queryParams.ExtensionFilter) || e.Extension.Equals(queryParams.ExtensionFilter, StringComparison.OrdinalIgnoreCase))) .ToList(); // 应用排序 IEnumerable orderedEvents = queryParams.AscendingOrder ? filteredEvents.OrderBy(e => e.Timestamp) : filteredEvents.OrderByDescending(e => e.Timestamp); // 计算总数 result.TotalCount = filteredEvents.Count; // 应用分页 int skip = queryParams.PageIndex * queryParams.PageSize; int take = queryParams.PageSize; result.Events = orderedEvents.Skip(skip).Take(take).ToList(); result.HasMore = (skip + take) < result.TotalCount; return result; } finally { _storageLock.Release(); } } catch (Exception ex) { _logger.LogError(ex, "查询事件时发生错误"); throw; } } /// /// 从文件加载事件 /// /// 文件路径 /// 事件列表 private async Task> LoadEventsFromFileAsync(string filePath) { if (string.IsNullOrEmpty(filePath) || !File.Exists(filePath)) { return new List(); } try { string jsonContent; // 处理压缩文件 if (filePath.EndsWith(".gz")) { using var fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read); using var gzipStream = new GZipStream(fileStream, CompressionMode.Decompress); using var reader = new StreamReader(gzipStream); jsonContent = await reader.ReadToEndAsync(); } else { jsonContent = await File.ReadAllTextAsync(filePath); } var logFile = JsonSerializer.Deserialize(jsonContent); return logFile?.Events ?? new List(); } catch (Exception ex) { _logger.LogError(ex, $"从文件加载事件失败:{filePath}"); return new List(); } } /// /// 启动事件回放会话 /// /// 查询参数,定义要回放的事件 /// 回放处理回调 /// 取消标记 /// 回放会话控制器 public async Task StartReplayAsync( EventQueryParams queryParams, Func replayHandler, CancellationToken cancellationToken = default) { if (replayHandler == null) throw new ArgumentNullException(nameof(replayHandler)); // 查询要回放的事件 var queryResult = await QueryEventsAsync(queryParams); // 创建并启动回放会话 var session = new EventReplaySession( queryResult.Events, replayHandler, _config.EventStorage.ReplayIntervalMs, _config.EventStorage.ReplaySpeedFactor, _logger, cancellationToken); await session.StartAsync(); return session; } /// /// 释放资源 /// public void Dispose() { if (_disposed) return; _disposed = true; _storageTimer?.Dispose(); _storageLock?.Dispose(); _dbManager?.Dispose(); // 尝试保存剩余事件 if (_config.EventStorage.EnableEventStorage && !_eventQueue.IsEmpty) { var remainingEvents = new List(); while (!_eventQueue.IsEmpty && _eventQueue.TryDequeue(out var evt)) { remainingEvents.Add(evt); } if (remainingEvents.Count > 0) { SaveEventsToFileAsync(remainingEvents).GetAwaiter().GetResult(); } } GC.SuppressFinalize(this); } } /// /// 事件回放会话 /// public class EventReplaySession : IDisposable { private readonly List _events; private readonly Func _replayHandler; private readonly int _replayIntervalMs; private readonly double _speedFactor; private readonly ILogger _logger; private readonly CancellationToken _cancellationToken; private CancellationTokenSource _linkedCts; private Task _replayTask; private bool _disposed; private bool _isPaused; private readonly SemaphoreSlim _pauseSemaphore = new SemaphoreSlim(1, 1); /// /// 回放进度(0-100) /// public int Progress { get; private set; } /// /// 当前回放的事件索引 /// public int CurrentIndex { get; private set; } /// /// 事件总数 /// public int TotalEvents => _events?.Count ?? 0; /// /// 回放是否已完成 /// public bool IsCompleted { get; private set; } /// /// 回放是否已暂停 /// public bool IsPaused => _isPaused; /// /// 回放已处理的事件数 /// public int ProcessedEvents { get; private set; } /// /// 回放开始时间 /// public DateTime StartTime { get; private set; } /// /// 回放结束时间(如果已完成) /// public DateTime? EndTime { get; private set; } /// /// 创建新的事件回放会话 /// /// 要回放的事件 /// 回放处理回调 /// 回放间隔(毫秒) /// 速度因子 /// 日志记录器 /// 取消标记 public EventReplaySession( List events, Func replayHandler, int replayIntervalMs, double speedFactor, ILogger logger, CancellationToken cancellationToken = default) { _events = events ?? throw new ArgumentNullException(nameof(events)); _replayHandler = replayHandler ?? throw new ArgumentNullException(nameof(replayHandler)); _replayIntervalMs = Math.Max(1, replayIntervalMs); _speedFactor = Math.Max(0.1, speedFactor); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _cancellationToken = cancellationToken; _linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); } /// /// 启动回放 /// public async Task StartAsync() { if (_replayTask != null) return; StartTime = DateTime.Now; _replayTask = Task.Run(ReplayEventsAsync, _linkedCts.Token); await Task.CompletedTask; } /// /// 暂停回放 /// public async Task PauseAsync() { if (_isPaused || IsCompleted) return; await _pauseSemaphore.WaitAsync(); try { _isPaused = true; } finally { _pauseSemaphore.Release(); } _logger.LogInformation("事件回放已暂停"); } /// /// 恢复回放 /// public async Task ResumeAsync() { if (!_isPaused || IsCompleted) return; await _pauseSemaphore.WaitAsync(); try { _isPaused = false; // 释放信号量以允许回放任务继续 _pauseSemaphore.Release(); } catch { _pauseSemaphore.Release(); throw; } _logger.LogInformation("事件回放已恢复"); } /// /// 停止回放 /// public async Task StopAsync() { if (IsCompleted) return; try { // 取消回放任务 _linkedCts?.Cancel(); // 如果暂停中,先恢复以允许取消 if (_isPaused) { await ResumeAsync(); } // 等待任务完成 if (_replayTask != null) { await Task.WhenAny(_replayTask, Task.Delay(1000)); } IsCompleted = true; EndTime = DateTime.Now; _logger.LogInformation("事件回放已手动停止"); } catch (Exception ex) { _logger.LogError(ex, "停止事件回放时发生错误"); } } /// /// 回放事件处理 /// private async Task ReplayEventsAsync() { try { _logger.LogInformation($"开始回放{_events.Count}个事件,速度因子:{_speedFactor}"); if (_events.Count == 0) { IsCompleted = true; EndTime = DateTime.Now; return; } DateTime? lastEventTime = null; for (int i = 0; i < _events.Count; i++) { // 检查是否取消 if (_linkedCts.Token.IsCancellationRequested) { _logger.LogInformation("事件回放已取消"); break; } // 检查暂停状态 if (_isPaused) { // 等待恢复信号 await _pauseSemaphore.WaitAsync(_linkedCts.Token); _pauseSemaphore.Release(); } var currentEvent = _events[i]; CurrentIndex = i; // 计算等待时间(根据事件之间的实际时间差和速度因子) if (lastEventTime.HasValue && i > 0) { var actualTimeDiff = currentEvent.Timestamp - lastEventTime.Value; var waitTimeMs = (int)(actualTimeDiff.TotalMilliseconds / _speedFactor); // 应用最小等待时间 waitTimeMs = Math.Max(_replayIntervalMs, waitTimeMs); // 等待指定时间 await Task.Delay(waitTimeMs, _linkedCts.Token); } // 处理当前事件 try { await _replayHandler(currentEvent); ProcessedEvents++; // 更新进度 Progress = (int)((i + 1) * 100.0 / _events.Count); } catch (Exception ex) { _logger.LogError(ex, $"处理回放事件时发生错误:{currentEvent.EventType} - {currentEvent.FullPath}"); } lastEventTime = currentEvent.Timestamp; } // 完成回放 IsCompleted = true; Progress = 100; EndTime = DateTime.Now; _logger.LogInformation($"事件回放已完成,共处理{ProcessedEvents}个事件,耗时:{(EndTime.Value - StartTime).TotalSeconds:F2}秒"); } catch (OperationCanceledException) { _logger.LogInformation("事件回放已取消"); IsCompleted = true; EndTime = DateTime.Now; } catch (Exception ex) { _logger.LogError(ex, "事件回放过程中发生错误"); IsCompleted = true; EndTime = DateTime.Now; } } /// /// 释放资源 /// public void Dispose() { if (_disposed) return; _disposed = true; _linkedCts?.Cancel(); _linkedCts?.Dispose(); _pauseSemaphore?.Dispose(); GC.SuppressFinalize(this); } } }