745 lines
26 KiB
C#
745 lines
26 KiB
C#
|
|
using System;
|
|||
|
|
using System.Collections.Concurrent;
|
|||
|
|
using System.Collections.Generic;
|
|||
|
|
using System.IO;
|
|||
|
|
using System.IO.Compression;
|
|||
|
|
using System.Linq;
|
|||
|
|
using System.Text.Json;
|
|||
|
|
using System.Text.Json.Serialization;
|
|||
|
|
using System.Threading;
|
|||
|
|
using System.Threading.Tasks;
|
|||
|
|
using Microsoft.Extensions.Logging;
|
|||
|
|
|
|||
|
|
namespace JiShe.CollectBus.PluginFileWatcher
|
|||
|
|
{
|
|||
|
|
/// <summary>
|
|||
|
|
/// 负责文件事件的存储、查询和回放
|
|||
|
|
/// </summary>
|
|||
|
|
public class EventStorage : IDisposable
|
|||
|
|
{
|
|||
|
|
private readonly FileMonitorConfig _config;
|
|||
|
|
private readonly ILogger _logger;
|
|||
|
|
private readonly ConcurrentQueue<FileEvent> _eventQueue;
|
|||
|
|
private readonly Timer _storageTimer;
|
|||
|
|
private readonly SemaphoreSlim _storageLock = new SemaphoreSlim(1, 1);
|
|||
|
|
private readonly string _storageDirectory;
|
|||
|
|
private readonly EventDatabaseManager _dbManager;
|
|||
|
|
private bool _disposed;
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 创建新的事件存储管理器实例
|
|||
|
|
/// </summary>
|
|||
|
|
/// <param name="config">文件监控配置</param>
|
|||
|
|
/// <param name="logger">日志记录器</param>
|
|||
|
|
public EventStorage(FileMonitorConfig config, ILogger logger)
|
|||
|
|
{
|
|||
|
|
_config = config ?? throw new ArgumentNullException(nameof(config));
|
|||
|
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
|||
|
|
_eventQueue = new ConcurrentQueue<FileEvent>();
|
|||
|
|
|
|||
|
|
// 确保存储目录存在
|
|||
|
|
_storageDirectory = !string.IsNullOrEmpty(_config.EventStorage.StorageDirectory)
|
|||
|
|
? _config.EventStorage.StorageDirectory
|
|||
|
|
: Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "EventLogs");
|
|||
|
|
|
|||
|
|
if (!Directory.Exists(_storageDirectory))
|
|||
|
|
{
|
|||
|
|
Directory.CreateDirectory(_storageDirectory);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 创建数据库管理器(如果配置为SQLite存储类型)
|
|||
|
|
if (config.EventStorage.EnableEventStorage &&
|
|||
|
|
config.EventStorage.StorageType.Equals("SQLite", StringComparison.OrdinalIgnoreCase))
|
|||
|
|
{
|
|||
|
|
_dbManager = new EventDatabaseManager(config, logger);
|
|||
|
|
_logger.LogInformation("已初始化SQLite事件存储");
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 初始化存储定时器(如果启用)
|
|||
|
|
if (_config.EventStorage.EnableEventStorage)
|
|||
|
|
{
|
|||
|
|
var intervalMs = _config.EventStorage.StorageIntervalSeconds * 1000;
|
|||
|
|
_storageTimer = new Timer(SaveEventsTimerCallback, null, intervalMs, intervalMs);
|
|||
|
|
_logger.LogInformation($"事件存储已初始化,存储目录:{_storageDirectory},存储间隔:{_config.EventStorage.StorageIntervalSeconds}秒");
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 记录一个文件事件
|
|||
|
|
/// </summary>
|
|||
|
|
/// <param name="fileEvent">文件事件</param>
|
|||
|
|
public void RecordEvent(FileEvent fileEvent)
|
|||
|
|
{
|
|||
|
|
if (fileEvent == null || !_config.EventStorage.EnableEventStorage) return;
|
|||
|
|
|
|||
|
|
_eventQueue.Enqueue(fileEvent);
|
|||
|
|
_logger.LogDebug($"文件事件已加入队列:{fileEvent.EventType} - {fileEvent.FullPath}");
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 从FileSystemEventArgs记录事件
|
|||
|
|
/// </summary>
|
|||
|
|
/// <param name="e">文件系统事件参数</param>
|
|||
|
|
public void RecordEvent(FileSystemEventArgs e)
|
|||
|
|
{
|
|||
|
|
if (e == null || !_config.EventStorage.EnableEventStorage) return;
|
|||
|
|
|
|||
|
|
var fileEvent = FileEvent.FromFileSystemEventArgs(e);
|
|||
|
|
RecordEvent(fileEvent);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 定时将事件保存到文件
|
|||
|
|
/// </summary>
|
|||
|
|
private async void SaveEventsTimerCallback(object state)
|
|||
|
|
{
|
|||
|
|
if (_disposed || _eventQueue.IsEmpty) return;
|
|||
|
|
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
// 防止多个定时器回调同时执行
|
|||
|
|
if (!await _storageLock.WaitAsync(0))
|
|||
|
|
{
|
|||
|
|
return;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
// 从队列中取出事件
|
|||
|
|
List<FileEvent> eventsToSave = new List<FileEvent>();
|
|||
|
|
int batchSize = _config.EventStorage.BatchSize;
|
|||
|
|
|
|||
|
|
while (eventsToSave.Count < batchSize && !_eventQueue.IsEmpty)
|
|||
|
|
{
|
|||
|
|
if (_eventQueue.TryDequeue(out var fileEvent))
|
|||
|
|
{
|
|||
|
|
eventsToSave.Add(fileEvent);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if (eventsToSave.Count > 0)
|
|||
|
|
{
|
|||
|
|
await SaveEventsToFileAsync(eventsToSave);
|
|||
|
|
_logger.LogInformation($"已成功保存 {eventsToSave.Count} 个事件");
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 如果有配置,清理旧日志文件
|
|||
|
|
if (_config.EventStorage.MaxLogFiles > 0)
|
|||
|
|
{
|
|||
|
|
await CleanupOldLogFilesAsync();
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 如果有配置,清理旧数据库记录
|
|||
|
|
if (_dbManager != null && _config.EventStorage.DataRetentionDays > 0)
|
|||
|
|
{
|
|||
|
|
await _dbManager.CleanupOldDataAsync(_config.EventStorage.DataRetentionDays);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
finally
|
|||
|
|
{
|
|||
|
|
_storageLock.Release();
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
_logger.LogError(ex, "保存事件时发生错误");
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 将事件保存到文件
|
|||
|
|
/// </summary>
|
|||
|
|
/// <param name="events">要保存的事件列表</param>
|
|||
|
|
private async Task SaveEventsToFileAsync(List<FileEvent> events)
|
|||
|
|
{
|
|||
|
|
if (events == null || events.Count == 0) return;
|
|||
|
|
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
// 根据存储类型选择保存方式
|
|||
|
|
if (_config.EventStorage.StorageType.Equals("SQLite", StringComparison.OrdinalIgnoreCase) && _dbManager != null)
|
|||
|
|
{
|
|||
|
|
// 保存到SQLite数据库
|
|||
|
|
await _dbManager.SaveEventsAsync(events);
|
|||
|
|
}
|
|||
|
|
else
|
|||
|
|
{
|
|||
|
|
// 保存到文件
|
|||
|
|
string fileName = string.Format(
|
|||
|
|
_config.EventStorage.LogFileNameFormat,
|
|||
|
|
DateTime.Now);
|
|||
|
|
|
|||
|
|
string filePath = Path.Combine(_storageDirectory, fileName);
|
|||
|
|
|
|||
|
|
// 创建事件日志文件对象
|
|||
|
|
var logFile = new EventLogFile
|
|||
|
|
{
|
|||
|
|
CreatedTime = DateTime.UtcNow,
|
|||
|
|
Events = events
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
// 序列化为JSON
|
|||
|
|
string jsonContent = JsonSerializer.Serialize(logFile, new JsonSerializerOptions
|
|||
|
|
{
|
|||
|
|
WriteIndented = true
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// 是否启用压缩
|
|||
|
|
if (_config.EventStorage.CompressLogFiles)
|
|||
|
|
{
|
|||
|
|
string gzFilePath = $"{filePath}.gz";
|
|||
|
|
await CompressAndSaveStringAsync(jsonContent, gzFilePath);
|
|||
|
|
_logger.LogInformation($"已将事件保存到压缩文件:{gzFilePath}");
|
|||
|
|
}
|
|||
|
|
else
|
|||
|
|
{
|
|||
|
|
await File.WriteAllTextAsync(filePath, jsonContent);
|
|||
|
|
_logger.LogInformation($"已将事件保存到文件:{filePath}");
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
_logger.LogError(ex, "保存事件到文件时发生错误");
|
|||
|
|
throw;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 压缩并保存字符串到文件
|
|||
|
|
/// </summary>
|
|||
|
|
/// <param name="content">要保存的内容</param>
|
|||
|
|
/// <param name="filePath">文件路径</param>
|
|||
|
|
private static async Task CompressAndSaveStringAsync(string content, string filePath)
|
|||
|
|
{
|
|||
|
|
using var fileStream = new FileStream(filePath, FileMode.Create);
|
|||
|
|
using var gzipStream = new GZipStream(fileStream, CompressionLevel.Optimal);
|
|||
|
|
using var writer = new StreamWriter(gzipStream);
|
|||
|
|
|
|||
|
|
await writer.WriteAsync(content);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 清理过多的日志文件
|
|||
|
|
/// </summary>
|
|||
|
|
private async Task CleanupOldLogFilesAsync()
|
|||
|
|
{
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
// 检查是否需要清理
|
|||
|
|
if (_config.EventStorage.MaxLogFiles <= 0) return;
|
|||
|
|
|
|||
|
|
var directory = new DirectoryInfo(_storageDirectory);
|
|||
|
|
var logFiles = directory.GetFiles("*.*")
|
|||
|
|
.Where(f => f.Name.EndsWith(".json") || f.Name.EndsWith(".gz"))
|
|||
|
|
.OrderByDescending(f => f.CreationTime)
|
|||
|
|
.ToArray();
|
|||
|
|
|
|||
|
|
// 如果文件数量超过最大值,删除最旧的文件
|
|||
|
|
if (logFiles.Length > _config.EventStorage.MaxLogFiles)
|
|||
|
|
{
|
|||
|
|
int filesToDelete = logFiles.Length - _config.EventStorage.MaxLogFiles;
|
|||
|
|
var filesToRemove = logFiles.Skip(logFiles.Length - filesToDelete).ToArray();
|
|||
|
|
|
|||
|
|
foreach (var file in filesToRemove)
|
|||
|
|
{
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
file.Delete();
|
|||
|
|
_logger.LogInformation($"已删除旧的事件日志文件:{file.Name}");
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
_logger.LogWarning(ex, $"删除旧日志文件失败:{file.FullName}");
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
_logger.LogError(ex, "清理旧日志文件时发生错误");
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
await Task.CompletedTask;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 查询历史事件
|
|||
|
|
/// </summary>
|
|||
|
|
/// <param name="queryParams">查询参数</param>
|
|||
|
|
/// <returns>查询结果</returns>
|
|||
|
|
public async Task<EventQueryResult> QueryEventsAsync(EventQueryParams queryParams)
|
|||
|
|
{
|
|||
|
|
if (queryParams == null) throw new ArgumentNullException(nameof(queryParams));
|
|||
|
|
|
|||
|
|
// 如果是SQLite存储且数据库管理器可用,使用数据库查询
|
|||
|
|
if (_config.EventStorage.StorageType.Equals("SQLite", StringComparison.OrdinalIgnoreCase) && _dbManager != null)
|
|||
|
|
{
|
|||
|
|
return await _dbManager.QueryEventsAsync(queryParams);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
var result = new EventQueryResult
|
|||
|
|
{
|
|||
|
|
StartTime = queryParams.StartTime ?? DateTime.MinValue,
|
|||
|
|
EndTime = queryParams.EndTime ?? DateTime.MaxValue
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
await _storageLock.WaitAsync();
|
|||
|
|
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
// 获取所有日志文件
|
|||
|
|
var directory = new DirectoryInfo(_storageDirectory);
|
|||
|
|
if (!directory.Exists)
|
|||
|
|
{
|
|||
|
|
return result;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
var logFiles = directory.GetFiles("*.*")
|
|||
|
|
.Where(f => f.Name.EndsWith(".json") || f.Name.EndsWith(".gz"))
|
|||
|
|
.OrderByDescending(f => f.CreationTime)
|
|||
|
|
.ToArray();
|
|||
|
|
|
|||
|
|
List<FileEvent> allEvents = new List<FileEvent>();
|
|||
|
|
|
|||
|
|
// 加载所有日志文件中的事件
|
|||
|
|
foreach (var file in logFiles)
|
|||
|
|
{
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
var events = await LoadEventsFromFileAsync(file.FullName);
|
|||
|
|
if (events != null && events.Count > 0)
|
|||
|
|
{
|
|||
|
|
allEvents.AddRange(events);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
_logger.LogWarning(ex, $"从文件加载事件失败:{file.FullName}");
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 内存中队列的事件也包含在查询中
|
|||
|
|
FileEvent[] queuedEvents = _eventQueue.ToArray();
|
|||
|
|
allEvents.AddRange(queuedEvents);
|
|||
|
|
|
|||
|
|
// 应用查询过滤条件
|
|||
|
|
var filteredEvents = allEvents
|
|||
|
|
.Where(e => (queryParams.StartTime == null || e.Timestamp >= queryParams.StartTime) &&
|
|||
|
|
(queryParams.EndTime == null || e.Timestamp <= queryParams.EndTime) &&
|
|||
|
|
(queryParams.EventType == null || e.EventType == queryParams.EventType.Value) &&
|
|||
|
|
(string.IsNullOrEmpty(queryParams.PathFilter) || e.FullPath.Contains(queryParams.PathFilter, StringComparison.OrdinalIgnoreCase)) &&
|
|||
|
|
(string.IsNullOrEmpty(queryParams.ExtensionFilter) || e.Extension.Equals(queryParams.ExtensionFilter, StringComparison.OrdinalIgnoreCase)))
|
|||
|
|
.ToList();
|
|||
|
|
|
|||
|
|
// 应用排序
|
|||
|
|
IEnumerable<FileEvent> orderedEvents = queryParams.AscendingOrder
|
|||
|
|
? filteredEvents.OrderBy(e => e.Timestamp)
|
|||
|
|
: filteredEvents.OrderByDescending(e => e.Timestamp);
|
|||
|
|
|
|||
|
|
// 计算总数
|
|||
|
|
result.TotalCount = filteredEvents.Count;
|
|||
|
|
|
|||
|
|
// 应用分页
|
|||
|
|
int skip = queryParams.PageIndex * queryParams.PageSize;
|
|||
|
|
int take = queryParams.PageSize;
|
|||
|
|
|
|||
|
|
result.Events = orderedEvents.Skip(skip).Take(take).ToList();
|
|||
|
|
result.HasMore = (skip + take) < result.TotalCount;
|
|||
|
|
|
|||
|
|
return result;
|
|||
|
|
}
|
|||
|
|
finally
|
|||
|
|
{
|
|||
|
|
_storageLock.Release();
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
_logger.LogError(ex, "查询事件时发生错误");
|
|||
|
|
throw;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 从文件加载事件
|
|||
|
|
/// </summary>
|
|||
|
|
/// <param name="filePath">文件路径</param>
|
|||
|
|
/// <returns>事件列表</returns>
|
|||
|
|
private async Task<List<FileEvent>> LoadEventsFromFileAsync(string filePath)
|
|||
|
|
{
|
|||
|
|
if (string.IsNullOrEmpty(filePath) || !File.Exists(filePath))
|
|||
|
|
{
|
|||
|
|
return new List<FileEvent>();
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
string jsonContent;
|
|||
|
|
|
|||
|
|
// 处理压缩文件
|
|||
|
|
if (filePath.EndsWith(".gz"))
|
|||
|
|
{
|
|||
|
|
using var fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read);
|
|||
|
|
using var gzipStream = new GZipStream(fileStream, CompressionMode.Decompress);
|
|||
|
|
using var reader = new StreamReader(gzipStream);
|
|||
|
|
|
|||
|
|
jsonContent = await reader.ReadToEndAsync();
|
|||
|
|
}
|
|||
|
|
else
|
|||
|
|
{
|
|||
|
|
jsonContent = await File.ReadAllTextAsync(filePath);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
var logFile = JsonSerializer.Deserialize<EventLogFile>(jsonContent);
|
|||
|
|
return logFile?.Events ?? new List<FileEvent>();
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
_logger.LogError(ex, $"从文件加载事件失败:{filePath}");
|
|||
|
|
return new List<FileEvent>();
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 启动事件回放会话
|
|||
|
|
/// </summary>
|
|||
|
|
/// <param name="queryParams">查询参数,定义要回放的事件</param>
|
|||
|
|
/// <param name="replayHandler">回放处理回调</param>
|
|||
|
|
/// <param name="cancellationToken">取消标记</param>
|
|||
|
|
/// <returns>回放会话控制器</returns>
|
|||
|
|
public async Task<EventReplaySession> StartReplayAsync(
|
|||
|
|
EventQueryParams queryParams,
|
|||
|
|
Func<FileEvent, Task> replayHandler,
|
|||
|
|
CancellationToken cancellationToken = default)
|
|||
|
|
{
|
|||
|
|
if (replayHandler == null) throw new ArgumentNullException(nameof(replayHandler));
|
|||
|
|
|
|||
|
|
// 查询要回放的事件
|
|||
|
|
var queryResult = await QueryEventsAsync(queryParams);
|
|||
|
|
|
|||
|
|
// 创建并启动回放会话
|
|||
|
|
var session = new EventReplaySession(
|
|||
|
|
queryResult.Events,
|
|||
|
|
replayHandler,
|
|||
|
|
_config.EventStorage.ReplayIntervalMs,
|
|||
|
|
_config.EventStorage.ReplaySpeedFactor,
|
|||
|
|
_logger,
|
|||
|
|
cancellationToken);
|
|||
|
|
|
|||
|
|
await session.StartAsync();
|
|||
|
|
return session;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 释放资源
|
|||
|
|
/// </summary>
|
|||
|
|
public void Dispose()
|
|||
|
|
{
|
|||
|
|
if (_disposed) return;
|
|||
|
|
|
|||
|
|
_disposed = true;
|
|||
|
|
_storageTimer?.Dispose();
|
|||
|
|
_storageLock?.Dispose();
|
|||
|
|
_dbManager?.Dispose();
|
|||
|
|
|
|||
|
|
// 尝试保存剩余事件
|
|||
|
|
if (_config.EventStorage.EnableEventStorage && !_eventQueue.IsEmpty)
|
|||
|
|
{
|
|||
|
|
var remainingEvents = new List<FileEvent>();
|
|||
|
|
while (!_eventQueue.IsEmpty && _eventQueue.TryDequeue(out var evt))
|
|||
|
|
{
|
|||
|
|
remainingEvents.Add(evt);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if (remainingEvents.Count > 0)
|
|||
|
|
{
|
|||
|
|
SaveEventsToFileAsync(remainingEvents).GetAwaiter().GetResult();
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
GC.SuppressFinalize(this);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 事件回放会话
|
|||
|
|
/// </summary>
|
|||
|
|
public class EventReplaySession : IDisposable
|
|||
|
|
{
|
|||
|
|
private readonly List<FileEvent> _events;
|
|||
|
|
private readonly Func<FileEvent, Task> _replayHandler;
|
|||
|
|
private readonly int _replayIntervalMs;
|
|||
|
|
private readonly double _speedFactor;
|
|||
|
|
private readonly ILogger _logger;
|
|||
|
|
private readonly CancellationToken _cancellationToken;
|
|||
|
|
private CancellationTokenSource _linkedCts;
|
|||
|
|
private Task _replayTask;
|
|||
|
|
private bool _disposed;
|
|||
|
|
private bool _isPaused;
|
|||
|
|
private readonly SemaphoreSlim _pauseSemaphore = new SemaphoreSlim(1, 1);
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 回放进度(0-100)
|
|||
|
|
/// </summary>
|
|||
|
|
public int Progress { get; private set; }
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 当前回放的事件索引
|
|||
|
|
/// </summary>
|
|||
|
|
public int CurrentIndex { get; private set; }
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 事件总数
|
|||
|
|
/// </summary>
|
|||
|
|
public int TotalEvents => _events?.Count ?? 0;
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 回放是否已完成
|
|||
|
|
/// </summary>
|
|||
|
|
public bool IsCompleted { get; private set; }
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 回放是否已暂停
|
|||
|
|
/// </summary>
|
|||
|
|
public bool IsPaused => _isPaused;
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 回放已处理的事件数
|
|||
|
|
/// </summary>
|
|||
|
|
public int ProcessedEvents { get; private set; }
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 回放开始时间
|
|||
|
|
/// </summary>
|
|||
|
|
public DateTime StartTime { get; private set; }
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 回放结束时间(如果已完成)
|
|||
|
|
/// </summary>
|
|||
|
|
public DateTime? EndTime { get; private set; }
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 创建新的事件回放会话
|
|||
|
|
/// </summary>
|
|||
|
|
/// <param name="events">要回放的事件</param>
|
|||
|
|
/// <param name="replayHandler">回放处理回调</param>
|
|||
|
|
/// <param name="replayIntervalMs">回放间隔(毫秒)</param>
|
|||
|
|
/// <param name="speedFactor">速度因子</param>
|
|||
|
|
/// <param name="logger">日志记录器</param>
|
|||
|
|
/// <param name="cancellationToken">取消标记</param>
|
|||
|
|
public EventReplaySession(
|
|||
|
|
List<FileEvent> events,
|
|||
|
|
Func<FileEvent, Task> replayHandler,
|
|||
|
|
int replayIntervalMs,
|
|||
|
|
double speedFactor,
|
|||
|
|
ILogger logger,
|
|||
|
|
CancellationToken cancellationToken = default)
|
|||
|
|
{
|
|||
|
|
_events = events ?? throw new ArgumentNullException(nameof(events));
|
|||
|
|
_replayHandler = replayHandler ?? throw new ArgumentNullException(nameof(replayHandler));
|
|||
|
|
_replayIntervalMs = Math.Max(1, replayIntervalMs);
|
|||
|
|
_speedFactor = Math.Max(0.1, speedFactor);
|
|||
|
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
|||
|
|
_cancellationToken = cancellationToken;
|
|||
|
|
|
|||
|
|
_linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 启动回放
|
|||
|
|
/// </summary>
|
|||
|
|
public async Task StartAsync()
|
|||
|
|
{
|
|||
|
|
if (_replayTask != null) return;
|
|||
|
|
|
|||
|
|
StartTime = DateTime.Now;
|
|||
|
|
_replayTask = Task.Run(ReplayEventsAsync, _linkedCts.Token);
|
|||
|
|
await Task.CompletedTask;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 暂停回放
|
|||
|
|
/// </summary>
|
|||
|
|
public async Task PauseAsync()
|
|||
|
|
{
|
|||
|
|
if (_isPaused || IsCompleted) return;
|
|||
|
|
|
|||
|
|
await _pauseSemaphore.WaitAsync();
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
_isPaused = true;
|
|||
|
|
}
|
|||
|
|
finally
|
|||
|
|
{
|
|||
|
|
_pauseSemaphore.Release();
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
_logger.LogInformation("事件回放已暂停");
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 恢复回放
|
|||
|
|
/// </summary>
|
|||
|
|
public async Task ResumeAsync()
|
|||
|
|
{
|
|||
|
|
if (!_isPaused || IsCompleted) return;
|
|||
|
|
|
|||
|
|
await _pauseSemaphore.WaitAsync();
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
_isPaused = false;
|
|||
|
|
// 释放信号量以允许回放任务继续
|
|||
|
|
_pauseSemaphore.Release();
|
|||
|
|
}
|
|||
|
|
catch
|
|||
|
|
{
|
|||
|
|
_pauseSemaphore.Release();
|
|||
|
|
throw;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
_logger.LogInformation("事件回放已恢复");
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 停止回放
|
|||
|
|
/// </summary>
|
|||
|
|
public async Task StopAsync()
|
|||
|
|
{
|
|||
|
|
if (IsCompleted) return;
|
|||
|
|
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
// 取消回放任务
|
|||
|
|
_linkedCts?.Cancel();
|
|||
|
|
|
|||
|
|
// 如果暂停中,先恢复以允许取消
|
|||
|
|
if (_isPaused)
|
|||
|
|
{
|
|||
|
|
await ResumeAsync();
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 等待任务完成
|
|||
|
|
if (_replayTask != null)
|
|||
|
|
{
|
|||
|
|
await Task.WhenAny(_replayTask, Task.Delay(1000));
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
IsCompleted = true;
|
|||
|
|
EndTime = DateTime.Now;
|
|||
|
|
|
|||
|
|
_logger.LogInformation("事件回放已手动停止");
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
_logger.LogError(ex, "停止事件回放时发生错误");
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 回放事件处理
|
|||
|
|
/// </summary>
|
|||
|
|
private async Task ReplayEventsAsync()
|
|||
|
|
{
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
_logger.LogInformation($"开始回放{_events.Count}个事件,速度因子:{_speedFactor}");
|
|||
|
|
|
|||
|
|
if (_events.Count == 0)
|
|||
|
|
{
|
|||
|
|
IsCompleted = true;
|
|||
|
|
EndTime = DateTime.Now;
|
|||
|
|
return;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
DateTime? lastEventTime = null;
|
|||
|
|
|
|||
|
|
for (int i = 0; i < _events.Count; i++)
|
|||
|
|
{
|
|||
|
|
// 检查是否取消
|
|||
|
|
if (_linkedCts.Token.IsCancellationRequested)
|
|||
|
|
{
|
|||
|
|
_logger.LogInformation("事件回放已取消");
|
|||
|
|
break;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 检查暂停状态
|
|||
|
|
if (_isPaused)
|
|||
|
|
{
|
|||
|
|
// 等待恢复信号
|
|||
|
|
await _pauseSemaphore.WaitAsync(_linkedCts.Token);
|
|||
|
|
_pauseSemaphore.Release();
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
var currentEvent = _events[i];
|
|||
|
|
CurrentIndex = i;
|
|||
|
|
|
|||
|
|
// 计算等待时间(根据事件之间的实际时间差和速度因子)
|
|||
|
|
if (lastEventTime.HasValue && i > 0)
|
|||
|
|
{
|
|||
|
|
var actualTimeDiff = currentEvent.Timestamp - lastEventTime.Value;
|
|||
|
|
var waitTimeMs = (int)(actualTimeDiff.TotalMilliseconds / _speedFactor);
|
|||
|
|
|
|||
|
|
// 应用最小等待时间
|
|||
|
|
waitTimeMs = Math.Max(_replayIntervalMs, waitTimeMs);
|
|||
|
|
|
|||
|
|
// 等待指定时间
|
|||
|
|
await Task.Delay(waitTimeMs, _linkedCts.Token);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 处理当前事件
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
await _replayHandler(currentEvent);
|
|||
|
|
ProcessedEvents++;
|
|||
|
|
|
|||
|
|
// 更新进度
|
|||
|
|
Progress = (int)((i + 1) * 100.0 / _events.Count);
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
_logger.LogError(ex, $"处理回放事件时发生错误:{currentEvent.EventType} - {currentEvent.FullPath}");
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
lastEventTime = currentEvent.Timestamp;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 完成回放
|
|||
|
|
IsCompleted = true;
|
|||
|
|
Progress = 100;
|
|||
|
|
EndTime = DateTime.Now;
|
|||
|
|
|
|||
|
|
_logger.LogInformation($"事件回放已完成,共处理{ProcessedEvents}个事件,耗时:{(EndTime.Value - StartTime).TotalSeconds:F2}秒");
|
|||
|
|
}
|
|||
|
|
catch (OperationCanceledException)
|
|||
|
|
{
|
|||
|
|
_logger.LogInformation("事件回放已取消");
|
|||
|
|
IsCompleted = true;
|
|||
|
|
EndTime = DateTime.Now;
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
_logger.LogError(ex, "事件回放过程中发生错误");
|
|||
|
|
IsCompleted = true;
|
|||
|
|
EndTime = DateTime.Now;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 释放资源
|
|||
|
|
/// </summary>
|
|||
|
|
public void Dispose()
|
|||
|
|
{
|
|||
|
|
if (_disposed) return;
|
|||
|
|
_disposed = true;
|
|||
|
|
|
|||
|
|
_linkedCts?.Cancel();
|
|||
|
|
_linkedCts?.Dispose();
|
|||
|
|
_pauseSemaphore?.Dispose();
|
|||
|
|
|
|||
|
|
GC.SuppressFinalize(this);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|