1111 lines
46 KiB
C#
Raw Permalink Normal View History

using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Linq;
using Microsoft.Extensions.Configuration;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Serilog;
using Serilog.Events;
using Serilog.Extensions.Logging;
using ILogger = Microsoft.Extensions.Logging.ILogger;
namespace JiShe.CollectBus.PluginFileWatcher
{
// 删除配置类的定义
class Program
{
// 配置对象从appsettings.json加载
private static FileMonitorConfig _config = new FileMonitorConfig();
// 全局计数器和动态配置参数
private static long _totalEventsProcessed = 0;
// 监控器健康状态追踪
private static DateTime _lastEventTime = DateTime.UtcNow;
private static int _watcherRestartAttempts = 0;
private static FileSystemWatcher _activeWatcher = null;
// 事件存储和回放
private static EventStorage _eventStorage = null;
private static EventReplaySession _currentReplaySession = null;
private static ILogger _logger = NullLogger.Instance;
static async Task Main(string[] args)
{
Console.OutputEncoding = System.Text.Encoding.UTF8;
// 检查是否以数据库工具模式运行
if (args.Length > 0 && args[0].Equals("db", StringComparison.OrdinalIgnoreCase))
{
// 运行数据库工具
await RunDatabaseUtilityAsync(args.Skip(1).ToArray());
return;
}
// 加载配置文件
LoadConfiguration();
// 配置Serilog
ConfigureSerilog();
// 初始化日志记录器
_logger = new SerilogLoggerFactory().CreateLogger("FileMonitor");
Log.Information("高性能文件监控程序启动中...");
// 初始化事件存储
if (_config.EventStorage.EnableEventStorage)
{
_eventStorage = new EventStorage(_config, _logger);
Log.Information("事件存储已启用,存储目录:{Directory}", _config.EventStorage.StorageDirectory);
}
// 打印配置信息
LogConfiguration();
// 获取监控路径:优先命令行参数,其次配置文件,最后使用当前目录
string pathToMonitor = args.Length > 0 ? args[0] :
!string.IsNullOrEmpty(_config.General.DefaultMonitorPath) ?
_config.General.DefaultMonitorPath : Directory.GetCurrentDirectory();
// 命令行参数可以覆盖配置文件
if (args.Length > 1 && args[1].Equals("--no-filter", StringComparison.OrdinalIgnoreCase))
{
_config.General.EnableFileFiltering = false;
Log.Information("通过命令行参数禁用文件类型过滤,将监控所有文件");
}
else if (_config.General.EnableFileFiltering)
{
Log.Information("已启用文件类型过滤,仅监控以下类型: {Extensions}", string.Join(", ", _config.FileFilters.AllowedExtensions));
}
if (!Directory.Exists(pathToMonitor))
{
Log.Warning("错误:监控目录 '{Path}' 不存在。", pathToMonitor);
Log.Information("尝试创建该目录...");
try
{
Directory.CreateDirectory(pathToMonitor);
Log.Information("已成功创建目录: {Path}", pathToMonitor);
}
catch (Exception ex)
{
Log.Error(ex, "创建目录失败: {Message}", ex.Message);
Log.Information("将使用当前目录作为监控路径。");
pathToMonitor = Directory.GetCurrentDirectory();
}
}
Log.Information("开始监控目录: {Path}", pathToMonitor);
Log.Information("按 'Q' 退出程序,按 'R' 重新加载配置,按 'H' 检查监控器健康状态。");
if (_config.EventStorage.EnableEventStorage && _config.EventStorage.EnableEventReplay)
{
Log.Information("按 'P' 开始/暂停回放,按 'S' 停止回放,按 'B' 查询事件。");
}
// 创建一个容量由配置文件指定的有界通道
var channel = Channel.CreateBounded<FileSystemEventArgs>(new BoundedChannelOptions(_config.Performance.ChannelCapacity)
{
FullMode = BoundedChannelFullMode.DropOldest // 当通道满时丢弃旧事件
});
using var cts = new CancellationTokenSource();
// 启动内存监控
var memoryMonitorTask = StartMemoryMonitorAsync(cts.Token);
// 启动文件监控器
var monitorTask = StartFileMonitorAsync(pathToMonitor, channel.Writer, cts.Token);
// 启动健康监控任务
var healthMonitorTask = _config.Robustness.EnableAutoRecovery ?
StartWatcherHealthMonitorAsync(pathToMonitor, channel.Writer, cts.Token) : Task.CompletedTask;
// 启动事件处理器
var processorTask = ProcessEventsAsync(channel.Reader, cts.Token);
// 等待用户按下键退出或重新加载配置
bool needRestart = false;
while (true)
{
var key = Console.ReadKey(true).Key;
if (key == ConsoleKey.Q)
{
break; // 退出循环
}
else if (key == ConsoleKey.R)
{
Log.Information("正在重新加载配置...");
LoadConfiguration();
// 重新配置Serilog
ConfigureSerilog();
LogConfiguration();
Log.Information("配置已重新加载,部分配置需要重启程序才能生效。");
}
else if (key == ConsoleKey.H)
{
// 手动检查监控器健康状态
bool isHealthy = _activeWatcher != null &&
FileWatcherUtils.IsWatcherHealthy(_activeWatcher, _lastEventTime, _config.Robustness);
Log.Information("监控器健康状态: {Status}", (isHealthy ? "正常" : "异常"));
Log.Information("上次事件时间: {Time}", _lastEventTime);
Log.Information("重启次数: {Count}/{MaxCount}", _watcherRestartAttempts, _config.Robustness.MaxRestartAttempts);
Log.Information("已处理事件总数: {Count}", _totalEventsProcessed);
}
else if (key == ConsoleKey.P && _config.EventStorage.EnableEventStorage && _config.EventStorage.EnableEventReplay)
{
await HandleEventReplayToggleAsync();
}
else if (key == ConsoleKey.S && _config.EventStorage.EnableEventStorage && _config.EventStorage.EnableEventReplay)
{
await StopReplayAsync();
}
else if (key == ConsoleKey.B && _config.EventStorage.EnableEventStorage)
{
await QueryEventsAsync();
}
await Task.Delay(100);
}
// 取消所有任务
cts.Cancel();
// 停止回放(如果正在进行)
await StopReplayAsync();
// 释放事件存储
_eventStorage?.Dispose();
try
{
var tasks = new List<Task> { monitorTask, processorTask, memoryMonitorTask };
if (_config.Robustness.EnableAutoRecovery)
{
tasks.Add(healthMonitorTask);
}
await Task.WhenAll(tasks);
}
catch (OperationCanceledException)
{
Log.Information("程序已正常退出。");
}
// 关闭Serilog
Log.CloseAndFlush();
}
// 配置Serilog
private static void ConfigureSerilog()
{
var configuration = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
.Build();
var loggerConfig = new LoggerConfiguration()
.ReadFrom.Configuration(configuration);
// 根据配置设置最小日志级别
LogEventLevel minimumLevel = LogEventLevel.Information;
if (!string.IsNullOrEmpty(_config.Logging?.LogLevel))
{
switch (_config.Logging.LogLevel.ToLower())
{
case "verbose":
minimumLevel = LogEventLevel.Verbose;
break;
case "debug":
minimumLevel = LogEventLevel.Debug;
break;
case "information":
minimumLevel = LogEventLevel.Information;
break;
case "warning":
minimumLevel = LogEventLevel.Warning;
break;
case "error":
minimumLevel = LogEventLevel.Error;
break;
case "fatal":
minimumLevel = LogEventLevel.Fatal;
break;
}
}
// 如果配置文件中没有Serilog节使用默认配置
var hasConfig = configuration.GetSection("Serilog").Exists();
if (!hasConfig)
{
loggerConfig
.MinimumLevel.Is(minimumLevel)
.WriteTo.Console(
outputTemplate: "[{Timestamp:yyyy-MM-dd HH:mm:ss.fff}] [{Level:u3}] {Message:lj}{NewLine}{Exception}"
)
.WriteTo.File(
Path.Combine(_config.Logging?.LogDirectory ?? "Logs", "filemonitor-.log"),
rollingInterval: RollingInterval.Day,
outputTemplate: "[{Timestamp:yyyy-MM-dd HH:mm:ss.fff}] [{Level:u3}] {Message:lj}{NewLine}{Exception}",
retainedFileCountLimit: _config.Logging?.RetainedLogDays ?? 30
);
}
Log.Logger = loggerConfig.CreateLogger();
}
// 查询事件
private static async Task QueryEventsAsync()
{
if (_eventStorage == null)
{
Log.Warning("事件存储未启用");
return;
}
try
{
Log.Information("\n================ 事件查询 ================");
Console.WriteLine("请输入查询参数,直接回车使用默认值");
// 收集查询参数
var queryParams = new EventQueryParams();
Console.Write("开始时间 (yyyy-MM-dd HH:mm:ss)默认24小时前: ");
string startTimeStr = Console.ReadLine();
if (!string.IsNullOrEmpty(startTimeStr))
{
if (DateTime.TryParse(startTimeStr, out var startTime))
{
queryParams.StartTime = startTime;
}
else
{
Log.Warning("无效的日期格式,使用默认值");
queryParams.StartTime = DateTime.Now.AddDays(-1);
}
}
else
{
queryParams.StartTime = DateTime.Now.AddDays(-1);
}
Console.Write("结束时间 (yyyy-MM-dd HH:mm:ss),默认当前时间: ");
string endTimeStr = Console.ReadLine();
if (!string.IsNullOrEmpty(endTimeStr))
{
if (DateTime.TryParse(endTimeStr, out var endTime))
{
queryParams.EndTime = endTime;
}
else
{
Log.Warning("无效的日期格式,使用默认值");
}
}
Console.Write("事件类型 (Created, Modified, Deleted, Renamed, All)默认All: ");
string eventTypeStr = Console.ReadLine();
if (!string.IsNullOrEmpty(eventTypeStr) && eventTypeStr.ToLower() != "all")
{
if (Enum.TryParse<FileEventType>(eventTypeStr, true, out var eventType))
{
queryParams.EventType = eventType;
}
else
{
Log.Warning("无效的事件类型使用默认值All");
}
}
Console.Write("路径过滤 (包含此文本的路径),默认无: ");
string pathFilter = Console.ReadLine();
if (!string.IsNullOrEmpty(pathFilter))
{
queryParams.PathFilter = pathFilter;
}
Console.Write("文件扩展名过滤 (.txt, .exe等),默认无: ");
string extensionFilter = Console.ReadLine();
if (!string.IsNullOrEmpty(extensionFilter))
{
queryParams.ExtensionFilter = extensionFilter;
}
Console.Write("每页显示数量默认20: ");
string pageSizeStr = Console.ReadLine();
if (!string.IsNullOrEmpty(pageSizeStr) && int.TryParse(pageSizeStr, out var pageSize))
{
queryParams.PageSize = pageSize;
}
else
{
queryParams.PageSize = 20;
}
// 执行查询
var result = await _eventStorage.QueryEventsAsync(queryParams);
Log.Information("查询结果 (共{Count}条记录)", result.TotalCount);
Log.Information("时间范围: {StartTime} 至 {EndTime}", result.StartTime, result.EndTime);
Console.WriteLine("---------------------------------------------");
if (result.Events.Count == 0)
{
Log.Warning("未找到符合条件的事件");
}
else
{
foreach (var evt in result.Events)
{
string eventType = evt.EventType.ToString();
string timeStr = evt.Timestamp.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss.fff");
Console.WriteLine($"[{timeStr}] [{eventType}] {evt.FileName}");
Console.WriteLine($" 路径: {evt.Directory}");
if (evt.EventType == FileEventType.Renamed && !string.IsNullOrEmpty(evt.OldFileName))
{
Console.WriteLine($" 原名称: {evt.OldFileName}");
}
Console.WriteLine();
}
if (result.HasMore)
{
Log.Warning("... 还有更多记录未显示 ...");
}
}
// 询问是否开始回放
if (result.Events.Count > 0 && _config.EventStorage.EnableEventReplay)
{
Console.Write("\n是否要回放这些事件? (Y/N): ");
string replayAnswer = Console.ReadLine();
if (!string.IsNullOrEmpty(replayAnswer) && replayAnswer.ToUpper().StartsWith("Y"))
{
await StartReplayAsync(queryParams);
}
}
}
catch (Exception ex)
{
Log.Error(ex, "查询事件时出错: {Message}", ex.Message);
}
Console.WriteLine("=============================================\n");
}
// 处理回放切换(开始/暂停)
private static async Task HandleEventReplayToggleAsync()
{
if (_eventStorage == null) return;
try
{
if (_currentReplaySession == null)
{
// 如果没有活动的回放会话,创建一个简单的查询参数并开始回放
var queryParams = new EventQueryParams
{
StartTime = DateTime.Now.AddHours(-1),
PageSize = 1000
};
await StartReplayAsync(queryParams);
}
else if (_currentReplaySession.IsPaused)
{
// 恢复回放
await _currentReplaySession.ResumeAsync();
Log.Information("已恢复事件回放");
}
else
{
// 暂停回放
await _currentReplaySession.PauseAsync();
Log.Information("已暂停事件回放");
}
}
catch (Exception ex)
{
Log.Error(ex, "处理回放时出错: {Message}", ex.Message);
}
}
// 开始回放
private static async Task StartReplayAsync(EventQueryParams queryParams)
{
if (_eventStorage == null || !_config.EventStorage.EnableEventReplay) return;
try
{
// 停止现有回放
await StopReplayAsync();
// 创建回放处理器
Func<FileEvent, Task> replayHandler = async (e) =>
{
// 在控制台上显示回放的事件
string eventType = e.EventType.ToString();
string timeStr = e.Timestamp.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss.fff");
Log.Information("[回放] [{Time}] [{Type}] {FileName}", timeStr, eventType, e.FileName);
if (_config.Logging.LogFileEventDetails)
{
Log.Debug(" 路径: {Path}", e.Directory);
if (e.EventType == FileEventType.Renamed && !string.IsNullOrEmpty(e.OldFileName))
{
Log.Debug(" 原名称: {OldName}", e.OldFileName);
}
}
// 这里可以添加实际的文件操作,比如创建、删除文件等
// 但在此示例中,我们只是显示事件而不执行实际操作
await Task.CompletedTask;
};
// 开始回放会话
_currentReplaySession = await _eventStorage.StartReplayAsync(
queryParams,
replayHandler,
CancellationToken.None);
// 如果会话创建成功,开始回放
if (_currentReplaySession != null)
{
await _currentReplaySession.StartAsync();
Log.Information("开始回放事件,共 {Count} 个事件", _currentReplaySession.TotalEvents);
Log.Information("回放速度: {Speed}", (_config.EventStorage.ReplaySpeedFactor > 1 ?
$"{_config.EventStorage.ReplaySpeedFactor}x (加速)" :
$"{_config.EventStorage.ReplaySpeedFactor}x (减速)"));
}
else
{
Log.Warning("未找到符合条件的事件,回放未开始");
}
}
catch (Exception ex)
{
Log.Error(ex, "开始回放时出错: {Message}", ex.Message);
}
}
// 停止回放
private static async Task StopReplayAsync()
{
if (_currentReplaySession != null)
{
try
{
await _currentReplaySession.StopAsync();
Log.Information("已停止事件回放");
// 输出回放统计
Log.Information("回放统计: 共处理 {Processed}/{Total} 个事件",
_currentReplaySession.ProcessedEvents,
_currentReplaySession.TotalEvents);
TimeSpan duration = (_currentReplaySession.EndTime ?? DateTime.Now) - _currentReplaySession.StartTime;
Log.Information("回放持续时间: {Duration:F1} 秒", duration.TotalSeconds);
}
catch (Exception ex)
{
Log.Error(ex, "停止回放时出错: {Message}", ex.Message);
}
finally
{
_currentReplaySession.Dispose();
_currentReplaySession = null;
}
}
}
// 加载配置文件
private static void LoadConfiguration()
{
try
{
// 检查配置文件是否存在
string configPath = Path.Combine(Directory.GetCurrentDirectory(), "appsettings.json");
if (!File.Exists(configPath))
{
Log.Warning("配置文件不存在,使用默认配置。");
// 使用默认配置
_config = new FileMonitorConfig
{
General = new GeneralConfig
{
EnableFileFiltering = true,
MemoryMonitorIntervalMinutes = 1,
DefaultMonitorPath = Path.Combine(Directory.GetCurrentDirectory(), "MonitorFiles")
},
FileFilters = new FileFiltersConfig
{
AllowedExtensions = new[] { ".dll" },
ExcludedDirectories = new[] { "bin", "obj", "node_modules" },
IncludeSubdirectories = true
},
Performance = new PerformanceConfig
{
MemoryCleanupThreshold = 5000,
ChannelCapacity = 1000,
EventDebounceTimeSeconds = 3,
MaxDictionarySize = 10000,
CleanupIntervalSeconds = 5,
ProcessingDelayMs = 5
},
Robustness = new RobustnessConfig
{
EnableAutoRecovery = true,
WatcherHealthCheckIntervalSeconds = 30,
WatcherTimeoutSeconds = 60,
MaxRestartAttempts = 3,
RestartDelaySeconds = 5,
EnableFileLockDetection = true,
LockedFileStrategy = "Retry",
FileLockRetryCount = 3,
FileLockRetryDelayMs = 500
},
NotifyFilters = new List<string> { "LastWrite", "FileName", "DirectoryName", "CreationTime" },
EventStorage = new EventStorageConfig
{
EnableEventStorage = false,
StorageDirectory = Path.Combine(Directory.GetCurrentDirectory(), "EventStorage"),
EnableEventReplay = false,
ReplaySpeedFactor = 1
},
Logging = new LoggingConfig
{
LogLevel = "Information",
LogFileEventDetails = false,
RetainedLogDays = 30,
LogDirectory = "Logs"
}
};
return;
}
var builder = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);
var configuration = builder.Build();
// 绑定配置到对象
var config = new FileMonitorConfig();
configuration.GetSection("FileMonitorConfig").Bind(config);
// 更新全局配置
_config = config;
}
catch (Exception ex)
{
Log.Error(ex, "加载配置文件失败: {Message}", ex.Message);
Log.Warning("将使用默认配置。");
// 使用默认配置
_config = new FileMonitorConfig
{
General = new GeneralConfig
{
EnableFileFiltering = true,
MemoryMonitorIntervalMinutes = 1,
DefaultMonitorPath = Path.Combine(Directory.GetCurrentDirectory(), "MonitorFiles")
},
FileFilters = new FileFiltersConfig
{
AllowedExtensions = new[] { ".dll" },
ExcludedDirectories = new[] { "bin", "obj", "node_modules" },
IncludeSubdirectories = true
},
Performance = new PerformanceConfig
{
MemoryCleanupThreshold = 5000,
ChannelCapacity = 1000,
EventDebounceTimeSeconds = 3,
MaxDictionarySize = 10000,
CleanupIntervalSeconds = 5,
ProcessingDelayMs = 5
},
Robustness = new RobustnessConfig
{
EnableAutoRecovery = true,
WatcherHealthCheckIntervalSeconds = 30,
WatcherTimeoutSeconds = 60,
MaxRestartAttempts = 3,
RestartDelaySeconds = 5,
EnableFileLockDetection = true,
LockedFileStrategy = "Retry",
FileLockRetryCount = 3,
FileLockRetryDelayMs = 500
},
NotifyFilters = new List<string> { "LastWrite", "FileName", "DirectoryName", "CreationTime" },
EventStorage = new EventStorageConfig
{
EnableEventStorage = false,
StorageDirectory = Path.Combine(Directory.GetCurrentDirectory(), "EventStorage"),
EnableEventReplay = false,
ReplaySpeedFactor = 1
},
Logging = new LoggingConfig
{
LogLevel = "Information",
LogFileEventDetails = false,
RetainedLogDays = 30,
LogDirectory = "Logs"
}
};
}
}
// 打印当前配置信息
private static void LogConfiguration()
{
Log.Information("当前配置信息:");
Log.Information(" 默认监控路径: {Path}", _config.General.DefaultMonitorPath);
Log.Information(" 文件过滤: {Status}", (_config.General.EnableFileFiltering ? "启用" : "禁用"));
if (_config.General.EnableFileFiltering)
{
Log.Information(" 监控文件类型: {Types}", string.Join(", ", _config.FileFilters.AllowedExtensions));
Log.Information(" 排除目录: {Dirs}", string.Join(", ", _config.FileFilters.ExcludedDirectories));
}
Log.Information(" 内存监控间隔: {Minutes}分钟", _config.General.MemoryMonitorIntervalMinutes);
Log.Information(" 内存清理阈值: 每{Threshold}个事件", _config.Performance.MemoryCleanupThreshold);
Log.Information(" 通道容量: {Capacity}", _config.Performance.ChannelCapacity);
Log.Information(" 事件去抖时间: {Seconds}秒", _config.Performance.EventDebounceTimeSeconds);
Log.Information(" 自动恢复: {Status}", (_config.Robustness.EnableAutoRecovery ? "启用" : "禁用"));
Log.Information(" 文件锁检测: {Status}", (_config.Robustness.EnableFileLockDetection ? "启用" : "禁用"));
if (_config.Robustness.EnableFileLockDetection)
{
Log.Information(" 锁定文件策略: {Strategy}", _config.Robustness.LockedFileStrategy);
}
Log.Information(" 日志级别: {Level}", _config.Logging?.LogLevel ?? "Information");
}
// 检查文件是否应该被处理的方法
private static bool ShouldProcessFile(string filePath)
{
if (!_config.General.EnableFileFiltering)
return true;
if (string.IsNullOrEmpty(filePath))
return false;
// 检查是否在排除目录中
foreach (var excludedDir in _config.FileFilters.ExcludedDirectories)
{
if (filePath.Contains($"{Path.DirectorySeparatorChar}{excludedDir}{Path.DirectorySeparatorChar}", StringComparison.OrdinalIgnoreCase) ||
filePath.EndsWith($"{Path.DirectorySeparatorChar}{excludedDir}", StringComparison.OrdinalIgnoreCase))
{
return false;
}
}
string extension = Path.GetExtension(filePath);
return _config.FileFilters.AllowedExtensions.Contains(extension, StringComparer.OrdinalIgnoreCase);
}
// 添加内存监控方法
private static async Task StartMemoryMonitorAsync(CancellationToken cancellationToken)
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
// 等待指定时间
await Task.Delay(TimeSpan.FromMinutes(_config.General.MemoryMonitorIntervalMinutes), cancellationToken);
// 获取当前内存使用情况
long currentMemory = GC.GetTotalMemory(false) / (1024 * 1024); // MB
Log.Information("内存使用: {Memory} MB, 已处理事件: {EventCount}", currentMemory, _totalEventsProcessed);
// 强制执行垃圾回收
GC.Collect();
GC.WaitForPendingFinalizers();
// 显示垃圾回收后的内存
long afterGCMemory = GC.GetTotalMemory(true) / (1024 * 1024); // MB
Log.Debug("内存清理后: {Memory} MB (释放: {Released} MB)", afterGCMemory, currentMemory - afterGCMemory);
}
}
catch (OperationCanceledException)
{
// 预期的取消异常
}
}
// 创建文件监控器
private static FileSystemWatcher CreateFileWatcher(string path, ChannelWriter<FileSystemEventArgs> writer, CancellationToken cancellationToken)
{
// 创建一个新的文件系统监控器
var watcher = new FileSystemWatcher(path);
// 从配置设置NotifyFilters
NotifyFilters notifyFilters = NotifyFilters.LastWrite; // 默认值
foreach (var filterName in _config.NotifyFilters)
{
if (Enum.TryParse<NotifyFilters>(filterName, out var filter))
{
notifyFilters |= filter;
}
}
watcher.NotifyFilter = notifyFilters;
watcher.IncludeSubdirectories = _config.FileFilters.IncludeSubdirectories;
watcher.EnableRaisingEvents = true;
// 设置文件过滤器,如果启用了过滤
if (_config.General.EnableFileFiltering && _config.FileFilters.AllowedExtensions.Length > 0)
{
// 在FileSystemWatcher级别设置过滤器这样可以减少系统生成的事件数量
// 只能设置一个扩展名作为Filter所以我们选择第一个
string firstExtension = _config.FileFilters.AllowedExtensions[0];
watcher.Filter = $"*{firstExtension}";
Log.Information("已设置文件系统监控过滤器: *{Filter}", firstExtension);
if (_config.FileFilters.AllowedExtensions.Length > 1)
{
Log.Warning("注意: FileSystemWatcher只支持单一过滤器其他文件类型将在事件处理时过滤。");
}
}
// 注册事件处理程序
watcher.Created += (sender, e) => HandleFileEvent(sender, e, writer, cancellationToken);
watcher.Changed += (sender, e) => HandleFileEvent(sender, e, writer, cancellationToken);
watcher.Deleted += (sender, e) => HandleFileEvent(sender, e, writer, cancellationToken);
watcher.Renamed += (sender, e) => HandleFileEvent(sender, e, writer, cancellationToken);
watcher.Error += (sender, e) =>
{
Log.Error(e.GetException(), "文件监控错误: {Message}", e.GetException().Message);
};
return watcher;
}
// 文件事件处理
private static void HandleFileEvent(object sender, FileSystemEventArgs e, ChannelWriter<FileSystemEventArgs> writer, CancellationToken cancellationToken)
{
// 更新最后事件时间
_lastEventTime = DateTime.UtcNow;
// 如果启用了事件存储,记录事件
_eventStorage?.RecordEvent(e);
try
{
// 如果是退出信号,忽略后续处理
if (cancellationToken.IsCancellationRequested)
{
return;
}
// 只处理指定类型的文件,如果启用了文件过滤
if (!ShouldProcessFile(e.FullPath))
{
return;
}
// 尝试将事件写入通道,如果满了就丢弃
// 不等待,以免阻塞文件系统事件
if (!writer.TryWrite(e))
{
Log.Warning("警告: 事件处理队列已满,部分事件被丢弃");
}
}
catch (Exception ex)
{
// 捕获任何异常,防止崩溃
Log.Error(ex, "处理文件事件时出错: {Message}", ex.Message);
}
}
private static async Task StartFileMonitorAsync(string path, ChannelWriter<FileSystemEventArgs> writer, CancellationToken cancellationToken)
{
// 使用TaskCompletionSource来控制任务的完成
var tcs = new TaskCompletionSource<bool>();
try
{
// 创建并启动监控器
_activeWatcher = CreateFileWatcher(path, writer, cancellationToken);
// 注册取消回调
cancellationToken.Register(() =>
{
try
{
if (_activeWatcher != null)
{
// 确保所有资源正确清理
_activeWatcher.EnableRaisingEvents = false;
_activeWatcher.Dispose();
}
}
catch (Exception ex)
{
Log.Error(ex, "关闭文件监控器时出错: {Message}", ex.Message);
}
finally
{
tcs.TrySetResult(true);
}
});
// 等待任务被取消或完成
await tcs.Task;
}
catch (Exception ex)
{
Log.Error(ex, "启动文件监控器时出错: {Message}", ex.Message);
tcs.TrySetException(ex);
throw;
}
}
private static async Task ProcessEventsAsync(ChannelReader<FileSystemEventArgs> reader, CancellationToken cancellationToken)
{
try
{
// 使用配置的字典大小
var recentlyProcessed = new ConcurrentDictionary<string, DateTime>();
// 清理计时器,频率由配置决定
using var cleanupTimer = new Timer(_ =>
{
try
{
var cutoff = DateTime.UtcNow.AddSeconds(-_config.Performance.EventDebounceTimeSeconds);
int removedCount = 0;
// 限制字典大小
if (recentlyProcessed.Count > _config.Performance.MaxDictionarySize)
{
// 找出最旧的条目删除
var oldestItems = recentlyProcessed
.OrderBy(kv => kv.Value)
.Take(recentlyProcessed.Count - _config.Performance.MaxDictionarySize / 2);
foreach (var item in oldestItems)
{
DateTime dummy;
if (recentlyProcessed.TryRemove(item.Key, out dummy))
{
removedCount++;
}
}
}
// 常规清理
foreach (var key in recentlyProcessed.Keys)
{
if (recentlyProcessed.TryGetValue(key, out var time) && time < cutoff)
{
DateTime dummy;
if (recentlyProcessed.TryRemove(key, out dummy))
{
removedCount++;
}
}
}
if (removedCount > 0)
{
Log.Debug("已清理 {Count} 个过期事件记录", removedCount);
}
}
catch (Exception ex)
{
Log.Error(ex, "清理定时器错误: {Message}", ex.Message);
}
}, null, TimeSpan.FromSeconds(_config.Performance.CleanupIntervalSeconds),
TimeSpan.FromSeconds(_config.Performance.CleanupIntervalSeconds));
// 从通道读取事件并处理
await foreach (var e in reader.ReadAllAsync(cancellationToken))
{
// 再次确认文件扩展名,双重检查
if (!ShouldProcessFile(e.FullPath))
{
continue;
}
// 增加计数器
long count = Interlocked.Increment(ref _totalEventsProcessed);
// 周期性清理内存
if (count % _config.Performance.MemoryCleanupThreshold == 0)
{
// 在后台线程执行垃圾回收,但直接等待完成避免警告
await Task.Run(() =>
{
GC.Collect();
GC.WaitForPendingFinalizers();
});
}
// 只使用文件路径作为键,忽略事件类型,这样同一文件的创建和修改事件会被视为同一事件
var key = e.FullPath;
// 对于创建事件,如果之前有相同文件的其他事件,则跳过
if (e.ChangeType == WatcherChangeTypes.Changed &&
recentlyProcessed.TryGetValue(key, out var _))
{
// 如果是修改事件且文件最近被处理过,则跳过
continue;
}
// 更新或添加文件处理时间,使用原子操作
recentlyProcessed[key] = DateTime.UtcNow;
string eventType = e.ChangeType switch
{
WatcherChangeTypes.Created => "创建",
WatcherChangeTypes.Deleted => "删除",
WatcherChangeTypes.Changed => "修改",
WatcherChangeTypes.Renamed => "重命名",
_ => "未知"
};
string fileName = e.Name ?? Path.GetFileName(e.FullPath);
string directoryName = Path.GetDirectoryName(e.FullPath) ?? string.Empty;
Log.Information("{EventType}: {FileName}", eventType, fileName);
if (_config.Logging.LogFileEventDetails)
{
Log.Debug(" 路径: {Path}", directoryName);
var extension = Path.GetExtension(e.FullPath);
Log.Debug(" 类型: {Extension}", extension);
if (e is RenamedEventArgs renamedEvent)
{
Log.Debug(" 原名称: {OldName}", Path.GetFileName(renamedEvent.OldFullPath));
}
}
// 延迟可配置
await Task.Delay(_config.Performance.ProcessingDelayMs, cancellationToken);
}
}
catch (OperationCanceledException)
{
// 预期的取消异常,正常退出
}
catch (Exception ex)
{
Log.Error(ex, "处理事件时发生错误: {Message}", ex.Message);
}
finally
{
// 确保资源释放
GC.Collect();
GC.WaitForPendingFinalizers();
}
}
// 健康监控任务,监视文件监控器的状态并在需要时重启它
private static async Task StartWatcherHealthMonitorAsync(
string path, ChannelWriter<FileSystemEventArgs> writer, CancellationToken cancellationToken)
{
Log.Information("已启动监控器健康检查,间隔: {Seconds}秒", _config.Robustness.WatcherHealthCheckIntervalSeconds);
try
{
while (!cancellationToken.IsCancellationRequested)
{
// 等待指定的健康检查间隔
await Task.Delay(TimeSpan.FromSeconds(_config.Robustness.WatcherHealthCheckIntervalSeconds),
cancellationToken);
// 如果监控器不健康且重启次数未超过上限,尝试重启
if (_activeWatcher != null &&
!FileWatcherUtils.IsWatcherHealthy(_activeWatcher, _lastEventTime, _config.Robustness) &&
_watcherRestartAttempts < _config.Robustness.MaxRestartAttempts)
{
// 停止当前的监控器
Log.Warning("检测到监控器异常,准备重启...");
try
{
_activeWatcher.EnableRaisingEvents = false;
_activeWatcher.Dispose();
}
catch (Exception ex)
{
Log.Error(ex, "停止异常监控器时出错: {Message}", ex.Message);
}
_watcherRestartAttempts++;
Log.Warning("正在重启监控器,尝试次数: {Current}/{Max}",
_watcherRestartAttempts, _config.Robustness.MaxRestartAttempts);
// 等待指定的重启延迟时间
await Task.Delay(TimeSpan.FromSeconds(_config.Robustness.RestartDelaySeconds),
cancellationToken);
// 创建新的监控器
try
{
_activeWatcher = CreateFileWatcher(path, writer, cancellationToken);
_lastEventTime = DateTime.UtcNow; // 重置最后事件时间
Log.Information("监控器已成功重启");
}
catch (Exception ex)
{
Log.Error(ex, "重启监控器失败: {Message}", ex.Message);
}
}
else if (_watcherRestartAttempts >= _config.Robustness.MaxRestartAttempts)
{
Log.Error("警告: 已达到最大重启次数({Max}),不再尝试重启", _config.Robustness.MaxRestartAttempts);
Log.Warning("请检查文件系统或手动重启程序");
break;
}
}
}
catch (OperationCanceledException)
{
// 预期的取消异常,正常退出
}
catch (Exception ex)
{
Log.Error(ex, "健康监控任务异常: {Message}", ex.Message);
}
}
/// <summary>
/// 运行数据库工具
/// </summary>
private static async Task RunDatabaseUtilityAsync(string[] args)
{
Console.WriteLine("正在启动SQLite数据库管理工具...");
try
{
var dbUtility = new DbUtility();
await dbUtility.ExecuteAsync(args);
}
catch (Exception ex)
{
Console.WriteLine($"运行数据库工具时发生错误: {ex.Message}");
}
}
}
}