using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.PluginFileWatcher
{
/// <summary>
/// Stores file events, answers queries over the stored history and replays recorded events.
/// </summary>
public class EventStorage : IDisposable
{
/// <summary>Monitor configuration; its <c>EventStorage</c> section supplies every setting read by this class.</summary>
private readonly FileMonitorConfig _config;
private readonly ILogger _logger;
/// <summary>In-memory buffer of recorded events that have not been persisted yet.</summary>
private readonly ConcurrentQueue<FileEvent> _eventQueue;
/// <summary>Periodic timer that triggers <see cref="SaveEventsTimerCallback"/>; <c>null</c> when storage is disabled.</summary>
private readonly Timer _storageTimer;
/// <summary>Serializes the periodic save/cleanup pass and file-based queries.</summary>
private readonly SemaphoreSlim _storageLock = new SemaphoreSlim(1, 1);
private readonly string _storageDirectory;
/// <summary>SQLite backend; <c>null</c> unless storage is enabled and the storage type is "SQLite".</summary>
private readonly EventDatabaseManager _dbManager;
private bool _disposed;

/// <summary>
/// Creates a new event storage manager.
/// </summary>
/// <param name="config">File monitor configuration.</param>
/// <param name="logger">Logger used for diagnostics.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="config"/> or <paramref name="logger"/> is <c>null</c>.
/// </exception>
public EventStorage(FileMonitorConfig config, ILogger logger)
{
    _config = config ?? throw new ArgumentNullException(nameof(config));
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    _eventQueue = new ConcurrentQueue<FileEvent>();

    var storage = _config.EventStorage;

    // Fall back to "<base directory>/EventLogs" when no directory is configured.
    _storageDirectory = !string.IsNullOrEmpty(storage.StorageDirectory)
        ? storage.StorageDirectory
        : Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "EventLogs");

    // CreateDirectory is a no-op when the directory already exists, so no Exists check is needed.
    Directory.CreateDirectory(_storageDirectory);

    if (!storage.EnableEventStorage)
    {
        return;
    }

    // Static string.Equals tolerates a null StorageType instead of throwing NullReferenceException.
    if (string.Equals(storage.StorageType, "SQLite", StringComparison.OrdinalIgnoreCase))
    {
        _dbManager = new EventDatabaseManager(config, logger);
        _logger.LogInformation("已初始化SQLite事件存储");
    }

    // TimeSpan avoids the int overflow that "seconds * 1000" hits for very large intervals.
    var interval = TimeSpan.FromSeconds(storage.StorageIntervalSeconds);
    _storageTimer = new Timer(SaveEventsTimerCallback, null, interval, interval);
    _logger.LogInformation($"事件存储已初始化,存储目录:{_storageDirectory},存储间隔:{storage.StorageIntervalSeconds}秒");
}
/// <summary>
/// Queues a file event so that it is persisted by the next storage pass.
/// </summary>
/// <param name="fileEvent">
/// The event to record. Ignored when <c>null</c> or when event storage is disabled in the configuration.
/// </param>
public void RecordEvent(FileEvent fileEvent)
{
    if (fileEvent == null || !_config.EventStorage.EnableEventStorage)
    {
        return;
    }

    _eventQueue.Enqueue(fileEvent);

    // Message template instead of an interpolated string: this runs for every file event,
    // and a template is only formatted when the Debug level is actually enabled.
    _logger.LogDebug("文件事件已加入队列:{EventType} - {FullPath}", fileEvent.EventType, fileEvent.FullPath);
}
/// <summary>
/// Records the event described by a <see cref="FileSystemEventArgs"/> instance.
/// </summary>
/// <param name="e">
/// File system event arguments. Nothing is recorded when this is <c>null</c> or when event storage is disabled.
/// </param>
public void RecordEvent(FileSystemEventArgs e)
{
    bool shouldRecord = e != null && _config.EventStorage.EnableEventStorage;
    if (shouldRecord)
    {
        // Convert to the storage model and hand over to the FileEvent overload.
        var converted = FileEvent.FromFileSystemEventArgs(e);
        RecordEvent(converted);
    }
}
/// <summary>
/// Timer callback: drains up to one batch of queued events, persists it, then runs the
/// configured log-file and database cleanup.
/// </summary>
/// <param name="state">Timer state object; not used.</param>
/// <remarks>
/// The method is <c>async void</c> because it is used as a timer callback, so every exception
/// is caught and logged here rather than propagated.
/// </remarks>
private async void SaveEventsTimerCallback(object state)
{
    if (_disposed || _eventQueue.IsEmpty)
    {
        return;
    }

    try
    {
        // Zero timeout: if a previous pass (or a query) still holds the lock, skip this tick.
        if (!await _storageLock.WaitAsync(0))
        {
            return;
        }

        try
        {
            // Drain at most one batch; TryDequeue returning false already means "queue is empty".
            int batchSize = _config.EventStorage.BatchSize;
            var eventsToSave = new List<FileEvent>();
            while (eventsToSave.Count < batchSize && _eventQueue.TryDequeue(out var fileEvent))
            {
                eventsToSave.Add(fileEvent);
            }

            if (eventsToSave.Count > 0)
            {
                await SaveEventsToFileAsync(eventsToSave);
                _logger.LogInformation($"已成功保存 {eventsToSave.Count} 个事件");
            }

            // Trim old log files when a file limit is configured.
            if (_config.EventStorage.MaxLogFiles > 0)
            {
                await CleanupOldLogFilesAsync();
            }

            // Trim old database rows when a retention period is configured.
            if (_dbManager != null && _config.EventStorage.DataRetentionDays > 0)
            {
                await _dbManager.CleanupOldDataAsync(_config.EventStorage.DataRetentionDays);
            }
        }
        finally
        {
            _storageLock.Release();
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "保存事件时发生错误");
    }
}
/// <summary>
/// Serializer options shared by every log-file write; creating a new options instance per call
/// discards the serializer's cached metadata each time.
/// </summary>
private static readonly JsonSerializerOptions IndentedJsonOptions = new JsonSerializerOptions
{
    WriteIndented = true
};

/// <summary>
/// Persists a batch of events, either to the SQLite backend or to a JSON log file
/// (optionally gzip-compressed) in the storage directory.
/// </summary>
/// <param name="events">Events to persist; a <c>null</c> or empty list is a no-op.</param>
/// <remarks>Failures are logged and then rethrown to the caller.</remarks>
private async Task SaveEventsToFileAsync(List<FileEvent> events)
{
    if (events == null || events.Count == 0)
    {
        return;
    }

    try
    {
        // Static string.Equals tolerates a null StorageType instead of throwing.
        bool useDatabase = _dbManager != null &&
            string.Equals(_config.EventStorage.StorageType, "SQLite", StringComparison.OrdinalIgnoreCase);
        if (useDatabase)
        {
            await _dbManager.SaveEventsAsync(events);
            return;
        }

        // Invariant culture keeps the file name stable regardless of the machine's regional
        // settings (calendar, digits, date separators).
        string fileName = string.Format(
            System.Globalization.CultureInfo.InvariantCulture,
            _config.EventStorage.LogFileNameFormat,
            DateTime.Now);
        string filePath = Path.Combine(_storageDirectory, fileName);

        // NOTE(review): both write paths below replace an existing file. If LogFileNameFormat
        // is coarser than the save interval, a later batch overwrites an earlier one — confirm
        // the configured format is fine-grained enough.
        var logFile = new EventLogFile
        {
            CreatedTime = DateTime.UtcNow,
            Events = events
        };

        string jsonContent = JsonSerializer.Serialize(logFile, IndentedJsonOptions);

        if (_config.EventStorage.CompressLogFiles)
        {
            string gzFilePath = $"{filePath}.gz";
            await CompressAndSaveStringAsync(jsonContent, gzFilePath);
            _logger.LogInformation($"已将事件保存到压缩文件:{gzFilePath}");
        }
        else
        {
            await File.WriteAllTextAsync(filePath, jsonContent);
            _logger.LogInformation($"已将事件保存到文件:{filePath}");
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "保存事件到文件时发生错误");
        throw;
    }
}
/// <summary>
/// Writes <paramref name="content"/> to <paramref name="filePath"/> as gzip-compressed text,
/// creating the file or replacing an existing one.
/// </summary>
/// <param name="content">Text to write.</param>
/// <param name="filePath">Destination file path.</param>
private static async Task CompressAndSaveStringAsync(string content, string filePath)
{
    using (var output = new FileStream(filePath, FileMode.Create))
    using (var compressor = new GZipStream(output, CompressionLevel.Optimal))
    using (var textWriter = new StreamWriter(compressor))
    {
        await textWriter.WriteAsync(content);
    }
}
/// <summary>
/// Deletes the oldest event log files (<c>.json</c> / <c>.gz</c>, ordered by creation time)
/// so that at most <c>MaxLogFiles</c> remain in the storage directory.
/// </summary>
/// <returns>A completed task; the work itself is synchronous file-system access.</returns>
/// <remarks>Failures are logged and never propagated to the caller.</remarks>
private Task CleanupOldLogFilesAsync()
{
    try
    {
        int maxLogFiles = _config.EventStorage.MaxLogFiles;
        if (maxLogFiles <= 0)
        {
            return Task.CompletedTask;
        }

        // Newest first; everything past the first maxLogFiles entries is surplus.
        // Ordinal comparison: file extensions are identifiers, not linguistic text.
        var surplusFiles = new DirectoryInfo(_storageDirectory)
            .GetFiles("*.*")
            .Where(f => f.Name.EndsWith(".json", StringComparison.Ordinal) ||
                        f.Name.EndsWith(".gz", StringComparison.Ordinal))
            .OrderByDescending(f => f.CreationTime)
            .Skip(maxLogFiles)
            .ToArray();

        foreach (var file in surplusFiles)
        {
            try
            {
                file.Delete();
                _logger.LogInformation($"已删除旧的事件日志文件:{file.Name}");
            }
            catch (Exception ex)
            {
                // One undeletable file must not stop the remaining deletions.
                _logger.LogWarning(ex, $"删除旧日志文件失败:{file.FullName}");
            }
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "清理旧日志文件时发生错误");
    }

    return Task.CompletedTask;
}
/// <summary>
/// Queries historical events.
/// </summary>
/// <param name="queryParams">Filter, ordering and paging parameters.</param>
/// <returns>
/// The matching page of events together with the total match count. With the SQLite backend
/// the query is delegated to the database manager; otherwise every log file in the storage
/// directory plus the not-yet-persisted in-memory queue is scanned.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="queryParams"/> is <c>null</c>.</exception>
/// <remarks>Failures of the file-based query are logged and rethrown.</remarks>
public async Task<EventQueryResult> QueryEventsAsync(EventQueryParams queryParams)
{
    if (queryParams == null) throw new ArgumentNullException(nameof(queryParams));

    // Static string.Equals tolerates a null StorageType instead of throwing.
    if (_dbManager != null &&
        string.Equals(_config.EventStorage.StorageType, "SQLite", StringComparison.OrdinalIgnoreCase))
    {
        return await _dbManager.QueryEventsAsync(queryParams);
    }

    var result = new EventQueryResult
    {
        StartTime = queryParams.StartTime ?? DateTime.MinValue,
        EndTime = queryParams.EndTime ?? DateTime.MaxValue
    };

    try
    {
        await _storageLock.WaitAsync();
        try
        {
            // A missing directory simply means "no log files"; queued events are still searched
            // so the result is always fully populated.
            var directory = new DirectoryInfo(_storageDirectory);
            FileInfo[] logFiles = directory.Exists
                ? directory.GetFiles("*.*")
                    .Where(f => f.Name.EndsWith(".json", StringComparison.Ordinal) ||
                                f.Name.EndsWith(".gz", StringComparison.Ordinal))
                    .OrderByDescending(f => f.CreationTime)
                    .ToArray()
                : Array.Empty<FileInfo>();

            var allEvents = new List<FileEvent>();
            foreach (var file in logFiles)
            {
                try
                {
                    var events = await LoadEventsFromFileAsync(file.FullName);
                    if (events != null && events.Count > 0)
                    {
                        allEvents.AddRange(events);
                    }
                }
                catch (Exception ex)
                {
                    // An unreadable file is skipped; the others are still searched.
                    _logger.LogWarning(ex, $"从文件加载事件失败:{file.FullName}");
                }
            }

            // Events still waiting in memory are part of the history too.
            allEvents.AddRange(_eventQueue.ToArray());

            var filteredEvents = allEvents
                .Where(e => MatchesQuery(e, queryParams))
                .ToList();

            IEnumerable<FileEvent> orderedEvents = queryParams.AscendingOrder
                ? filteredEvents.OrderBy(e => e.Timestamp)
                : filteredEvents.OrderByDescending(e => e.Timestamp);

            result.TotalCount = filteredEvents.Count;

            int skip = queryParams.PageIndex * queryParams.PageSize;
            int take = queryParams.PageSize;
            result.Events = orderedEvents.Skip(skip).Take(take).ToList();
            result.HasMore = (skip + take) < result.TotalCount;
            return result;
        }
        finally
        {
            _storageLock.Release();
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "查询事件时发生错误");
        throw;
    }
}

/// <summary>
/// Tests one event against the time range, event type, path and extension filters.
/// Null events and null path/extension values never match an active filter.
/// </summary>
private static bool MatchesQuery(FileEvent e, EventQueryParams queryParams)
{
    if (e == null)
    {
        return false;
    }

    return (queryParams.StartTime == null || e.Timestamp >= queryParams.StartTime)
        && (queryParams.EndTime == null || e.Timestamp <= queryParams.EndTime)
        && (queryParams.EventType == null || e.EventType == queryParams.EventType.Value)
        && (string.IsNullOrEmpty(queryParams.PathFilter) ||
            (e.FullPath != null && e.FullPath.Contains(queryParams.PathFilter, StringComparison.OrdinalIgnoreCase)))
        && (string.IsNullOrEmpty(queryParams.ExtensionFilter) ||
            string.Equals(e.Extension, queryParams.ExtensionFilter, StringComparison.OrdinalIgnoreCase));
}
/// <summary>
/// Loads the events stored in one log file, transparently decompressing <c>.gz</c> files.
/// </summary>
/// <param name="filePath">Path of the log file.</param>
/// <returns>
/// The events found in the file; an empty list when the path is empty, the file does not
/// exist, the file holds no events, or reading/parsing fails (the failure is logged).
/// </returns>
private async Task<List<FileEvent>> LoadEventsFromFileAsync(string filePath)
{
    if (string.IsNullOrEmpty(filePath) || !File.Exists(filePath))
    {
        return new List<FileEvent>();
    }

    try
    {
        string jsonContent;

        // Ordinal comparison: the extension is an identifier, not linguistic text.
        if (filePath.EndsWith(".gz", StringComparison.Ordinal))
        {
            using var fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read);
            using var gzipStream = new GZipStream(fileStream, CompressionMode.Decompress);
            using var reader = new StreamReader(gzipStream);
            jsonContent = await reader.ReadToEndAsync();
        }
        else
        {
            jsonContent = await File.ReadAllTextAsync(filePath);
        }

        var logFile = JsonSerializer.Deserialize<EventLogFile>(jsonContent);
        return logFile?.Events ?? new List<FileEvent>();
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, $"从文件加载事件失败:{filePath}");
        return new List<FileEvent>();
    }
}
/// <summary>
/// Queries the events selected by <paramref name="queryParams"/> and starts replaying them.
/// </summary>
/// <param name="queryParams">Query parameters selecting the events to replay.</param>
/// <param name="replayHandler">Callback invoked for each replayed event.</param>
/// <param name="cancellationToken">Token that cancels the replay.</param>
/// <returns>The started replay session, which the caller uses to pause, resume or stop the replay.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="replayHandler"/> is <c>null</c>, or <paramref name="queryParams"/> is
/// <c>null</c> (thrown by <see cref="QueryEventsAsync"/>).
/// </exception>
public async Task<EventReplaySession> StartReplayAsync(
    EventQueryParams queryParams,
    Func<FileEvent, Task> replayHandler,
    CancellationToken cancellationToken = default)
{
    if (replayHandler == null) throw new ArgumentNullException(nameof(replayHandler));

    var queryResult = await QueryEventsAsync(queryParams);

    // A result without an event list is replayed as "nothing to replay" rather than
    // failing inside the session constructor.
    var session = new EventReplaySession(
        queryResult.Events ?? new List<FileEvent>(),
        replayHandler,
        _config.EventStorage.ReplayIntervalMs,
        _config.EventStorage.ReplaySpeedFactor,
        _logger,
        cancellationToken);

    await session.StartAsync();
    return session;
}
/// <summary>
/// Stops the storage timer, makes a best-effort attempt to persist the events still queued,
/// and then releases the database manager and the storage lock.
/// </summary>
/// <remarks>
/// The final flush runs BEFORE the database manager and the lock are disposed, because the
/// flush may need both. Flush failures are logged, not thrown.
/// </remarks>
public void Dispose()
{
    if (_disposed) return;
    _disposed = true;

    // No new timer ticks after this point (a tick already in flight may still be running).
    _storageTimer?.Dispose();

    bool lockTaken = false;
    try
    {
        // Give an in-flight save pass or query a bounded amount of time to finish so the
        // final flush does not interleave with it. The flush is attempted even on timeout.
        lockTaken = _storageLock.Wait(TimeSpan.FromSeconds(5));
        FlushRemainingEvents();
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "释放时保存剩余事件失败");
    }
    finally
    {
        if (lockTaken)
        {
            _storageLock.Release();
        }
    }

    _dbManager?.Dispose();
    _storageLock.Dispose();
    GC.SuppressFinalize(this);
}

/// <summary>
/// Drains the in-memory queue and synchronously persists whatever was left in it.
/// </summary>
private void FlushRemainingEvents()
{
    if (!_config.EventStorage.EnableEventStorage)
    {
        return;
    }

    var remainingEvents = new List<FileEvent>();
    while (_eventQueue.TryDequeue(out var evt))
    {
        remainingEvents.Add(evt);
    }

    if (remainingEvents.Count == 0)
    {
        return;
    }

    // Dispose is synchronous, so the save has to be blocked on. Running it through Task.Run
    // keeps its continuations off the caller's synchronization context, which would
    // otherwise deadlock a UI-style caller.
    Task.Run(() => SaveEventsToFileAsync(remainingEvents)).GetAwaiter().GetResult();
}
}
/// <summary>
/// A replay session that feeds recorded file events to a handler.
/// </summary>
public class EventReplaySession : IDisposable
{
/// <summary>Events to replay, in replay order.</summary>
private readonly List<FileEvent> _events;
/// <summary>Callback invoked once for every replayed event.</summary>
private readonly Func<FileEvent, Task> _replayHandler;
/// <summary>Minimum delay between two replayed events, in milliseconds (at least 1).</summary>
private readonly int _replayIntervalMs;
/// <summary>Divisor applied to the recorded time gaps between events (at least 0.1).</summary>
private readonly double _speedFactor;
private readonly ILogger _logger;
private readonly CancellationToken _cancellationToken;
/// <summary>Cancellation source linked to the caller's token; cancelled by stop and dispose.</summary>
private CancellationTokenSource _linkedCts;
private Task _replayTask;
private bool _disposed;
private bool _isPaused;
private readonly SemaphoreSlim _pauseSemaphore = new SemaphoreSlim(1, 1);

/// <summary>
/// Replay progress as a percentage (0-100).
/// </summary>
public int Progress { get; private set; }

/// <summary>
/// Index of the event currently being replayed.
/// </summary>
public int CurrentIndex { get; private set; }

/// <summary>
/// Total number of events in this session.
/// </summary>
public int TotalEvents => _events?.Count ?? 0;

/// <summary>
/// Whether the replay has finished (completed, cancelled, stopped or failed).
/// </summary>
public bool IsCompleted { get; private set; }

/// <summary>
/// Whether the replay is currently paused.
/// </summary>
public bool IsPaused => _isPaused;

/// <summary>
/// Number of events whose handler completed without throwing.
/// </summary>
public int ProcessedEvents { get; private set; }

/// <summary>
/// Local time at which the replay was started.
/// </summary>
public DateTime StartTime { get; private set; }

/// <summary>
/// Local time at which the replay finished; <c>null</c> while it is still running.
/// </summary>
public DateTime? EndTime { get; private set; }

/// <summary>
/// Creates a new event replay session.
/// </summary>
/// <param name="events">Events to replay.</param>
/// <param name="replayHandler">Callback invoked for each replayed event.</param>
/// <param name="replayIntervalMs">Minimum delay between events in milliseconds; values below 1 are raised to 1.</param>
/// <param name="speedFactor">Replay speed factor; values below 0.1 are raised to 0.1.</param>
/// <param name="logger">Logger used for diagnostics.</param>
/// <param name="cancellationToken">Token that cancels the replay.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="events"/>, <paramref name="replayHandler"/> or <paramref name="logger"/> is <c>null</c>.
/// </exception>
public EventReplaySession(
    List<FileEvent> events,
    Func<FileEvent, Task> replayHandler,
    int replayIntervalMs,
    double speedFactor,
    ILogger logger,
    CancellationToken cancellationToken = default)
{
    _events = events ?? throw new ArgumentNullException(nameof(events));
    _replayHandler = replayHandler ?? throw new ArgumentNullException(nameof(replayHandler));
    _replayIntervalMs = Math.Max(1, replayIntervalMs);
    _speedFactor = Math.Max(0.1, speedFactor);
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    _cancellationToken = cancellationToken;
    _linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
}
/// <summary>
/// Starts the replay on a background task. Calling it again after the replay task
/// has been created does nothing.
/// </summary>
public async Task StartAsync()
{
    if (_replayTask == null)
    {
        StartTime = DateTime.Now;
        _replayTask = Task.Run(ReplayEventsAsync, _linkedCts.Token);
    }

    await Task.CompletedTask;
}
/// <summary>
/// Marks the replay as paused. Does nothing when the session is already paused or completed.
/// </summary>
public async Task PauseAsync()
{
    bool nothingToPause = _isPaused || IsCompleted;
    if (nothingToPause)
    {
        return;
    }

    // The semaphore only serializes changes to the paused flag.
    await _pauseSemaphore.WaitAsync();
    try
    {
        _isPaused = true;
    }
    finally
    {
        _pauseSemaphore.Release();
    }

    _logger.LogInformation("事件回放已暂停");
}
/// <summary>
/// Clears the paused flag. Does nothing when the session is not paused or is already completed.
/// </summary>
public async Task ResumeAsync()
{
    if (!_isPaused || IsCompleted) return;

    // The semaphore only serializes changes to the paused flag. try/finally releases it
    // exactly once on every path.
    await _pauseSemaphore.WaitAsync();
    try
    {
        _isPaused = false;
    }
    finally
    {
        _pauseSemaphore.Release();
    }

    _logger.LogInformation("事件回放已恢复");
}
/// <summary>
/// Stops the replay: cancels the replay task, waits up to one second for it to finish and
/// marks the session as completed. Does nothing when the session is already completed.
/// </summary>
/// <remarks>Errors raised while stopping are logged, not propagated.</remarks>
public async Task StopAsync()
{
    if (!IsCompleted)
    {
        try
        {
            // Request cancellation first ...
            _linkedCts?.Cancel();

            // ... then clear a pending pause so the replay loop can observe the cancellation.
            if (_isPaused)
            {
                await ResumeAsync();
            }

            // Wait for the replay task, but for no longer than one second.
            if (_replayTask != null)
            {
                var timeout = Task.Delay(1000);
                await Task.WhenAny(_replayTask, timeout);
            }

            IsCompleted = true;
            EndTime = DateTime.Now;
            _logger.LogInformation("事件回放已手动停止");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "停止事件回放时发生错误");
        }
    }
}
/// <summary>
/// Replay loop: invokes the handler for every event in order, waiting between events for the
/// recorded time gap divided by the speed factor (never less than the minimum interval).
/// </summary>
/// <remarks>
/// Handler failures are logged and the replay continues with the next event. Cancellation
/// ends the replay without reporting it as completed at 100 %. The session is always marked
/// completed, with an end time, when this method exits.
/// </remarks>
private async Task ReplayEventsAsync()
{
    // How often the paused flag is re-checked while the session is paused.
    const int PausePollIntervalMs = 50;

    try
    {
        // Read the token once: the token source may be disposed while the loop is running,
        // after which reading its Token property would throw.
        CancellationToken token = _linkedCts.Token;

        _logger.LogInformation($"开始回放{_events.Count}个事件,速度因子:{_speedFactor}");
        if (_events.Count == 0)
        {
            IsCompleted = true;
            EndTime = DateTime.Now;
            return;
        }

        DateTime? lastEventTime = null;
        for (int i = 0; i < _events.Count; i++)
        {
            // Throwing routes cancellation to the OperationCanceledException handler below
            // instead of falling through to the "completed" bookkeeping.
            token.ThrowIfCancellationRequested();

            // Hold here for as long as the session is paused. Pause/Resume only toggle the
            // flag (the semaphore is released right away), so the flag itself is polled.
            while (Volatile.Read(ref _isPaused))
            {
                await Task.Delay(PausePollIntervalMs, token);
            }

            var currentEvent = _events[i];
            CurrentIndex = i;

            if (lastEventTime.HasValue)
            {
                // Scale the recorded gap by the speed factor. The comparison is done in
                // double precision and clamped so very large gaps cannot overflow the int cast.
                double scaledMs = (currentEvent.Timestamp - lastEventTime.Value).TotalMilliseconds / _speedFactor;
                int waitTimeMs = _replayIntervalMs;
                if (scaledMs > _replayIntervalMs)
                {
                    waitTimeMs = scaledMs >= int.MaxValue ? int.MaxValue : (int)scaledMs;
                }

                await Task.Delay(waitTimeMs, token);
            }

            try
            {
                await _replayHandler(currentEvent);
                ProcessedEvents++;
                Progress = (int)((i + 1) * 100.0 / _events.Count);
            }
            catch (Exception ex)
            {
                // One failing event must not abort the whole replay.
                _logger.LogError(ex, $"处理回放事件时发生错误:{currentEvent.EventType} - {currentEvent.FullPath}");
            }

            lastEventTime = currentEvent.Timestamp;
        }

        IsCompleted = true;
        Progress = 100;
        EndTime = DateTime.Now;
        _logger.LogInformation($"事件回放已完成,共处理{ProcessedEvents}个事件,耗时:{(EndTime.Value - StartTime).TotalSeconds:F2}秒");
    }
    catch (OperationCanceledException)
    {
        _logger.LogInformation("事件回放已取消");
        IsCompleted = true;
        EndTime = DateTime.Now;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "事件回放过程中发生错误");
        IsCompleted = true;
        EndTime = DateTime.Now;
    }
}
/// <summary>
/// Cancels the replay and releases the cancellation source and the pause semaphore.
/// Safe to call more than once.
/// </summary>
public void Dispose()
{
    if (!_disposed)
    {
        _disposed = true;

        if (_linkedCts != null)
        {
            _linkedCts.Cancel();
            _linkedCts.Dispose();
        }

        if (_pauseSemaphore != null)
        {
            _pauseSemaphore.Dispose();
        }

        GC.SuppressFinalize(this);
    }
}
}
}