From 0efb3dec00010123e303122d898d28c2edc1a02d Mon Sep 17 00:00:00 2001 From: cli <377476583@qq.com> Date: Tue, 22 Apr 2025 21:01:28 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=8D=8F=E8=AE=AE=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E7=9B=91=E6=8E=A7=E7=A8=8B=E5=BA=8F=EF=BC=8C=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=E5=8D=8F=E8=AE=AE=E6=96=87=E4=BB=B6CRUD=E7=9A=84?= =?UTF-8?q?=E7=9B=91=E6=8E=A7=EF=BC=8C=E5=90=8C=E6=97=B6=E8=AE=B0=E5=BD=95?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BA=8B=E4=BB=B6=EF=BC=8C=E5=B9=B6=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E8=87=AA=E5=8A=A8=E9=87=8D=E5=90=AF=E5=92=8C=E5=81=A5?= =?UTF-8?q?=E5=BA=B7=E6=A3=80=E6=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .dockerignore | 30 + JiShe.CollectBus.sln | 9 + .../Config.cs | 287 +++++ .../DbUtility.cs | 277 ++++ .../Dockerfile | 28 + .../EventDatabaseManager.cs | 481 +++++++ .../EventStorage.cs | 745 +++++++++++ .../FileEvent.cs | 254 ++++ .../FileWatcherUtils.cs | 157 +++ .../JiShe.CollectBus.PluginFileWatcher.csproj | 39 + .../Program.cs | 1110 +++++++++++++++++ .../Properties/launchSettings.json | 10 + .../appsettings.json | 88 ++ .../Consumer/ConsumerService.cs | 1 + .../Abstracts/BaseProtocolPlugin.cs | 13 +- .../CollectBusProtocolModule.cs | 18 + .../Interfaces/IProtocolPlugin.cs | 2 +- .../Interfaces/IProtocolService.cs | 22 + ...JiShe.CollectBus.Protocol.Contracts.csproj | 1 + .../Services/ProtocolService.cs | 52 + .../JiShe.CollectBus.Protocol.Test.csproj | 2 +- ...s => JiSheCollectBusTestProtocolModule.cs} | 6 +- .../TestProtocolPlugin.cs | 131 +- .../JiSheCollectBusProtocolModule.cs | 2 +- .../CollectBusAppService.cs | 2 +- .../CollectBusApplicationModule.cs | 35 +- .../EnergySystem/CacheAppService.cs | 2 +- .../Interceptors/LogInterceptAttribute.cs | 1 + .../Samples/TestAppService.cs | 15 +- .../Subscribers/SubscriberAppService.cs | 7 +- .../IotSystems/CommunicationLogs/PacketLog.cs | 43 + .../IotSystems/Protocols/ProtocolInfo.cs | 6 + .../JiShe.CollectBus.Domain.csproj | 11 +- .../Consts/ExceptionCode.cs | 13 + .../Consts/RedisConst.cs | 7 +- .../Extensions/StringExtensions.cs | 2 +- .../CollectBusHostModule.cs | 3 + .../JiShe.CollectBus.Host.csproj | 3 + 38 files changed, 3733 insertions(+), 182 deletions(-) create mode 100644 .dockerignore create mode 100644 external/JiShe.CollectBus.PluginFileWatcher/Config.cs create mode 100644 external/JiShe.CollectBus.PluginFileWatcher/DbUtility.cs create mode 100644 external/JiShe.CollectBus.PluginFileWatcher/Dockerfile create mode 100644 external/JiShe.CollectBus.PluginFileWatcher/EventDatabaseManager.cs create mode 100644 external/JiShe.CollectBus.PluginFileWatcher/EventStorage.cs create mode 100644 external/JiShe.CollectBus.PluginFileWatcher/FileEvent.cs create mode 100644 external/JiShe.CollectBus.PluginFileWatcher/FileWatcherUtils.cs create mode 100644 external/JiShe.CollectBus.PluginFileWatcher/JiShe.CollectBus.PluginFileWatcher.csproj create mode 100644 external/JiShe.CollectBus.PluginFileWatcher/Program.cs create mode 100644 external/JiShe.CollectBus.PluginFileWatcher/Properties/launchSettings.json create mode 100644 external/JiShe.CollectBus.PluginFileWatcher/appsettings.json create mode 100644 protocols/JiShe.CollectBus.Protocol.Contracts/CollectBusProtocolModule.cs create mode 100644 protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolService.cs create mode 100644 protocols/JiShe.CollectBus.Protocol.Contracts/Services/ProtocolService.cs rename protocols/JiShe.CollectBus.Protocol.Test/{JiSheCollectBusProtocolModule.cs => JiSheCollectBusTestProtocolModule.cs} (72%) create mode 100644 services/JiShe.CollectBus.Domain/IotSystems/CommunicationLogs/PacketLog.cs create mode 100644 shared/JiShe.CollectBus.Common/Consts/ExceptionCode.cs diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..fe1152b --- /dev/null +++ b/.dockerignore @@ -0,0 +1,30 @@ +**/.classpath +**/.dockerignore +**/.env +**/.git +**/.gitignore +**/.project +**/.settings +**/.toolstarget +**/.vs +**/.vscode +**/*.*proj.user +**/*.dbmdl +**/*.jfm +**/azds.yaml +**/bin +**/charts +**/docker-compose* +**/Dockerfile* +**/node_modules +**/npm-debug.log +**/obj +**/secrets.dev.yaml +**/values.dev.yaml +LICENSE +README.md +!**/.gitignore +!.git/HEAD +!.git/config +!.git/packed-refs +!.git/refs/heads/** \ No newline at end of file diff --git a/JiShe.CollectBus.sln b/JiShe.CollectBus.sln index fa3fd6c..6bc2bee 100644 --- a/JiShe.CollectBus.sln +++ b/JiShe.CollectBus.sln @@ -49,6 +49,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "5.Shared", "5.Shared", "{EB EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "modules\JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "6.External", "6.External", "{02EA681E-C7D8-13C7-8484-4AC65E1B71E8}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.PluginFileWatcher", "external\JiShe.CollectBus.PluginFileWatcher\JiShe.CollectBus.PluginFileWatcher.csproj", "{F767D1C3-6807-4AE3-996A-3A28FE8124C2}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -127,6 +131,10 @@ Global {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Debug|Any CPU.Build.0 = Debug|Any CPU {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Release|Any CPU.ActiveCfg = Release|Any CPU {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Release|Any CPU.Build.0 = Release|Any CPU + {F767D1C3-6807-4AE3-996A-3A28FE8124C2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F767D1C3-6807-4AE3-996A-3A28FE8124C2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F767D1C3-6807-4AE3-996A-3A28FE8124C2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F767D1C3-6807-4AE3-996A-3A28FE8124C2}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -150,6 +158,7 @@ Global {A377955E-7EA1-6F29-8CF7-774569E93925} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC} {443B4549-0AC0-4493-8F3E-49C83225DD76} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59} {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59} + {F767D1C3-6807-4AE3-996A-3A28FE8124C2} = {02EA681E-C7D8-13C7-8484-4AC65E1B71E8} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD} diff --git a/external/JiShe.CollectBus.PluginFileWatcher/Config.cs b/external/JiShe.CollectBus.PluginFileWatcher/Config.cs new file mode 100644 index 0000000..8de55f6 --- /dev/null +++ b/external/JiShe.CollectBus.PluginFileWatcher/Config.cs @@ -0,0 +1,287 @@ +using System; +using System.Collections.Generic; + +namespace JiShe.CollectBus.PluginFileWatcher +{ + /// + /// 文件监控程序的配置类 + /// + public class FileMonitorConfig + { + /// + /// 基本配置 + /// + public GeneralConfig General { get; set; } = new GeneralConfig(); + + /// + /// 文件过滤配置 + /// + public FileFiltersConfig FileFilters { get; set; } = new FileFiltersConfig(); + + /// + /// 性能相关配置 + /// + public PerformanceConfig Performance { get; set; } = new PerformanceConfig(); + + /// + /// 健壮性相关配置 + /// + public RobustnessConfig Robustness { get; set; } = new RobustnessConfig(); + + /// + /// 事件存储和回放配置 + /// + public EventStorageConfig EventStorage { get; set; } = new EventStorageConfig(); + + /// + /// 文件系统通知过滤器配置 + /// + public List NotifyFilters { get; set; } = new List(); + + /// + /// 日志配置 + /// + public LoggingConfig Logging { get; set; } = new LoggingConfig(); + } + + /// + /// 常规配置 + /// + public class GeneralConfig + { + /// + /// 是否启用文件过滤 + /// + public bool EnableFileFiltering { get; set; } = true; + + /// + /// 内存监控间隔(分钟) + /// + public int MemoryMonitorIntervalMinutes { get; set; } = 1; + + /// + /// 默认监控路径 + /// + public string DefaultMonitorPath { get; set; } = string.Empty; + } + + /// + /// 文件过滤配置 + /// + public class FileFiltersConfig + { + /// + /// 允许监控的文件扩展名 + /// + public string[] AllowedExtensions { get; set; } = new[] { ".dll" }; + + /// + /// 排除的目录 + /// + public string[] ExcludedDirectories { get; set; } = new[] { "bin", "obj", "node_modules" }; + + /// + /// 是否包含子目录 + /// + public bool IncludeSubdirectories { get; set; } = true; + } + + /// + /// 性能相关配置 + /// + public class PerformanceConfig + { + /// + /// 内存清理阈值(事件数) + /// + public int MemoryCleanupThreshold { get; set; } = 5000; + + /// + /// 通道容量 + /// + public int ChannelCapacity { get; set; } = 1000; + + /// + /// 事件去抖时间(秒) + /// + public int EventDebounceTimeSeconds { get; set; } = 3; + + /// + /// 最大字典大小 + /// + public int MaxDictionarySize { get; set; } = 10000; + + /// + /// 清理间隔(秒) + /// + public int CleanupIntervalSeconds { get; set; } = 5; + + /// + /// 处理延迟(毫秒) + /// + public int ProcessingDelayMs { get; set; } = 5; + } + + /// + /// 健壮性相关配置 + /// + public class RobustnessConfig + { + /// + /// 是否启用自动恢复机制 + /// + public bool EnableAutoRecovery { get; set; } = true; + + /// + /// 监控器健康检查间隔(秒) + /// + public int WatcherHealthCheckIntervalSeconds { get; set; } = 30; + + /// + /// 监控器无响应超时时间(秒) + /// + public int WatcherTimeoutSeconds { get; set; } = 60; + + /// + /// 监控器重启尝试最大次数 + /// + public int MaxRestartAttempts { get; set; } = 3; + + /// + /// 重启尝试之间的延迟(秒) + /// + public int RestartDelaySeconds { get; set; } = 5; + + /// + /// 是否启用文件锁检测 + /// + public bool EnableFileLockDetection { get; set; } = true; + + /// + /// 对锁定文件的处理策略: Skip(跳过), Retry(重试), Log(仅记录) + /// + public string LockedFileStrategy { get; set; } = "Retry"; + + /// + /// 文件锁定重试次数 + /// + public int FileLockRetryCount { get; set; } = 3; + + /// + /// 文件锁定重试间隔(毫秒) + /// + public int FileLockRetryDelayMs { get; set; } = 500; + } + + /// + /// 事件存储和回放配置 + /// + public class EventStorageConfig + { + /// + /// 是否启用事件存储 + /// + public bool EnableEventStorage { get; set; } = true; + + /// + /// 存储类型:SQLite 或 File + /// + public string StorageType { get; set; } = "SQLite"; + + /// + /// 事件存储目录 + /// + public string StorageDirectory { get; set; } = "D:/EventLogs"; + + /// + /// SQLite数据库文件路径 + /// + public string DatabasePath { get; set; } = "D:/EventLogs/events.db"; + + /// + /// SQLite连接字符串 + /// + public string ConnectionString { get; set; } = "Data Source=D:/EventLogs/events.db"; + + /// + /// 数据库命令超时(秒) + /// + public int CommandTimeout { get; set; } = 30; + + /// + /// 事件日志文件名格式 (使用DateTime.ToString格式) + /// + public string LogFileNameFormat { get; set; } = "FileEvents_{0:yyyy-MM-dd}.json"; + + /// + /// 存储间隔(秒),多久将缓存的事件写入一次存储 + /// + public int StorageIntervalSeconds { get; set; } = 60; + + /// + /// 事件批量写入大小,达到此数量时立即写入存储 + /// + public int BatchSize { get; set; } = 100; + + /// + /// 最大保留事件记录条数 + /// + public int MaxEventRecords { get; set; } = 100000; + + /// + /// 数据保留天数 + /// + public int DataRetentionDays { get; set; } = 30; + + /// + /// 最大保留日志文件数 + /// + public int MaxLogFiles { get; set; } = 30; + + /// + /// 是否压缩存储的事件日志 + /// + public bool CompressLogFiles { get; set; } = true; + + /// + /// 是否可以回放事件 + /// + public bool EnableEventReplay { get; set; } = true; + + /// + /// 回放时间间隔(毫秒) + /// + public int ReplayIntervalMs { get; set; } = 100; + + /// + /// 回放速度倍率,大于1加速,小于1减速 + /// + public double ReplaySpeedFactor { get; set; } = 1.0; + } + + /// + /// 日志相关配置 + /// + public class LoggingConfig + { + /// + /// 日志级别:Verbose、Debug、Information、Warning、Error、Fatal + /// + public string LogLevel { get; set; } = "Information"; + + /// + /// 是否记录文件事件处理详情 + /// + public bool LogFileEventDetails { get; set; } = false; + + /// + /// 日志文件保留天数 + /// + public int RetainedLogDays { get; set; } = 30; + + /// + /// 日志文件目录 + /// + public string LogDirectory { get; set; } = "Logs"; + } +} \ No newline at end of file diff --git a/external/JiShe.CollectBus.PluginFileWatcher/DbUtility.cs b/external/JiShe.CollectBus.PluginFileWatcher/DbUtility.cs new file mode 100644 index 0000000..c2dd7a5 --- /dev/null +++ b/external/JiShe.CollectBus.PluginFileWatcher/DbUtility.cs @@ -0,0 +1,277 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using Serilog; +using Serilog.Extensions.Logging; +using ILogger = Microsoft.Extensions.Logging.ILogger; + +namespace JiShe.CollectBus.PluginFileWatcher +{ + /// + /// 数据库操作工具类,用于命令行测试数据库功能 + /// + public class DbUtility + { + private readonly EventDatabaseManager _dbManager; + private readonly ILogger _logger; + private readonly FileMonitorConfig _config; + + /// + /// 初始化数据库工具类 + /// + /// 配置文件路径 + public DbUtility(string configPath = "appsettings.json") + { + // 从配置文件加载配置 + var configuration = new ConfigurationBuilder() + .AddJsonFile(configPath, optional: false, reloadOnChange: true) + .Build(); + + // 初始化日志 + var serilogLogger = new LoggerConfiguration() + .ReadFrom.Configuration(configuration) + .CreateLogger(); + + // 将Serilog适配为Microsoft.Extensions.Logging.ILogger + _logger = new SerilogLoggerFactory(serilogLogger).CreateLogger("DbUtility"); + + // 创建配置对象 + _config = new FileMonitorConfig(); + configuration.GetSection("FileMonitor").Bind(_config); + + // 确保SQLite存储已启用 + _config.EventStorage.EnableEventStorage = true; + _config.EventStorage.StorageType = "SQLite"; + + // 创建数据库管理器 + _dbManager = new EventDatabaseManager(_config, _logger); + } + + /// + /// 执行数据库维护操作 + /// + /// 命令行参数 + public async Task ExecuteAsync(string[] args) + { + if (args.Length == 0) + { + PrintUsage(); + return; + } + + string command = args[0].ToLower(); + + switch (command) + { + case "stats": + await ShowStatisticsAsync(); + break; + + case "cleanup": + int days = args.Length > 1 && int.TryParse(args[1], out int d) ? d : _config.EventStorage.DataRetentionDays; + await _dbManager.CleanupOldDataAsync(days); + Console.WriteLine($"已清理 {days} 天前的旧数据"); + break; + + case "query": + await QueryEventsAsync(args); + break; + + case "test": + await GenerateTestDataAsync(args); + break; + + default: + Console.WriteLine($"未知命令: {command}"); + PrintUsage(); + break; + } + } + + /// + /// 显示帮助信息 + /// + private void PrintUsage() + { + Console.WriteLine("数据库工具使用方法:"); + Console.WriteLine(" stats - 显示数据库统计信息"); + Console.WriteLine(" cleanup [days] - 清理指定天数之前的数据(默认使用配置值)"); + Console.WriteLine(" query [limit] - 查询最近的事件(默认10条)"); + Console.WriteLine(" query type:X ext:Y - 按类型和扩展名查询事件"); + Console.WriteLine(" test [count] - 生成测试数据(默认100条)"); + } + + /// + /// 显示数据库统计信息 + /// + private async Task ShowStatisticsAsync() + { + try + { + var stats = await _dbManager.GetDatabaseStatsAsync(); + + Console.WriteLine("数据库统计信息:"); + Console.WriteLine($"事件总数: {stats.TotalEvents}"); + Console.WriteLine($"最早事件时间: {stats.OldestEventTime?.ToLocalTime()}"); + Console.WriteLine($"最新事件时间: {stats.NewestEventTime?.ToLocalTime()}"); + + Console.WriteLine("\n事件类型分布:"); + if (stats.EventTypeCounts != null) + { + foreach (var item in stats.EventTypeCounts) + { + Console.WriteLine($" {item.Key}: {item.Value}"); + } + } + + Console.WriteLine("\n扩展名分布 (Top 10):"); + if (stats.TopExtensions != null) + { + foreach (var item in stats.TopExtensions) + { + Console.WriteLine($" {item.Key}: {item.Value}"); + } + } + } + catch (Exception ex) + { + Console.WriteLine($"获取统计信息出错: {ex.Message}"); + _logger.LogError(ex, "获取数据库统计信息时发生错误"); + } + } + + /// + /// 查询事件 + /// + private async Task QueryEventsAsync(string[] args) + { + try + { + var queryParams = new EventQueryParams + { + PageSize = 10, + PageIndex = 0 + }; + + // 解析查询参数 + if (args.Length > 1) + { + foreach (var arg in args.Skip(1)) + { + if (int.TryParse(arg, out int limit)) + { + queryParams.PageSize = limit; + } + else if (arg.StartsWith("type:", StringComparison.OrdinalIgnoreCase)) + { + string typeValue = arg.Substring(5); + if (Enum.TryParse(typeValue, true, out var eventType)) + { + queryParams.EventType = eventType; + } + } + else if (arg.StartsWith("ext:", StringComparison.OrdinalIgnoreCase)) + { + queryParams.ExtensionFilter = arg.Substring(4); + if (!queryParams.ExtensionFilter.StartsWith(".")) + { + queryParams.ExtensionFilter = "." + queryParams.ExtensionFilter; + } + } + else if (arg.StartsWith("path:", StringComparison.OrdinalIgnoreCase)) + { + queryParams.PathFilter = arg.Substring(5); + } + } + } + + // 执行查询 + var result = await _dbManager.QueryEventsAsync(queryParams); + + Console.WriteLine($"查询结果 (总数: {result.TotalCount}):"); + foreach (var evt in result.Events) + { + string typeStr = evt.EventType.ToString(); + string timestamp = evt.Timestamp.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss"); + Console.WriteLine($"[{timestamp}] {typeStr,-10} {evt.FileName} ({evt.Extension})"); + } + + if (result.HasMore) + { + Console.WriteLine($"... 还有更多结果,共 {result.TotalCount} 条"); + } + } + catch (Exception ex) + { + Console.WriteLine($"查询事件出错: {ex.Message}"); + _logger.LogError(ex, "查询事件时发生错误"); + } + } + + /// + /// 生成测试数据 + /// + private async Task GenerateTestDataAsync(string[] args) + { + try + { + int count = args.Length > 1 && int.TryParse(args[1], out int c) ? c : 100; + + var events = new List(); + var rnd = new Random(); + string[] extensions = { ".dll", ".exe", ".txt", ".cs", ".xml", ".json", ".png", ".jpg" }; + string[] directories = { "C:\\Temp", "D:\\Work", "C:\\Program Files", "D:\\Projects", "E:\\Data" }; + + DateTime startTime = DateTime.Now.AddHours(-24); + + for (int i = 0; i < count; i++) + { + var eventType = (FileEventType)rnd.Next(0, 5); + var ext = extensions[rnd.Next(extensions.Length)]; + var dir = directories[rnd.Next(directories.Length)]; + var fileName = $"TestFile_{i:D5}{ext}"; + var timestamp = startTime.AddMinutes(i); + + var fileEvent = new FileEvent + { + Id = Guid.NewGuid(), + Timestamp = timestamp, + EventType = eventType, + FullPath = $"{dir}\\{fileName}", + FileName = fileName, + Directory = dir, + Extension = ext, + FileSize = rnd.Next(1024, 1024 * 1024) + }; + + // 如果是重命名事件,添加旧文件名 + if (eventType == FileEventType.Renamed) + { + fileEvent.OldFileName = $"OldFile_{i:D5}{ext}"; + fileEvent.OldFullPath = $"{dir}\\{fileEvent.OldFileName}"; + } + + // 添加一些元数据 + fileEvent.Metadata["CreationTime"] = timestamp.AddMinutes(-rnd.Next(1, 60)).ToString("o"); + fileEvent.Metadata["LastWriteTime"] = timestamp.ToString("o"); + fileEvent.Metadata["IsReadOnly"] = (rnd.Next(10) < 2).ToString(); + fileEvent.Metadata["TestData"] = $"测试数据 {i}"; + + events.Add(fileEvent); + } + + Console.WriteLine($"正在生成 {count} 条测试数据..."); + await _dbManager.SaveEventsAsync(events); + Console.WriteLine("测试数据生成完成!"); + } + catch (Exception ex) + { + Console.WriteLine($"生成测试数据出错: {ex.Message}"); + _logger.LogError(ex, "生成测试数据时发生错误"); + } + } + } +} \ No newline at end of file diff --git a/external/JiShe.CollectBus.PluginFileWatcher/Dockerfile b/external/JiShe.CollectBus.PluginFileWatcher/Dockerfile new file mode 100644 index 0000000..88a0552 --- /dev/null +++ b/external/JiShe.CollectBus.PluginFileWatcher/Dockerfile @@ -0,0 +1,28 @@ +# 请参阅 https://aka.ms/customizecontainer 以了解如何自定义调试容器,以及 Visual Studio 如何使用此 Dockerfile 生成映像以更快地进行调试。 + +# 此阶段用于在快速模式(默认为调试配置)下从 VS 运行时 +FROM mcr.microsoft.com/dotnet/runtime:8.0 AS base +USER $APP_UID +WORKDIR /app + + +# 此阶段用于生成服务项目 +FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build +ARG BUILD_CONFIGURATION=Release +WORKDIR /src +COPY ["external/JiShe.CollectBus.PluginFileWatcher/JiShe.CollectBus.PluginFileWatcher.csproj", "external/JiShe.CollectBus.PluginFileWatcher/"] +RUN dotnet restore "./external/JiShe.CollectBus.PluginFileWatcher/JiShe.CollectBus.PluginFileWatcher.csproj" +COPY . . +WORKDIR "/src/external/JiShe.CollectBus.PluginFileWatcher" +RUN dotnet build "./JiShe.CollectBus.PluginFileWatcher.csproj" -c $BUILD_CONFIGURATION -o /app/build + +# 此阶段用于发布要复制到最终阶段的服务项目 +FROM build AS publish +ARG BUILD_CONFIGURATION=Release +RUN dotnet publish "./JiShe.CollectBus.PluginFileWatcher.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false + +# 此阶段在生产中使用,或在常规模式下从 VS 运行时使用(在不使用调试配置时为默认值) +FROM base AS final +WORKDIR /app +COPY --from=publish /app/publish . +ENTRYPOINT ["dotnet", "JiShe.CollectBus.PluginFileWatcher.dll"] \ No newline at end of file diff --git a/external/JiShe.CollectBus.PluginFileWatcher/EventDatabaseManager.cs b/external/JiShe.CollectBus.PluginFileWatcher/EventDatabaseManager.cs new file mode 100644 index 0000000..9b0f257 --- /dev/null +++ b/external/JiShe.CollectBus.PluginFileWatcher/EventDatabaseManager.cs @@ -0,0 +1,481 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Dapper; +using Microsoft.Data.Sqlite; +using Microsoft.Extensions.Logging; + +namespace JiShe.CollectBus.PluginFileWatcher +{ + /// + /// SQLite数据库管理器,用于管理文件事件的存储和检索 + /// + public class EventDatabaseManager : IDisposable + { + private readonly FileMonitorConfig _config; + private readonly ILogger _logger; + private readonly string _connectionString; + private readonly string _databasePath; + private readonly int _commandTimeout; + private bool _disposed; + + /// + /// 初始化数据库管理器 + /// + /// 配置对象 + /// 日志记录器 + public EventDatabaseManager(FileMonitorConfig config, ILogger logger) + { + _config = config ?? throw new ArgumentNullException(nameof(config)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + + // 确保使用配置中的设置 + _databasePath = config.EventStorage.DatabasePath; + _connectionString = config.EventStorage.ConnectionString; + _commandTimeout = config.EventStorage.CommandTimeout; + + // 确保数据库目录存在 + string dbDirectory = Path.GetDirectoryName(_databasePath); + if (!string.IsNullOrEmpty(dbDirectory) && !Directory.Exists(dbDirectory)) + { + Directory.CreateDirectory(dbDirectory); + } + + // 初始化数据库 + InitializeDatabase().GetAwaiter().GetResult(); + } + + /// + /// 初始化数据库,确保必要的表已创建 + /// + private async Task InitializeDatabase() + { + try + { + using var connection = new SqliteConnection(_connectionString); + await connection.OpenAsync(); + + // 启用外键约束 + using (var command = connection.CreateCommand()) + { + command.CommandText = "PRAGMA foreign_keys = ON;"; + await command.ExecuteNonQueryAsync(); + } + + // 创建文件事件表 + string createTableSql = @" + CREATE TABLE IF NOT EXISTS FileEvents ( + Id TEXT PRIMARY KEY, + Timestamp TEXT NOT NULL, + EventType INTEGER NOT NULL, + FullPath TEXT NOT NULL, + FileName TEXT NOT NULL, + Directory TEXT NOT NULL, + Extension TEXT NOT NULL, + OldFileName TEXT, + OldFullPath TEXT, + FileSize INTEGER, + CreatedAt TEXT NOT NULL + ); + + CREATE INDEX IF NOT EXISTS idx_events_timestamp ON FileEvents(Timestamp); + CREATE INDEX IF NOT EXISTS idx_events_eventtype ON FileEvents(EventType); + CREATE INDEX IF NOT EXISTS idx_events_extension ON FileEvents(Extension);"; + + await connection.ExecuteAsync(createTableSql, commandTimeout: _commandTimeout); + + // 创建元数据表 + string createMetadataTableSql = @" + CREATE TABLE IF NOT EXISTS EventMetadata ( + Id INTEGER PRIMARY KEY AUTOINCREMENT, + EventId TEXT NOT NULL, + MetadataKey TEXT NOT NULL, + MetadataValue TEXT, + FOREIGN KEY (EventId) REFERENCES FileEvents(Id) ON DELETE CASCADE + ); + + CREATE INDEX IF NOT EXISTS idx_metadata_eventid ON EventMetadata(EventId); + CREATE INDEX IF NOT EXISTS idx_metadata_key ON EventMetadata(MetadataKey);"; + + await connection.ExecuteAsync(createMetadataTableSql, commandTimeout: _commandTimeout); + + _logger.LogInformation("数据库初始化成功"); + } + catch (Exception ex) + { + _logger.LogError(ex, "初始化数据库失败"); + throw; + } + } + + /// + /// 保存文件事件到数据库 + /// + /// 要保存的事件列表 + public async Task SaveEventsAsync(List events) + { + if (events == null || events.Count == 0) + return; + + try + { + using var connection = new SqliteConnection(_connectionString); + await connection.OpenAsync(); + + // 启用外键约束 + using (var command = connection.CreateCommand()) + { + command.CommandText = "PRAGMA foreign_keys = ON;"; + await command.ExecuteNonQueryAsync(); + } + + // 开始事务 + using var transaction = connection.BeginTransaction(); + + try + { + foreach (var fileEvent in events) + { + // 插入事件数据 + string insertEventSql = @" + INSERT INTO FileEvents ( + Id, Timestamp, EventType, FullPath, FileName, + Directory, Extension, OldFileName, OldFullPath, + FileSize, CreatedAt + ) VALUES ( + @Id, @Timestamp, @EventType, @FullPath, @FileName, + @Directory, @Extension, @OldFileName, @OldFullPath, + @FileSize, @CreatedAt + )"; + + await connection.ExecuteAsync(insertEventSql, new + { + Id = fileEvent.Id.ToString(), // 确保ID始终以字符串形式保存 + Timestamp = fileEvent.Timestamp.ToString("o"), + EventType = (int)fileEvent.EventType, + fileEvent.FullPath, + fileEvent.FileName, + fileEvent.Directory, + fileEvent.Extension, + fileEvent.OldFileName, + fileEvent.OldFullPath, + fileEvent.FileSize, + CreatedAt = DateTime.UtcNow.ToString("o") + }, transaction, _commandTimeout); + + // 插入元数据 + if (fileEvent.Metadata != null && fileEvent.Metadata.Count > 0) + { + string insertMetadataSql = @" + INSERT INTO EventMetadata (EventId, MetadataKey, MetadataValue) + VALUES (@EventId, @MetadataKey, @MetadataValue)"; + + foreach (var metadata in fileEvent.Metadata) + { + await connection.ExecuteAsync(insertMetadataSql, new + { + EventId = fileEvent.Id.ToString(), // 确保ID以相同格式保存 + MetadataKey = metadata.Key, + MetadataValue = metadata.Value + }, transaction, _commandTimeout); + } + } + } + + // 提交事务 + transaction.Commit(); + _logger.LogInformation($"已成功保存 {events.Count} 个事件到数据库"); + } + catch (Exception ex) + { + // 回滚事务 + transaction.Rollback(); + _logger.LogError(ex, "保存事件到数据库时发生错误"); + throw; + } + } + catch (Exception ex) + { + _logger.LogError(ex, "连接数据库失败"); + throw; + } + } + + /// + /// 查询事件 + /// + /// 查询参数 + /// 查询结果 + public async Task QueryEventsAsync(EventQueryParams queryParams) + { + if (queryParams == null) + throw new ArgumentNullException(nameof(queryParams)); + + var result = new EventQueryResult + { + StartTime = queryParams.StartTime ?? DateTime.MinValue, + EndTime = queryParams.EndTime ?? DateTime.MaxValue + }; + + try + { + using var connection = new SqliteConnection(_connectionString); + await connection.OpenAsync(); + + // 启用外键约束 + using (var command = connection.CreateCommand()) + { + command.CommandText = "PRAGMA foreign_keys = ON;"; + await command.ExecuteNonQueryAsync(); + } + + // 构建查询条件 + var conditions = new List(); + var parameters = new DynamicParameters(); + + if (queryParams.StartTime.HasValue) + { + conditions.Add("Timestamp >= @StartTime"); + parameters.Add("@StartTime", queryParams.StartTime.Value.ToString("o")); + } + + if (queryParams.EndTime.HasValue) + { + conditions.Add("Timestamp <= @EndTime"); + parameters.Add("@EndTime", queryParams.EndTime.Value.ToString("o")); + } + + if (queryParams.EventType.HasValue) + { + conditions.Add("EventType = @EventType"); + parameters.Add("@EventType", (int)queryParams.EventType.Value); + } + + if (!string.IsNullOrEmpty(queryParams.PathFilter)) + { + conditions.Add("FullPath LIKE @PathFilter"); + parameters.Add("@PathFilter", $"%{queryParams.PathFilter}%"); + } + + if (!string.IsNullOrEmpty(queryParams.ExtensionFilter)) + { + conditions.Add("Extension = @ExtensionFilter"); + parameters.Add("@ExtensionFilter", queryParams.ExtensionFilter); + } + + // 构建WHERE子句 + string whereClause = conditions.Count > 0 + ? $"WHERE {string.Join(" AND ", conditions)}" + : string.Empty; + + // 构建ORDER BY子句 + string orderByClause = queryParams.AscendingOrder + ? "ORDER BY Timestamp ASC" + : "ORDER BY Timestamp DESC"; + + // 获取总记录数 + string countSql = $"SELECT COUNT(*) FROM FileEvents {whereClause}"; + result.TotalCount = await connection.ExecuteScalarAsync(countSql, parameters, commandTimeout: _commandTimeout); + + // 应用分页 + string paginationClause = $"LIMIT @PageSize OFFSET @Offset"; + parameters.Add("@PageSize", queryParams.PageSize); + parameters.Add("@Offset", queryParams.PageIndex * queryParams.PageSize); + + // 查询事件数据 + string querySql = $@" + SELECT Id, + Timestamp, + EventType, + FullPath, + FileName, + Directory, + Extension, + OldFileName, + OldFullPath, + FileSize + FROM FileEvents + {whereClause} + {orderByClause} + {paginationClause}"; + + var events = await connection.QueryAsync(querySql, parameters, commandTimeout: _commandTimeout); + + // 处理查询结果 + foreach (var eventData in events) + { + var fileEvent = new FileEvent + { + Id = Guid.Parse(eventData.Id), + Timestamp = DateTime.Parse(eventData.Timestamp), + EventType = (FileEventType)eventData.EventType, + FullPath = eventData.FullPath, + FileName = eventData.FileName, + Directory = eventData.Directory, + Extension = eventData.Extension, + OldFileName = eventData.OldFileName, + OldFullPath = eventData.OldFullPath, + FileSize = eventData.FileSize + }; + + // 获取元数据 + string metadataSql = "SELECT MetadataKey, MetadataValue FROM EventMetadata WHERE EventId = @EventId"; + var metadata = await connection.QueryAsync(metadataSql, new { EventId = fileEvent.Id.ToString() }, commandTimeout: _commandTimeout); + + foreach (var item in metadata) + { + fileEvent.Metadata[item.MetadataKey] = item.MetadataValue; + } + + result.Events.Add(fileEvent); + } + + result.HasMore = (queryParams.PageIndex + 1) * queryParams.PageSize < result.TotalCount; + + return result; + } + catch (Exception ex) + { + _logger.LogError(ex, "查询事件时发生错误"); + throw; + } + } + + /// + /// 清理旧数据 + /// + /// 数据保留天数 + public async Task CleanupOldDataAsync(int retentionDays) + { + if (retentionDays <= 0) + return; + + try + { + DateTime cutoffDate = DateTime.UtcNow.AddDays(-retentionDays); + string cutoffDateStr = cutoffDate.ToString("o"); + + using var connection = new SqliteConnection(_connectionString); + await connection.OpenAsync(); + + // 启用外键约束 + using (var command = connection.CreateCommand()) + { + command.CommandText = "PRAGMA foreign_keys = ON;"; + await command.ExecuteNonQueryAsync(); + } + + // 删除旧事件(级联删除元数据) + string deleteSql = "DELETE FROM FileEvents WHERE Timestamp < @CutoffDate"; + int deletedCount = await connection.ExecuteAsync(deleteSql, new { CutoffDate = cutoffDateStr }, commandTimeout: _commandTimeout); + + _logger.LogInformation($"已清理 {deletedCount} 条旧事件数据({retentionDays}天前)"); + } + catch (Exception ex) + { + _logger.LogError(ex, "清理旧数据时发生错误"); + throw; + } + } + + /// + /// 获取数据库统计信息 + /// + /// 数据库统计信息 + public async Task GetDatabaseStatsAsync() + { + try + { + using var connection = new SqliteConnection(_connectionString); + await connection.OpenAsync(); + + // 启用外键约束 + using (var command = connection.CreateCommand()) + { + command.CommandText = "PRAGMA foreign_keys = ON;"; + await command.ExecuteNonQueryAsync(); + } + + var stats = new DatabaseStats(); + + // 获取事件总数 + stats.TotalEvents = await connection.ExecuteScalarAsync("SELECT COUNT(*) FROM FileEvents", commandTimeout: _commandTimeout); + + // 获取最早和最新事件时间 + stats.OldestEventTime = await connection.ExecuteScalarAsync("SELECT Timestamp FROM FileEvents ORDER BY Timestamp ASC LIMIT 1", commandTimeout: _commandTimeout); + stats.NewestEventTime = await connection.ExecuteScalarAsync("SELECT Timestamp FROM FileEvents ORDER BY Timestamp DESC LIMIT 1", commandTimeout: _commandTimeout); + + // 获取事件类型分布 + var eventTypeCounts = await connection.QueryAsync("SELECT EventType, COUNT(*) AS Count FROM FileEvents GROUP BY EventType", commandTimeout: _commandTimeout); + stats.EventTypeCounts = new Dictionary(); + + foreach (var item in eventTypeCounts) + { + stats.EventTypeCounts[(FileEventType)item.EventType] = item.Count; + } + + // 获取扩展名分布(前10个) + var extensionCounts = await connection.QueryAsync( + "SELECT Extension, COUNT(*) AS Count FROM FileEvents GROUP BY Extension ORDER BY Count DESC LIMIT 10", + commandTimeout: _commandTimeout); + stats.TopExtensions = new Dictionary(); + + foreach (var item in extensionCounts) + { + stats.TopExtensions[item.Extension] = item.Count; + } + + return stats; + } + catch (Exception ex) + { + _logger.LogError(ex, "获取数据库统计信息时发生错误"); + throw; + } + } + + /// + /// 释放资源 + /// + public void Dispose() + { + if (_disposed) + return; + + _disposed = true; + } + } + + /// + /// 数据库统计信息 + /// + public class DatabaseStats + { + /// + /// 事件总数 + /// + public int TotalEvents { get; set; } + + /// + /// 最早事件时间 + /// + public DateTime? OldestEventTime { get; set; } + + /// + /// 最新事件时间 + /// + public DateTime? NewestEventTime { get; set; } + + /// + /// 事件类型计数 + /// + public Dictionary EventTypeCounts { get; set; } + + /// + /// 排名前列的文件扩展名 + /// + public Dictionary TopExtensions { get; set; } + } +} \ No newline at end of file diff --git a/external/JiShe.CollectBus.PluginFileWatcher/EventStorage.cs b/external/JiShe.CollectBus.PluginFileWatcher/EventStorage.cs new file mode 100644 index 0000000..9bbf619 --- /dev/null +++ b/external/JiShe.CollectBus.PluginFileWatcher/EventStorage.cs @@ -0,0 +1,745 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO; +using System.IO.Compression; +using System.Linq; +using System.Text.Json; +using System.Text.Json.Serialization; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; + +namespace JiShe.CollectBus.PluginFileWatcher +{ + /// + /// 负责文件事件的存储、查询和回放 + /// + public class EventStorage : IDisposable + { + private readonly FileMonitorConfig _config; + private readonly ILogger _logger; + private readonly ConcurrentQueue _eventQueue; + private readonly Timer _storageTimer; + private readonly SemaphoreSlim _storageLock = new SemaphoreSlim(1, 1); + private readonly string _storageDirectory; + private readonly EventDatabaseManager _dbManager; + private bool _disposed; + + /// + /// 创建新的事件存储管理器实例 + /// + /// 文件监控配置 + /// 日志记录器 + public EventStorage(FileMonitorConfig config, ILogger logger) + { + _config = config ?? throw new ArgumentNullException(nameof(config)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _eventQueue = new ConcurrentQueue(); + + // 确保存储目录存在 + _storageDirectory = !string.IsNullOrEmpty(_config.EventStorage.StorageDirectory) + ? _config.EventStorage.StorageDirectory + : Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "EventLogs"); + + if (!Directory.Exists(_storageDirectory)) + { + Directory.CreateDirectory(_storageDirectory); + } + + // 创建数据库管理器(如果配置为SQLite存储类型) + if (config.EventStorage.EnableEventStorage && + config.EventStorage.StorageType.Equals("SQLite", StringComparison.OrdinalIgnoreCase)) + { + _dbManager = new EventDatabaseManager(config, logger); + _logger.LogInformation("已初始化SQLite事件存储"); + } + + // 初始化存储定时器(如果启用) + if (_config.EventStorage.EnableEventStorage) + { + var intervalMs = _config.EventStorage.StorageIntervalSeconds * 1000; + _storageTimer = new Timer(SaveEventsTimerCallback, null, intervalMs, intervalMs); + _logger.LogInformation($"事件存储已初始化,存储目录:{_storageDirectory},存储间隔:{_config.EventStorage.StorageIntervalSeconds}秒"); + } + } + + /// + /// 记录一个文件事件 + /// + /// 文件事件 + public void RecordEvent(FileEvent fileEvent) + { + if (fileEvent == null || !_config.EventStorage.EnableEventStorage) return; + + _eventQueue.Enqueue(fileEvent); + _logger.LogDebug($"文件事件已加入队列:{fileEvent.EventType} - {fileEvent.FullPath}"); + } + + /// + /// 从FileSystemEventArgs记录事件 + /// + /// 文件系统事件参数 + public void RecordEvent(FileSystemEventArgs e) + { + if (e == null || !_config.EventStorage.EnableEventStorage) return; + + var fileEvent = FileEvent.FromFileSystemEventArgs(e); + RecordEvent(fileEvent); + } + + /// + /// 定时将事件保存到文件 + /// + private async void SaveEventsTimerCallback(object state) + { + if (_disposed || _eventQueue.IsEmpty) return; + + try + { + // 防止多个定时器回调同时执行 + if (!await _storageLock.WaitAsync(0)) + { + return; + } + + try + { + // 从队列中取出事件 + List eventsToSave = new List(); + int batchSize = _config.EventStorage.BatchSize; + + while (eventsToSave.Count < batchSize && !_eventQueue.IsEmpty) + { + if (_eventQueue.TryDequeue(out var fileEvent)) + { + eventsToSave.Add(fileEvent); + } + } + + if (eventsToSave.Count > 0) + { + await SaveEventsToFileAsync(eventsToSave); + _logger.LogInformation($"已成功保存 {eventsToSave.Count} 个事件"); + } + + // 如果有配置,清理旧日志文件 + if (_config.EventStorage.MaxLogFiles > 0) + { + await CleanupOldLogFilesAsync(); + } + + // 如果有配置,清理旧数据库记录 + if (_dbManager != null && _config.EventStorage.DataRetentionDays > 0) + { + await _dbManager.CleanupOldDataAsync(_config.EventStorage.DataRetentionDays); + } + } + finally + { + _storageLock.Release(); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "保存事件时发生错误"); + } + } + + /// + /// 将事件保存到文件 + /// + /// 要保存的事件列表 + private async Task SaveEventsToFileAsync(List events) + { + if (events == null || events.Count == 0) return; + + try + { + // 根据存储类型选择保存方式 + if (_config.EventStorage.StorageType.Equals("SQLite", StringComparison.OrdinalIgnoreCase) && _dbManager != null) + { + // 保存到SQLite数据库 + await _dbManager.SaveEventsAsync(events); + } + else + { + // 保存到文件 + string fileName = string.Format( + _config.EventStorage.LogFileNameFormat, + DateTime.Now); + + string filePath = Path.Combine(_storageDirectory, fileName); + + // 创建事件日志文件对象 + var logFile = new EventLogFile + { + CreatedTime = DateTime.UtcNow, + Events = events + }; + + // 序列化为JSON + string jsonContent = JsonSerializer.Serialize(logFile, new JsonSerializerOptions + { + WriteIndented = true + }); + + // 是否启用压缩 + if (_config.EventStorage.CompressLogFiles) + { + string gzFilePath = $"{filePath}.gz"; + await CompressAndSaveStringAsync(jsonContent, gzFilePath); + _logger.LogInformation($"已将事件保存到压缩文件:{gzFilePath}"); + } + else + { + await File.WriteAllTextAsync(filePath, jsonContent); + _logger.LogInformation($"已将事件保存到文件:{filePath}"); + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, "保存事件到文件时发生错误"); + throw; + } + } + + /// + /// 压缩并保存字符串到文件 + /// + /// 要保存的内容 + /// 文件路径 + private static async Task CompressAndSaveStringAsync(string content, string filePath) + { + using var fileStream = new FileStream(filePath, FileMode.Create); + using var gzipStream = new GZipStream(fileStream, CompressionLevel.Optimal); + using var writer = new StreamWriter(gzipStream); + + await writer.WriteAsync(content); + } + + /// + /// 清理过多的日志文件 + /// + private async Task CleanupOldLogFilesAsync() + { + try + { + // 检查是否需要清理 + if (_config.EventStorage.MaxLogFiles <= 0) return; + + var directory = new DirectoryInfo(_storageDirectory); + var logFiles = directory.GetFiles("*.*") + .Where(f => f.Name.EndsWith(".json") || f.Name.EndsWith(".gz")) + .OrderByDescending(f => f.CreationTime) + .ToArray(); + + // 如果文件数量超过最大值,删除最旧的文件 + if (logFiles.Length > _config.EventStorage.MaxLogFiles) + { + int filesToDelete = logFiles.Length - _config.EventStorage.MaxLogFiles; + var filesToRemove = logFiles.Skip(logFiles.Length - filesToDelete).ToArray(); + + foreach (var file in filesToRemove) + { + try + { + file.Delete(); + _logger.LogInformation($"已删除旧的事件日志文件:{file.Name}"); + } + catch (Exception ex) + { + _logger.LogWarning(ex, $"删除旧日志文件失败:{file.FullName}"); + } + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, "清理旧日志文件时发生错误"); + } + + await Task.CompletedTask; + } + + /// + /// 查询历史事件 + /// + /// 查询参数 + /// 查询结果 + public async Task QueryEventsAsync(EventQueryParams queryParams) + { + if (queryParams == null) throw new ArgumentNullException(nameof(queryParams)); + + // 如果是SQLite存储且数据库管理器可用,使用数据库查询 + if (_config.EventStorage.StorageType.Equals("SQLite", StringComparison.OrdinalIgnoreCase) && _dbManager != null) + { + return await _dbManager.QueryEventsAsync(queryParams); + } + + var result = new EventQueryResult + { + StartTime = queryParams.StartTime ?? DateTime.MinValue, + EndTime = queryParams.EndTime ?? DateTime.MaxValue + }; + + try + { + await _storageLock.WaitAsync(); + + try + { + // 获取所有日志文件 + var directory = new DirectoryInfo(_storageDirectory); + if (!directory.Exists) + { + return result; + } + + var logFiles = directory.GetFiles("*.*") + .Where(f => f.Name.EndsWith(".json") || f.Name.EndsWith(".gz")) + .OrderByDescending(f => f.CreationTime) + .ToArray(); + + List allEvents = new List(); + + // 加载所有日志文件中的事件 + foreach (var file in logFiles) + { + try + { + var events = await LoadEventsFromFileAsync(file.FullName); + if (events != null && events.Count > 0) + { + allEvents.AddRange(events); + } + } + catch (Exception ex) + { + _logger.LogWarning(ex, $"从文件加载事件失败:{file.FullName}"); + } + } + + // 内存中队列的事件也包含在查询中 + FileEvent[] queuedEvents = _eventQueue.ToArray(); + allEvents.AddRange(queuedEvents); + + // 应用查询过滤条件 + var filteredEvents = allEvents + .Where(e => (queryParams.StartTime == null || e.Timestamp >= queryParams.StartTime) && + (queryParams.EndTime == null || e.Timestamp <= queryParams.EndTime) && + (queryParams.EventType == null || e.EventType == queryParams.EventType.Value) && + (string.IsNullOrEmpty(queryParams.PathFilter) || e.FullPath.Contains(queryParams.PathFilter, StringComparison.OrdinalIgnoreCase)) && + (string.IsNullOrEmpty(queryParams.ExtensionFilter) || e.Extension.Equals(queryParams.ExtensionFilter, StringComparison.OrdinalIgnoreCase))) + .ToList(); + + // 应用排序 + IEnumerable orderedEvents = queryParams.AscendingOrder + ? filteredEvents.OrderBy(e => e.Timestamp) + : filteredEvents.OrderByDescending(e => e.Timestamp); + + // 计算总数 + result.TotalCount = filteredEvents.Count; + + // 应用分页 + int skip = queryParams.PageIndex * queryParams.PageSize; + int take = queryParams.PageSize; + + result.Events = orderedEvents.Skip(skip).Take(take).ToList(); + result.HasMore = (skip + take) < result.TotalCount; + + return result; + } + finally + { + _storageLock.Release(); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "查询事件时发生错误"); + throw; + } + } + + /// + /// 从文件加载事件 + /// + /// 文件路径 + /// 事件列表 + private async Task> LoadEventsFromFileAsync(string filePath) + { + if (string.IsNullOrEmpty(filePath) || !File.Exists(filePath)) + { + return new List(); + } + + try + { + string jsonContent; + + // 处理压缩文件 + if (filePath.EndsWith(".gz")) + { + using var fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read); + using var gzipStream = new GZipStream(fileStream, CompressionMode.Decompress); + using var reader = new StreamReader(gzipStream); + + jsonContent = await reader.ReadToEndAsync(); + } + else + { + jsonContent = await File.ReadAllTextAsync(filePath); + } + + var logFile = JsonSerializer.Deserialize(jsonContent); + return logFile?.Events ?? new List(); + } + catch (Exception ex) + { + _logger.LogError(ex, $"从文件加载事件失败:{filePath}"); + return new List(); + } + } + + /// + /// 启动事件回放会话 + /// + /// 查询参数,定义要回放的事件 + /// 回放处理回调 + /// 取消标记 + /// 回放会话控制器 + public async Task StartReplayAsync( + EventQueryParams queryParams, + Func replayHandler, + CancellationToken cancellationToken = default) + { + if (replayHandler == null) throw new ArgumentNullException(nameof(replayHandler)); + + // 查询要回放的事件 + var queryResult = await QueryEventsAsync(queryParams); + + // 创建并启动回放会话 + var session = new EventReplaySession( + queryResult.Events, + replayHandler, + _config.EventStorage.ReplayIntervalMs, + _config.EventStorage.ReplaySpeedFactor, + _logger, + cancellationToken); + + await session.StartAsync(); + return session; + } + + /// + /// 释放资源 + /// + public void Dispose() + { + if (_disposed) return; + + _disposed = true; + _storageTimer?.Dispose(); + _storageLock?.Dispose(); + _dbManager?.Dispose(); + + // 尝试保存剩余事件 + if (_config.EventStorage.EnableEventStorage && !_eventQueue.IsEmpty) + { + var remainingEvents = new List(); + while (!_eventQueue.IsEmpty && _eventQueue.TryDequeue(out var evt)) + { + remainingEvents.Add(evt); + } + + if (remainingEvents.Count > 0) + { + SaveEventsToFileAsync(remainingEvents).GetAwaiter().GetResult(); + } + } + + GC.SuppressFinalize(this); + } + } + + /// + /// 事件回放会话 + /// + public class EventReplaySession : IDisposable + { + private readonly List _events; + private readonly Func _replayHandler; + private readonly int _replayIntervalMs; + private readonly double _speedFactor; + private readonly ILogger _logger; + private readonly CancellationToken _cancellationToken; + private CancellationTokenSource _linkedCts; + private Task _replayTask; + private bool _disposed; + private bool _isPaused; + private readonly SemaphoreSlim _pauseSemaphore = new SemaphoreSlim(1, 1); + + /// + /// 回放进度(0-100) + /// + public int Progress { get; private set; } + + /// + /// 当前回放的事件索引 + /// + public int CurrentIndex { get; private set; } + + /// + /// 事件总数 + /// + public int TotalEvents => _events?.Count ?? 0; + + /// + /// 回放是否已完成 + /// + public bool IsCompleted { get; private set; } + + /// + /// 回放是否已暂停 + /// + public bool IsPaused => _isPaused; + + /// + /// 回放已处理的事件数 + /// + public int ProcessedEvents { get; private set; } + + /// + /// 回放开始时间 + /// + public DateTime StartTime { get; private set; } + + /// + /// 回放结束时间(如果已完成) + /// + public DateTime? EndTime { get; private set; } + + /// + /// 创建新的事件回放会话 + /// + /// 要回放的事件 + /// 回放处理回调 + /// 回放间隔(毫秒) + /// 速度因子 + /// 日志记录器 + /// 取消标记 + public EventReplaySession( + List events, + Func replayHandler, + int replayIntervalMs, + double speedFactor, + ILogger logger, + CancellationToken cancellationToken = default) + { + _events = events ?? throw new ArgumentNullException(nameof(events)); + _replayHandler = replayHandler ?? throw new ArgumentNullException(nameof(replayHandler)); + _replayIntervalMs = Math.Max(1, replayIntervalMs); + _speedFactor = Math.Max(0.1, speedFactor); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _cancellationToken = cancellationToken; + + _linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + } + + /// + /// 启动回放 + /// + public async Task StartAsync() + { + if (_replayTask != null) return; + + StartTime = DateTime.Now; + _replayTask = Task.Run(ReplayEventsAsync, _linkedCts.Token); + await Task.CompletedTask; + } + + /// + /// 暂停回放 + /// + public async Task PauseAsync() + { + if (_isPaused || IsCompleted) return; + + await _pauseSemaphore.WaitAsync(); + try + { + _isPaused = true; + } + finally + { + _pauseSemaphore.Release(); + } + + _logger.LogInformation("事件回放已暂停"); + } + + /// + /// 恢复回放 + /// + public async Task ResumeAsync() + { + if (!_isPaused || IsCompleted) return; + + await _pauseSemaphore.WaitAsync(); + try + { + _isPaused = false; + // 释放信号量以允许回放任务继续 + _pauseSemaphore.Release(); + } + catch + { + _pauseSemaphore.Release(); + throw; + } + + _logger.LogInformation("事件回放已恢复"); + } + + /// + /// 停止回放 + /// + public async Task StopAsync() + { + if (IsCompleted) return; + + try + { + // 取消回放任务 + _linkedCts?.Cancel(); + + // 如果暂停中,先恢复以允许取消 + if (_isPaused) + { + await ResumeAsync(); + } + + // 等待任务完成 + if (_replayTask != null) + { + await Task.WhenAny(_replayTask, Task.Delay(1000)); + } + + IsCompleted = true; + EndTime = DateTime.Now; + + _logger.LogInformation("事件回放已手动停止"); + } + catch (Exception ex) + { + _logger.LogError(ex, "停止事件回放时发生错误"); + } + } + + /// + /// 回放事件处理 + /// + private async Task ReplayEventsAsync() + { + try + { + _logger.LogInformation($"开始回放{_events.Count}个事件,速度因子:{_speedFactor}"); + + if (_events.Count == 0) + { + IsCompleted = true; + EndTime = DateTime.Now; + return; + } + + DateTime? lastEventTime = null; + + for (int i = 0; i < _events.Count; i++) + { + // 检查是否取消 + if (_linkedCts.Token.IsCancellationRequested) + { + _logger.LogInformation("事件回放已取消"); + break; + } + + // 检查暂停状态 + if (_isPaused) + { + // 等待恢复信号 + await _pauseSemaphore.WaitAsync(_linkedCts.Token); + _pauseSemaphore.Release(); + } + + var currentEvent = _events[i]; + CurrentIndex = i; + + // 计算等待时间(根据事件之间的实际时间差和速度因子) + if (lastEventTime.HasValue && i > 0) + { + var actualTimeDiff = currentEvent.Timestamp - lastEventTime.Value; + var waitTimeMs = (int)(actualTimeDiff.TotalMilliseconds / _speedFactor); + + // 应用最小等待时间 + waitTimeMs = Math.Max(_replayIntervalMs, waitTimeMs); + + // 等待指定时间 + await Task.Delay(waitTimeMs, _linkedCts.Token); + } + + // 处理当前事件 + try + { + await _replayHandler(currentEvent); + ProcessedEvents++; + + // 更新进度 + Progress = (int)((i + 1) * 100.0 / _events.Count); + } + catch (Exception ex) + { + _logger.LogError(ex, $"处理回放事件时发生错误:{currentEvent.EventType} - {currentEvent.FullPath}"); + } + + lastEventTime = currentEvent.Timestamp; + } + + // 完成回放 + IsCompleted = true; + Progress = 100; + EndTime = DateTime.Now; + + _logger.LogInformation($"事件回放已完成,共处理{ProcessedEvents}个事件,耗时:{(EndTime.Value - StartTime).TotalSeconds:F2}秒"); + } + catch (OperationCanceledException) + { + _logger.LogInformation("事件回放已取消"); + IsCompleted = true; + EndTime = DateTime.Now; + } + catch (Exception ex) + { + _logger.LogError(ex, "事件回放过程中发生错误"); + IsCompleted = true; + EndTime = DateTime.Now; + } + } + + /// + /// 释放资源 + /// + public void Dispose() + { + if (_disposed) return; + _disposed = true; + + _linkedCts?.Cancel(); + _linkedCts?.Dispose(); + _pauseSemaphore?.Dispose(); + + GC.SuppressFinalize(this); + } + } +} \ No newline at end of file diff --git a/external/JiShe.CollectBus.PluginFileWatcher/FileEvent.cs b/external/JiShe.CollectBus.PluginFileWatcher/FileEvent.cs new file mode 100644 index 0000000..4e8d905 --- /dev/null +++ b/external/JiShe.CollectBus.PluginFileWatcher/FileEvent.cs @@ -0,0 +1,254 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Text.Json.Serialization; + +namespace JiShe.CollectBus.PluginFileWatcher +{ + /// + /// 表示一个文件系统事件的数据模型,用于序列化和存储 + /// + public class FileEvent + { + /// + /// 事件唯一标识 + /// + public Guid Id { get; set; } = Guid.NewGuid(); + + /// + /// 事件发生时间 + /// + public DateTime Timestamp { get; set; } = DateTime.UtcNow; + + /// + /// 事件类型 + /// + public FileEventType EventType { get; set; } + + /// + /// 文件完整路径 + /// + public string FullPath { get; set; } + + /// + /// 文件名 + /// + public string FileName { get; set; } + + /// + /// 文件所在目录 + /// + public string Directory { get; set; } + + /// + /// 文件扩展名 + /// + public string Extension { get; set; } + + /// + /// 重命名前的旧文件名(仅在重命名事件中有效) + /// + public string OldFileName { get; set; } + + /// + /// 重命名前的旧路径(仅在重命名事件中有效) + /// + public string OldFullPath { get; set; } + + /// + /// 文件大小(字节),如果可获取 + /// + public long? FileSize { get; set; } + + /// + /// 自定义属性,可用于存储其他元数据 + /// + public Dictionary Metadata { get; set; } = new Dictionary(); + + /// + /// 从FileSystemEventArgs创建FileEvent + /// + /// FileSystemEventArgs参数 + /// FileEvent对象 + public static FileEvent FromFileSystemEventArgs(FileSystemEventArgs e) + { + var fileEvent = new FileEvent + { + EventType = GetEventType(e.ChangeType), + FullPath = e.FullPath, + FileName = e.Name ?? Path.GetFileName(e.FullPath), + Directory = Path.GetDirectoryName(e.FullPath), + Extension = Path.GetExtension(e.FullPath) + }; + + // 如果是重命名事件,添加旧文件名信息 + if (e is RenamedEventArgs renamedEvent) + { + fileEvent.OldFileName = Path.GetFileName(renamedEvent.OldFullPath); + fileEvent.OldFullPath = renamedEvent.OldFullPath; + } + + // 尝试获取文件大小(如果文件存在且可访问) + try + { + if (File.Exists(e.FullPath) && e.ChangeType != WatcherChangeTypes.Deleted) + { + var fileInfo = new FileInfo(e.FullPath); + fileEvent.FileSize = fileInfo.Length; + + // 添加一些额外的元数据 + fileEvent.Metadata["CreationTime"] = fileInfo.CreationTime.ToString("o"); + fileEvent.Metadata["LastWriteTime"] = fileInfo.LastWriteTime.ToString("o"); + fileEvent.Metadata["IsReadOnly"] = fileInfo.IsReadOnly.ToString(); + } + } + catch + { + // 忽略任何获取文件信息时的错误 + } + + return fileEvent; + } + + /// + /// 将WatcherChangeTypes转换为FileEventType + /// + /// WatcherChangeTypes枚举值 + /// 对应的FileEventType + public static FileEventType GetEventType(WatcherChangeTypes changeType) + { + return changeType switch + { + WatcherChangeTypes.Created => FileEventType.Created, + WatcherChangeTypes.Deleted => FileEventType.Deleted, + WatcherChangeTypes.Changed => FileEventType.Modified, + WatcherChangeTypes.Renamed => FileEventType.Renamed, + _ => FileEventType.Other + }; + } + } + + /// + /// 文件事件类型 + /// + public enum FileEventType + { + /// + /// 文件被创建 + /// + Created, + + /// + /// 文件被修改 + /// + Modified, + + /// + /// 文件被删除 + /// + Deleted, + + /// + /// 文件被重命名 + /// + Renamed, + + /// + /// 其他类型事件 + /// + Other + } + + /// + /// 表示一个事件日志文件 + /// + public class EventLogFile + { + /// + /// 日志文件创建时间 + /// + public DateTime CreatedTime { get; set; } = DateTime.UtcNow; + + /// + /// 日志文件包含的事件列表 + /// + public List Events { get; set; } = new List(); + } + + /// + /// 事件查询结果 + /// + public class EventQueryResult + { + /// + /// 查询到的事件列表 + /// + public List Events { get; set; } = new List(); + + /// + /// 匹配的事件总数 + /// + public int TotalCount { get; set; } + + /// + /// 查询是否有更多结果 + /// + public bool HasMore { get; set; } + + /// + /// 查询时间范围的开始时间 + /// + public DateTime StartTime { get; set; } + + /// + /// 查询时间范围的结束时间 + /// + public DateTime EndTime { get; set; } + } + + /// + /// 事件查询参数 + /// + public class EventQueryParams + { + /// + /// 查询开始时间 + /// + public DateTime? StartTime { get; set; } + + /// + /// 查询结束时间 + /// + public DateTime? EndTime { get; set; } + + /// + /// 事件类型过滤 + /// + public FileEventType? EventType { get; set; } + + /// + /// 文件路径过滤(支持包含关系) + /// + public string PathFilter { get; set; } + + /// + /// 文件扩展名过滤 + /// + public string ExtensionFilter { get; set; } + + /// + /// 分页大小 + /// + public int PageSize { get; set; } = 100; + + /// + /// 分页索引(从0开始) + /// + public int PageIndex { get; set; } = 0; + + /// + /// 排序方向,true为升序,false为降序 + /// + public bool AscendingOrder { get; set; } = false; + } +} \ No newline at end of file diff --git a/external/JiShe.CollectBus.PluginFileWatcher/FileWatcherUtils.cs b/external/JiShe.CollectBus.PluginFileWatcher/FileWatcherUtils.cs new file mode 100644 index 0000000..aeb7f90 --- /dev/null +++ b/external/JiShe.CollectBus.PluginFileWatcher/FileWatcherUtils.cs @@ -0,0 +1,157 @@ +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.PluginFileWatcher +{ + /// + /// 文件监控相关工具类 + /// + public static class FileWatcherUtils + { + /// + /// 检测文件是否被锁定 + /// + /// 要检查的文件路径 + /// 如果文件被锁定则返回true,否则返回false + public static bool IsFileLocked(string filePath) + { + if (string.IsNullOrEmpty(filePath) || !File.Exists(filePath)) + return false; + + try + { + using (FileStream stream = File.Open(filePath, FileMode.Open, FileAccess.ReadWrite, FileShare.None)) + { + // 文件可以被完全访问,没有被锁定 + stream.Close(); + } + return false; + } + catch (IOException) + { + // 文件被锁定或正在被其他进程使用 + return true; + } + catch (Exception) + { + // 其他错误(权限不足、路径无效等) + return true; + } + } + + /// + /// 尝试处理一个可能被锁定的文件 + /// + /// 文件路径 + /// 成功解锁后要执行的操作 + /// 健壮性配置 + /// 处理结果:true表示成功处理,false表示处理失败 + public static async Task TryHandleLockedFileAsync(string filePath, Func action, RobustnessConfig config) + { + if (!config.EnableFileLockDetection) + { + // 如果禁用了锁检测,直接执行操作 + try + { + await action(); + return true; + } + catch + { + return false; + } + } + + // 如果文件不存在或不是锁定状态,直接执行操作 + if (!File.Exists(filePath) || !IsFileLocked(filePath)) + { + try + { + await action(); + return true; + } + catch + { + return false; + } + } + + // 文件被锁定,根据策略处理 + switch (config.LockedFileStrategy.ToLower()) + { + case "skip": + // 跳过这个文件 + Console.WriteLine($"文件被锁定,已跳过: {filePath}"); + return false; + + case "retry": + // 重试几次 + for (int i = 0; i < config.FileLockRetryCount; i++) + { + await Task.Delay(config.FileLockRetryDelayMs); + + if (!IsFileLocked(filePath)) + { + try + { + await action(); + Console.WriteLine($"文件锁已释放,成功处理: {filePath}"); + return true; + } + catch + { + // 继续重试 + } + } + } + Console.WriteLine($"文件仍然被锁定,重试{config.FileLockRetryCount}次后放弃: {filePath}"); + return false; + + case "log": + default: + // 只记录不处理 + Console.WriteLine($"文件被锁定,已记录: {filePath}"); + return false; + } + } + + /// + /// 检查文件系统监控器是否健康 + /// + /// 要检查的监控器 + /// 最后一次事件的时间 + /// 健壮性配置 + /// 如果监控器健康则返回true,否则返回false + public static bool IsWatcherHealthy(FileSystemWatcher watcher, DateTime lastEventTime, RobustnessConfig config) + { + if (watcher == null || !watcher.EnableRaisingEvents) + return false; + + // 如果配置了超时时间,检查是否超时 + if (config.WatcherTimeoutSeconds > 0) + { + // 如果最后事件时间超过了超时时间,认为监控器可能已经失效 + TimeSpan timeSinceLastEvent = DateTime.UtcNow - lastEventTime; + if (timeSinceLastEvent.TotalSeconds > config.WatcherTimeoutSeconds) + { + // 执行一个简单的测试:尝试改变一些属性看是否抛出异常 + try + { + var currentFilter = watcher.Filter; + watcher.Filter = currentFilter; + return true; // 如果没有异常,认为监控器仍然正常 + } + catch + { + return false; // 抛出异常,认为监控器已经失效 + } + } + } + + // 默认情况下认为监控器健康 + return true; + } + } +} \ No newline at end of file diff --git a/external/JiShe.CollectBus.PluginFileWatcher/JiShe.CollectBus.PluginFileWatcher.csproj b/external/JiShe.CollectBus.PluginFileWatcher/JiShe.CollectBus.PluginFileWatcher.csproj new file mode 100644 index 0000000..193c217 --- /dev/null +++ b/external/JiShe.CollectBus.PluginFileWatcher/JiShe.CollectBus.PluginFileWatcher.csproj @@ -0,0 +1,39 @@ + + + + Exe + net8.0 + enable + enable + Linux + ..\.. + + + + + + + + + + + + + + + + + + + + + + + + + + Always + + + + diff --git a/external/JiShe.CollectBus.PluginFileWatcher/Program.cs b/external/JiShe.CollectBus.PluginFileWatcher/Program.cs new file mode 100644 index 0000000..92c47b8 --- /dev/null +++ b/external/JiShe.CollectBus.PluginFileWatcher/Program.cs @@ -0,0 +1,1110 @@ +using System; +using System.Collections.Concurrent; +using System.IO; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using System.Linq; +using Microsoft.Extensions.Configuration; +using System.Collections.Generic; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Serilog; +using Serilog.Events; +using Serilog.Extensions.Logging; +using ILogger = Microsoft.Extensions.Logging.ILogger; + +namespace JiShe.CollectBus.PluginFileWatcher +{ + // 删除配置类的定义 + + class Program + { + // 配置对象,从appsettings.json加载 + private static FileMonitorConfig _config = new FileMonitorConfig(); + + // 全局计数器和动态配置参数 + private static long _totalEventsProcessed = 0; + + // 监控器健康状态追踪 + private static DateTime _lastEventTime = DateTime.UtcNow; + private static int _watcherRestartAttempts = 0; + private static FileSystemWatcher _activeWatcher = null; + + // 事件存储和回放 + private static EventStorage _eventStorage = null; + private static EventReplaySession _currentReplaySession = null; + private static ILogger _logger = NullLogger.Instance; + + static async Task Main(string[] args) + { + Console.OutputEncoding = System.Text.Encoding.UTF8; + + // 检查是否以数据库工具模式运行 + if (args.Length > 0 && args[0].Equals("db", StringComparison.OrdinalIgnoreCase)) + { + // 运行数据库工具 + await RunDatabaseUtilityAsync(args.Skip(1).ToArray()); + return; + } + + // 加载配置文件 + LoadConfiguration(); + + // 配置Serilog + ConfigureSerilog(); + + // 初始化日志记录器 + _logger = new SerilogLoggerFactory().CreateLogger("FileMonitor"); + + Log.Information("高性能文件监控程序启动中..."); + + // 初始化事件存储 + if (_config.EventStorage.EnableEventStorage) + { + _eventStorage = new EventStorage(_config, _logger); + Log.Information("事件存储已启用,存储目录:{Directory}", _config.EventStorage.StorageDirectory); + } + + // 打印配置信息 + LogConfiguration(); + + // 获取监控路径:优先命令行参数,其次配置文件,最后使用当前目录 + string pathToMonitor = args.Length > 0 ? args[0] : + !string.IsNullOrEmpty(_config.General.DefaultMonitorPath) ? + _config.General.DefaultMonitorPath : Directory.GetCurrentDirectory(); + + // 命令行参数可以覆盖配置文件 + if (args.Length > 1 && args[1].Equals("--no-filter", StringComparison.OrdinalIgnoreCase)) + { + _config.General.EnableFileFiltering = false; + Log.Information("通过命令行参数禁用文件类型过滤,将监控所有文件"); + } + else if (_config.General.EnableFileFiltering) + { + Log.Information("已启用文件类型过滤,仅监控以下类型: {Extensions}", string.Join(", ", _config.FileFilters.AllowedExtensions)); + } + + if (!Directory.Exists(pathToMonitor)) + { + Log.Warning("错误:监控目录 '{Path}' 不存在。", pathToMonitor); + Log.Information("尝试创建该目录..."); + + try + { + Directory.CreateDirectory(pathToMonitor); + Log.Information("已成功创建目录: {Path}", pathToMonitor); + } + catch (Exception ex) + { + Log.Error(ex, "创建目录失败: {Message}", ex.Message); + Log.Information("将使用当前目录作为监控路径。"); + pathToMonitor = Directory.GetCurrentDirectory(); + } + } + + Log.Information("开始监控目录: {Path}", pathToMonitor); + Log.Information("按 'Q' 退出程序,按 'R' 重新加载配置,按 'H' 检查监控器健康状态。"); + if (_config.EventStorage.EnableEventStorage && _config.EventStorage.EnableEventReplay) + { + Log.Information("按 'P' 开始/暂停回放,按 'S' 停止回放,按 'B' 查询事件。"); + } + + // 创建一个容量由配置文件指定的有界通道 + var channel = Channel.CreateBounded(new BoundedChannelOptions(_config.Performance.ChannelCapacity) + { + FullMode = BoundedChannelFullMode.DropOldest // 当通道满时丢弃旧事件 + }); + + using var cts = new CancellationTokenSource(); + + // 启动内存监控 + var memoryMonitorTask = StartMemoryMonitorAsync(cts.Token); + + // 启动文件监控器 + var monitorTask = StartFileMonitorAsync(pathToMonitor, channel.Writer, cts.Token); + + // 启动健康监控任务 + var healthMonitorTask = _config.Robustness.EnableAutoRecovery ? + StartWatcherHealthMonitorAsync(pathToMonitor, channel.Writer, cts.Token) : Task.CompletedTask; + + // 启动事件处理器 + var processorTask = ProcessEventsAsync(channel.Reader, cts.Token); + + // 等待用户按下键退出或重新加载配置 + bool needRestart = false; + while (true) + { + var key = Console.ReadKey(true).Key; + if (key == ConsoleKey.Q) + { + break; // 退出循环 + } + else if (key == ConsoleKey.R) + { + Log.Information("正在重新加载配置..."); + LoadConfiguration(); + // 重新配置Serilog + ConfigureSerilog(); + LogConfiguration(); + Log.Information("配置已重新加载,部分配置需要重启程序才能生效。"); + } + else if (key == ConsoleKey.H) + { + // 手动检查监控器健康状态 + bool isHealthy = _activeWatcher != null && + FileWatcherUtils.IsWatcherHealthy(_activeWatcher, _lastEventTime, _config.Robustness); + Log.Information("监控器健康状态: {Status}", (isHealthy ? "正常" : "异常")); + Log.Information("上次事件时间: {Time}", _lastEventTime); + Log.Information("重启次数: {Count}/{MaxCount}", _watcherRestartAttempts, _config.Robustness.MaxRestartAttempts); + Log.Information("已处理事件总数: {Count}", _totalEventsProcessed); + } + else if (key == ConsoleKey.P && _config.EventStorage.EnableEventStorage && _config.EventStorage.EnableEventReplay) + { + await HandleEventReplayToggleAsync(); + } + else if (key == ConsoleKey.S && _config.EventStorage.EnableEventStorage && _config.EventStorage.EnableEventReplay) + { + await StopReplayAsync(); + } + else if (key == ConsoleKey.B && _config.EventStorage.EnableEventStorage) + { + await QueryEventsAsync(); + } + + await Task.Delay(100); + } + + // 取消所有任务 + cts.Cancel(); + + // 停止回放(如果正在进行) + await StopReplayAsync(); + + // 释放事件存储 + _eventStorage?.Dispose(); + + try + { + var tasks = new List { monitorTask, processorTask, memoryMonitorTask }; + if (_config.Robustness.EnableAutoRecovery) + { + tasks.Add(healthMonitorTask); + } + + await Task.WhenAll(tasks); + } + catch (OperationCanceledException) + { + Log.Information("程序已正常退出。"); + } + + // 关闭Serilog + Log.CloseAndFlush(); + } + + // 配置Serilog + private static void ConfigureSerilog() + { + var configuration = new ConfigurationBuilder() + .SetBasePath(Directory.GetCurrentDirectory()) + .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true) + .Build(); + + var loggerConfig = new LoggerConfiguration() + .ReadFrom.Configuration(configuration); + + // 根据配置设置最小日志级别 + LogEventLevel minimumLevel = LogEventLevel.Information; + if (!string.IsNullOrEmpty(_config.Logging?.LogLevel)) + { + switch (_config.Logging.LogLevel.ToLower()) + { + case "verbose": + minimumLevel = LogEventLevel.Verbose; + break; + case "debug": + minimumLevel = LogEventLevel.Debug; + break; + case "information": + minimumLevel = LogEventLevel.Information; + break; + case "warning": + minimumLevel = LogEventLevel.Warning; + break; + case "error": + minimumLevel = LogEventLevel.Error; + break; + case "fatal": + minimumLevel = LogEventLevel.Fatal; + break; + } + } + + // 如果配置文件中没有Serilog节,使用默认配置 + var hasConfig = configuration.GetSection("Serilog").Exists(); + if (!hasConfig) + { + loggerConfig + .MinimumLevel.Is(minimumLevel) + .WriteTo.Console( + outputTemplate: "[{Timestamp:yyyy-MM-dd HH:mm:ss.fff}] [{Level:u3}] {Message:lj}{NewLine}{Exception}" + ) + .WriteTo.File( + Path.Combine(_config.Logging?.LogDirectory ?? "Logs", "filemonitor-.log"), + rollingInterval: RollingInterval.Day, + outputTemplate: "[{Timestamp:yyyy-MM-dd HH:mm:ss.fff}] [{Level:u3}] {Message:lj}{NewLine}{Exception}", + retainedFileCountLimit: _config.Logging?.RetainedLogDays ?? 30 + ); + } + + Log.Logger = loggerConfig.CreateLogger(); + } + + // 查询事件 + private static async Task QueryEventsAsync() + { + if (_eventStorage == null) + { + Log.Warning("事件存储未启用"); + return; + } + + try + { + Log.Information("\n================ 事件查询 ================"); + Console.WriteLine("请输入查询参数,直接回车使用默认值"); + + // 收集查询参数 + var queryParams = new EventQueryParams(); + + Console.Write("开始时间 (yyyy-MM-dd HH:mm:ss),默认24小时前: "); + string startTimeStr = Console.ReadLine(); + if (!string.IsNullOrEmpty(startTimeStr)) + { + if (DateTime.TryParse(startTimeStr, out var startTime)) + { + queryParams.StartTime = startTime; + } + else + { + Log.Warning("无效的日期格式,使用默认值"); + queryParams.StartTime = DateTime.Now.AddDays(-1); + } + } + else + { + queryParams.StartTime = DateTime.Now.AddDays(-1); + } + + Console.Write("结束时间 (yyyy-MM-dd HH:mm:ss),默认当前时间: "); + string endTimeStr = Console.ReadLine(); + if (!string.IsNullOrEmpty(endTimeStr)) + { + if (DateTime.TryParse(endTimeStr, out var endTime)) + { + queryParams.EndTime = endTime; + } + else + { + Log.Warning("无效的日期格式,使用默认值"); + } + } + + Console.Write("事件类型 (Created, Modified, Deleted, Renamed, All),默认All: "); + string eventTypeStr = Console.ReadLine(); + if (!string.IsNullOrEmpty(eventTypeStr) && eventTypeStr.ToLower() != "all") + { + if (Enum.TryParse(eventTypeStr, true, out var eventType)) + { + queryParams.EventType = eventType; + } + else + { + Log.Warning("无效的事件类型,使用默认值All"); + } + } + + Console.Write("路径过滤 (包含此文本的路径),默认无: "); + string pathFilter = Console.ReadLine(); + if (!string.IsNullOrEmpty(pathFilter)) + { + queryParams.PathFilter = pathFilter; + } + + Console.Write("文件扩展名过滤 (.txt, .exe等),默认无: "); + string extensionFilter = Console.ReadLine(); + if (!string.IsNullOrEmpty(extensionFilter)) + { + queryParams.ExtensionFilter = extensionFilter; + } + + Console.Write("每页显示数量,默认20: "); + string pageSizeStr = Console.ReadLine(); + if (!string.IsNullOrEmpty(pageSizeStr) && int.TryParse(pageSizeStr, out var pageSize)) + { + queryParams.PageSize = pageSize; + } + else + { + queryParams.PageSize = 20; + } + + // 执行查询 + var result = await _eventStorage.QueryEventsAsync(queryParams); + + Log.Information("查询结果 (共{Count}条记录):", result.TotalCount); + Log.Information("时间范围: {StartTime} 至 {EndTime}", result.StartTime, result.EndTime); + Console.WriteLine("---------------------------------------------"); + + if (result.Events.Count == 0) + { + Log.Warning("未找到符合条件的事件"); + } + else + { + foreach (var evt in result.Events) + { + string eventType = evt.EventType.ToString(); + string timeStr = evt.Timestamp.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss.fff"); + + Console.WriteLine($"[{timeStr}] [{eventType}] {evt.FileName}"); + Console.WriteLine($" 路径: {evt.Directory}"); + + if (evt.EventType == FileEventType.Renamed && !string.IsNullOrEmpty(evt.OldFileName)) + { + Console.WriteLine($" 原名称: {evt.OldFileName}"); + } + + Console.WriteLine(); + } + + if (result.HasMore) + { + Log.Warning("... 还有更多记录未显示 ..."); + } + } + + // 询问是否开始回放 + if (result.Events.Count > 0 && _config.EventStorage.EnableEventReplay) + { + Console.Write("\n是否要回放这些事件? (Y/N): "); + string replayAnswer = Console.ReadLine(); + if (!string.IsNullOrEmpty(replayAnswer) && replayAnswer.ToUpper().StartsWith("Y")) + { + await StartReplayAsync(queryParams); + } + } + } + catch (Exception ex) + { + Log.Error(ex, "查询事件时出错: {Message}", ex.Message); + } + + Console.WriteLine("=============================================\n"); + } + + // 处理回放切换(开始/暂停) + private static async Task HandleEventReplayToggleAsync() + { + if (_eventStorage == null) return; + + try + { + if (_currentReplaySession == null) + { + // 如果没有活动的回放会话,创建一个简单的查询参数并开始回放 + var queryParams = new EventQueryParams + { + StartTime = DateTime.Now.AddHours(-1), + PageSize = 1000 + }; + + await StartReplayAsync(queryParams); + } + else if (_currentReplaySession.IsPaused) + { + // 恢复回放 + await _currentReplaySession.ResumeAsync(); + Log.Information("已恢复事件回放"); + } + else + { + // 暂停回放 + await _currentReplaySession.PauseAsync(); + Log.Information("已暂停事件回放"); + } + } + catch (Exception ex) + { + Log.Error(ex, "处理回放时出错: {Message}", ex.Message); + } + } + + // 开始回放 + private static async Task StartReplayAsync(EventQueryParams queryParams) + { + if (_eventStorage == null || !_config.EventStorage.EnableEventReplay) return; + + try + { + // 停止现有回放 + await StopReplayAsync(); + + // 创建回放处理器 + Func replayHandler = async (e) => + { + // 在控制台上显示回放的事件 + string eventType = e.EventType.ToString(); + string timeStr = e.Timestamp.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss.fff"); + + Log.Information("[回放] [{Time}] [{Type}] {FileName}", timeStr, eventType, e.FileName); + + if (_config.Logging.LogFileEventDetails) + { + Log.Debug(" 路径: {Path}", e.Directory); + + if (e.EventType == FileEventType.Renamed && !string.IsNullOrEmpty(e.OldFileName)) + { + Log.Debug(" 原名称: {OldName}", e.OldFileName); + } + } + + // 这里可以添加实际的文件操作,比如创建、删除文件等 + // 但在此示例中,我们只是显示事件而不执行实际操作 + + await Task.CompletedTask; + }; + + // 开始回放会话 + _currentReplaySession = await _eventStorage.StartReplayAsync( + queryParams, + replayHandler, + CancellationToken.None); + + // 如果会话创建成功,开始回放 + if (_currentReplaySession != null) + { + await _currentReplaySession.StartAsync(); + Log.Information("开始回放事件,共 {Count} 个事件", _currentReplaySession.TotalEvents); + Log.Information("回放速度: {Speed}", (_config.EventStorage.ReplaySpeedFactor > 1 ? + $"{_config.EventStorage.ReplaySpeedFactor}x (加速)" : + $"{_config.EventStorage.ReplaySpeedFactor}x (减速)")); + } + else + { + Log.Warning("未找到符合条件的事件,回放未开始"); + } + } + catch (Exception ex) + { + Log.Error(ex, "开始回放时出错: {Message}", ex.Message); + } + } + + // 停止回放 + private static async Task StopReplayAsync() + { + if (_currentReplaySession != null) + { + try + { + await _currentReplaySession.StopAsync(); + Log.Information("已停止事件回放"); + + // 输出回放统计 + Log.Information("回放统计: 共处理 {Processed}/{Total} 个事件", + _currentReplaySession.ProcessedEvents, + _currentReplaySession.TotalEvents); + + TimeSpan duration = (_currentReplaySession.EndTime ?? DateTime.Now) - _currentReplaySession.StartTime; + Log.Information("回放持续时间: {Duration:F1} 秒", duration.TotalSeconds); + } + catch (Exception ex) + { + Log.Error(ex, "停止回放时出错: {Message}", ex.Message); + } + finally + { + _currentReplaySession.Dispose(); + _currentReplaySession = null; + } + } + } + + // 加载配置文件 + private static void LoadConfiguration() + { + try + { + // 检查配置文件是否存在 + string configPath = Path.Combine(Directory.GetCurrentDirectory(), "appsettings.json"); + if (!File.Exists(configPath)) + { + Log.Warning("配置文件不存在,使用默认配置。"); + + // 使用默认配置 + _config = new FileMonitorConfig + { + General = new GeneralConfig + { + EnableFileFiltering = true, + MemoryMonitorIntervalMinutes = 1, + DefaultMonitorPath = Path.Combine(Directory.GetCurrentDirectory(), "MonitorFiles") + }, + FileFilters = new FileFiltersConfig + { + AllowedExtensions = new[] { ".dll" }, + ExcludedDirectories = new[] { "bin", "obj", "node_modules" }, + IncludeSubdirectories = true + }, + Performance = new PerformanceConfig + { + MemoryCleanupThreshold = 5000, + ChannelCapacity = 1000, + EventDebounceTimeSeconds = 3, + MaxDictionarySize = 10000, + CleanupIntervalSeconds = 5, + ProcessingDelayMs = 5 + }, + Robustness = new RobustnessConfig + { + EnableAutoRecovery = true, + WatcherHealthCheckIntervalSeconds = 30, + WatcherTimeoutSeconds = 60, + MaxRestartAttempts = 3, + RestartDelaySeconds = 5, + EnableFileLockDetection = true, + LockedFileStrategy = "Retry", + FileLockRetryCount = 3, + FileLockRetryDelayMs = 500 + }, + NotifyFilters = new List { "LastWrite", "FileName", "DirectoryName", "CreationTime" }, + EventStorage = new EventStorageConfig + { + EnableEventStorage = false, + StorageDirectory = Path.Combine(Directory.GetCurrentDirectory(), "EventStorage"), + EnableEventReplay = false, + ReplaySpeedFactor = 1 + }, + Logging = new LoggingConfig + { + LogLevel = "Information", + LogFileEventDetails = false, + RetainedLogDays = 30, + LogDirectory = "Logs" + } + }; + + return; + } + + var builder = new ConfigurationBuilder() + .SetBasePath(Directory.GetCurrentDirectory()) + .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true); + + var configuration = builder.Build(); + + // 绑定配置到对象 + var config = new FileMonitorConfig(); + configuration.GetSection("FileMonitorConfig").Bind(config); + + // 更新全局配置 + _config = config; + } + catch (Exception ex) + { + Log.Error(ex, "加载配置文件失败: {Message}", ex.Message); + Log.Warning("将使用默认配置。"); + + // 使用默认配置 + _config = new FileMonitorConfig + { + General = new GeneralConfig + { + EnableFileFiltering = true, + MemoryMonitorIntervalMinutes = 1, + DefaultMonitorPath = Path.Combine(Directory.GetCurrentDirectory(), "MonitorFiles") + }, + FileFilters = new FileFiltersConfig + { + AllowedExtensions = new[] { ".dll" }, + ExcludedDirectories = new[] { "bin", "obj", "node_modules" }, + IncludeSubdirectories = true + }, + Performance = new PerformanceConfig + { + MemoryCleanupThreshold = 5000, + ChannelCapacity = 1000, + EventDebounceTimeSeconds = 3, + MaxDictionarySize = 10000, + CleanupIntervalSeconds = 5, + ProcessingDelayMs = 5 + }, + Robustness = new RobustnessConfig + { + EnableAutoRecovery = true, + WatcherHealthCheckIntervalSeconds = 30, + WatcherTimeoutSeconds = 60, + MaxRestartAttempts = 3, + RestartDelaySeconds = 5, + EnableFileLockDetection = true, + LockedFileStrategy = "Retry", + FileLockRetryCount = 3, + FileLockRetryDelayMs = 500 + }, + NotifyFilters = new List { "LastWrite", "FileName", "DirectoryName", "CreationTime" }, + EventStorage = new EventStorageConfig + { + EnableEventStorage = false, + StorageDirectory = Path.Combine(Directory.GetCurrentDirectory(), "EventStorage"), + EnableEventReplay = false, + ReplaySpeedFactor = 1 + }, + Logging = new LoggingConfig + { + LogLevel = "Information", + LogFileEventDetails = false, + RetainedLogDays = 30, + LogDirectory = "Logs" + } + }; + } + } + + // 打印当前配置信息 + private static void LogConfiguration() + { + Log.Information("当前配置信息:"); + Log.Information(" 默认监控路径: {Path}", _config.General.DefaultMonitorPath); + Log.Information(" 文件过滤: {Status}", (_config.General.EnableFileFiltering ? "启用" : "禁用")); + if (_config.General.EnableFileFiltering) + { + Log.Information(" 监控文件类型: {Types}", string.Join(", ", _config.FileFilters.AllowedExtensions)); + Log.Information(" 排除目录: {Dirs}", string.Join(", ", _config.FileFilters.ExcludedDirectories)); + } + Log.Information(" 内存监控间隔: {Minutes}分钟", _config.General.MemoryMonitorIntervalMinutes); + Log.Information(" 内存清理阈值: 每{Threshold}个事件", _config.Performance.MemoryCleanupThreshold); + Log.Information(" 通道容量: {Capacity}", _config.Performance.ChannelCapacity); + Log.Information(" 事件去抖时间: {Seconds}秒", _config.Performance.EventDebounceTimeSeconds); + Log.Information(" 自动恢复: {Status}", (_config.Robustness.EnableAutoRecovery ? "启用" : "禁用")); + Log.Information(" 文件锁检测: {Status}", (_config.Robustness.EnableFileLockDetection ? "启用" : "禁用")); + if (_config.Robustness.EnableFileLockDetection) + { + Log.Information(" 锁定文件策略: {Strategy}", _config.Robustness.LockedFileStrategy); + } + Log.Information(" 日志级别: {Level}", _config.Logging?.LogLevel ?? "Information"); + } + + // 检查文件是否应该被处理的方法 + private static bool ShouldProcessFile(string filePath) + { + if (!_config.General.EnableFileFiltering) + return true; + + if (string.IsNullOrEmpty(filePath)) + return false; + + // 检查是否在排除目录中 + foreach (var excludedDir in _config.FileFilters.ExcludedDirectories) + { + if (filePath.Contains($"{Path.DirectorySeparatorChar}{excludedDir}{Path.DirectorySeparatorChar}", StringComparison.OrdinalIgnoreCase) || + filePath.EndsWith($"{Path.DirectorySeparatorChar}{excludedDir}", StringComparison.OrdinalIgnoreCase)) + { + return false; + } + } + + string extension = Path.GetExtension(filePath); + return _config.FileFilters.AllowedExtensions.Contains(extension, StringComparer.OrdinalIgnoreCase); + } + + // 添加内存监控方法 + private static async Task StartMemoryMonitorAsync(CancellationToken cancellationToken) + { + try + { + while (!cancellationToken.IsCancellationRequested) + { + // 等待指定时间 + await Task.Delay(TimeSpan.FromMinutes(_config.General.MemoryMonitorIntervalMinutes), cancellationToken); + + // 获取当前内存使用情况 + long currentMemory = GC.GetTotalMemory(false) / (1024 * 1024); // MB + + Log.Information("内存使用: {Memory} MB, 已处理事件: {EventCount}", currentMemory, _totalEventsProcessed); + + // 强制执行垃圾回收 + GC.Collect(); + GC.WaitForPendingFinalizers(); + + // 显示垃圾回收后的内存 + long afterGCMemory = GC.GetTotalMemory(true) / (1024 * 1024); // MB + Log.Debug("内存清理后: {Memory} MB (释放: {Released} MB)", afterGCMemory, currentMemory - afterGCMemory); + } + } + catch (OperationCanceledException) + { + // 预期的取消异常 + } + } + + // 创建文件监控器 + private static FileSystemWatcher CreateFileWatcher(string path, ChannelWriter writer, CancellationToken cancellationToken) + { + // 创建一个新的文件系统监控器 + var watcher = new FileSystemWatcher(path); + + // 从配置设置NotifyFilters + NotifyFilters notifyFilters = NotifyFilters.LastWrite; // 默认值 + foreach (var filterName in _config.NotifyFilters) + { + if (Enum.TryParse(filterName, out var filter)) + { + notifyFilters |= filter; + } + } + + watcher.NotifyFilter = notifyFilters; + watcher.IncludeSubdirectories = _config.FileFilters.IncludeSubdirectories; + watcher.EnableRaisingEvents = true; + + // 设置文件过滤器,如果启用了过滤 + if (_config.General.EnableFileFiltering && _config.FileFilters.AllowedExtensions.Length > 0) + { + // 在FileSystemWatcher级别设置过滤器,这样可以减少系统生成的事件数量 + // 只能设置一个扩展名作为Filter,所以我们选择第一个 + string firstExtension = _config.FileFilters.AllowedExtensions[0]; + watcher.Filter = $"*{firstExtension}"; + Log.Information("已设置文件系统监控过滤器: *{Filter}", firstExtension); + + if (_config.FileFilters.AllowedExtensions.Length > 1) + { + Log.Warning("注意: FileSystemWatcher只支持单一过滤器,其他文件类型将在事件处理时过滤。"); + } + } + + // 注册事件处理程序 + watcher.Created += (sender, e) => HandleFileEvent(sender, e, writer, cancellationToken); + watcher.Changed += (sender, e) => HandleFileEvent(sender, e, writer, cancellationToken); + watcher.Deleted += (sender, e) => HandleFileEvent(sender, e, writer, cancellationToken); + watcher.Renamed += (sender, e) => HandleFileEvent(sender, e, writer, cancellationToken); + watcher.Error += (sender, e) => + { + Log.Error(e.GetException(), "文件监控错误: {Message}", e.GetException().Message); + }; + + return watcher; + } + + // 文件事件处理 + private static void HandleFileEvent(object sender, FileSystemEventArgs e, ChannelWriter writer, CancellationToken cancellationToken) + { + // 更新最后事件时间 + _lastEventTime = DateTime.UtcNow; + + // 如果启用了事件存储,记录事件 + _eventStorage?.RecordEvent(e); + + try + { + // 如果是退出信号,忽略后续处理 + if (cancellationToken.IsCancellationRequested) + { + return; + } + + // 只处理指定类型的文件,如果启用了文件过滤 + if (!ShouldProcessFile(e.FullPath)) + { + return; + } + + // 尝试将事件写入通道,如果满了就丢弃 + // 不等待,以免阻塞文件系统事件 + if (!writer.TryWrite(e)) + { + Log.Warning("警告: 事件处理队列已满,部分事件被丢弃"); + } + } + catch (Exception ex) + { + // 捕获任何异常,防止崩溃 + Log.Error(ex, "处理文件事件时出错: {Message}", ex.Message); + } + } + + private static async Task StartFileMonitorAsync(string path, ChannelWriter writer, CancellationToken cancellationToken) + { + // 使用TaskCompletionSource来控制任务的完成 + var tcs = new TaskCompletionSource(); + + try + { + // 创建并启动监控器 + _activeWatcher = CreateFileWatcher(path, writer, cancellationToken); + + // 注册取消回调 + cancellationToken.Register(() => + { + try + { + if (_activeWatcher != null) + { + // 确保所有资源正确清理 + _activeWatcher.EnableRaisingEvents = false; + _activeWatcher.Dispose(); + } + } + catch (Exception ex) + { + Log.Error(ex, "关闭文件监控器时出错: {Message}", ex.Message); + } + finally + { + tcs.TrySetResult(true); + } + }); + + // 等待任务被取消或完成 + await tcs.Task; + } + catch (Exception ex) + { + Log.Error(ex, "启动文件监控器时出错: {Message}", ex.Message); + tcs.TrySetException(ex); + throw; + } + } + + private static async Task ProcessEventsAsync(ChannelReader reader, CancellationToken cancellationToken) + { + try + { + // 使用配置的字典大小 + var recentlyProcessed = new ConcurrentDictionary(); + + // 清理计时器,频率由配置决定 + using var cleanupTimer = new Timer(_ => + { + try + { + var cutoff = DateTime.UtcNow.AddSeconds(-_config.Performance.EventDebounceTimeSeconds); + int removedCount = 0; + + // 限制字典大小 + if (recentlyProcessed.Count > _config.Performance.MaxDictionarySize) + { + // 找出最旧的条目删除 + var oldestItems = recentlyProcessed + .OrderBy(kv => kv.Value) + .Take(recentlyProcessed.Count - _config.Performance.MaxDictionarySize / 2); + + foreach (var item in oldestItems) + { + DateTime dummy; + if (recentlyProcessed.TryRemove(item.Key, out dummy)) + { + removedCount++; + } + } + } + + // 常规清理 + foreach (var key in recentlyProcessed.Keys) + { + if (recentlyProcessed.TryGetValue(key, out var time) && time < cutoff) + { + DateTime dummy; + if (recentlyProcessed.TryRemove(key, out dummy)) + { + removedCount++; + } + } + } + + if (removedCount > 0) + { + Log.Debug("已清理 {Count} 个过期事件记录", removedCount); + } + } + catch (Exception ex) + { + Log.Error(ex, "清理定时器错误: {Message}", ex.Message); + } + }, null, TimeSpan.FromSeconds(_config.Performance.CleanupIntervalSeconds), + TimeSpan.FromSeconds(_config.Performance.CleanupIntervalSeconds)); + + // 从通道读取事件并处理 + await foreach (var e in reader.ReadAllAsync(cancellationToken)) + { + // 再次确认文件扩展名,双重检查 + if (!ShouldProcessFile(e.FullPath)) + { + continue; + } + + // 增加计数器 + long count = Interlocked.Increment(ref _totalEventsProcessed); + + // 周期性清理内存 + if (count % _config.Performance.MemoryCleanupThreshold == 0) + { + // 在后台线程执行垃圾回收,但直接等待完成避免警告 + await Task.Run(() => + { + GC.Collect(); + GC.WaitForPendingFinalizers(); + }); + } + + // 只使用文件路径作为键,忽略事件类型,这样同一文件的创建和修改事件会被视为同一事件 + var key = e.FullPath; + + // 对于创建事件,如果之前有相同文件的其他事件,则跳过 + if (e.ChangeType == WatcherChangeTypes.Changed && + recentlyProcessed.TryGetValue(key, out var _)) + { + // 如果是修改事件且文件最近被处理过,则跳过 + continue; + } + + // 更新或添加文件处理时间,使用原子操作 + recentlyProcessed[key] = DateTime.UtcNow; + + string eventType = e.ChangeType switch + { + WatcherChangeTypes.Created => "创建", + WatcherChangeTypes.Deleted => "删除", + WatcherChangeTypes.Changed => "修改", + WatcherChangeTypes.Renamed => "重命名", + _ => "未知" + }; + + string fileName = e.Name ?? Path.GetFileName(e.FullPath); + string directoryName = Path.GetDirectoryName(e.FullPath) ?? string.Empty; + + Log.Information("{EventType}: {FileName}", eventType, fileName); + + if (_config.Logging.LogFileEventDetails) + { + Log.Debug(" 路径: {Path}", directoryName); + + var extension = Path.GetExtension(e.FullPath); + Log.Debug(" 类型: {Extension}", extension); + + if (e is RenamedEventArgs renamedEvent) + { + Log.Debug(" 原名称: {OldName}", Path.GetFileName(renamedEvent.OldFullPath)); + } + } + + // 延迟可配置 + await Task.Delay(_config.Performance.ProcessingDelayMs, cancellationToken); + } + } + catch (OperationCanceledException) + { + // 预期的取消异常,正常退出 + } + catch (Exception ex) + { + Log.Error(ex, "处理事件时发生错误: {Message}", ex.Message); + } + finally + { + // 确保资源释放 + GC.Collect(); + GC.WaitForPendingFinalizers(); + } + } + + // 健康监控任务,监视文件监控器的状态并在需要时重启它 + private static async Task StartWatcherHealthMonitorAsync( + string path, ChannelWriter writer, CancellationToken cancellationToken) + { + Log.Information("已启动监控器健康检查,间隔: {Seconds}秒", _config.Robustness.WatcherHealthCheckIntervalSeconds); + + try + { + while (!cancellationToken.IsCancellationRequested) + { + // 等待指定的健康检查间隔 + await Task.Delay(TimeSpan.FromSeconds(_config.Robustness.WatcherHealthCheckIntervalSeconds), + cancellationToken); + + // 如果监控器不健康且重启次数未超过上限,尝试重启 + if (_activeWatcher != null && + !FileWatcherUtils.IsWatcherHealthy(_activeWatcher, _lastEventTime, _config.Robustness) && + _watcherRestartAttempts < _config.Robustness.MaxRestartAttempts) + { + // 停止当前的监控器 + Log.Warning("检测到监控器异常,准备重启..."); + + try + { + _activeWatcher.EnableRaisingEvents = false; + _activeWatcher.Dispose(); + } + catch (Exception ex) + { + Log.Error(ex, "停止异常监控器时出错: {Message}", ex.Message); + } + + _watcherRestartAttempts++; + Log.Warning("正在重启监控器,尝试次数: {Current}/{Max}", + _watcherRestartAttempts, _config.Robustness.MaxRestartAttempts); + + // 等待指定的重启延迟时间 + await Task.Delay(TimeSpan.FromSeconds(_config.Robustness.RestartDelaySeconds), + cancellationToken); + + // 创建新的监控器 + try + { + _activeWatcher = CreateFileWatcher(path, writer, cancellationToken); + _lastEventTime = DateTime.UtcNow; // 重置最后事件时间 + Log.Information("监控器已成功重启"); + } + catch (Exception ex) + { + Log.Error(ex, "重启监控器失败: {Message}", ex.Message); + } + } + else if (_watcherRestartAttempts >= _config.Robustness.MaxRestartAttempts) + { + Log.Error("警告: 已达到最大重启次数({Max}),不再尝试重启", _config.Robustness.MaxRestartAttempts); + Log.Warning("请检查文件系统或手动重启程序"); + break; + } + } + } + catch (OperationCanceledException) + { + // 预期的取消异常,正常退出 + } + catch (Exception ex) + { + Log.Error(ex, "健康监控任务异常: {Message}", ex.Message); + } + } + + /// + /// 运行数据库工具 + /// + private static async Task RunDatabaseUtilityAsync(string[] args) + { + Console.WriteLine("正在启动SQLite数据库管理工具..."); + + try + { + var dbUtility = new DbUtility(); + await dbUtility.ExecuteAsync(args); + } + catch (Exception ex) + { + Console.WriteLine($"运行数据库工具时发生错误: {ex.Message}"); + } + } + } +} diff --git a/external/JiShe.CollectBus.PluginFileWatcher/Properties/launchSettings.json b/external/JiShe.CollectBus.PluginFileWatcher/Properties/launchSettings.json new file mode 100644 index 0000000..9293f4d --- /dev/null +++ b/external/JiShe.CollectBus.PluginFileWatcher/Properties/launchSettings.json @@ -0,0 +1,10 @@ +{ + "profiles": { + "JiShe.CollectBus.PluginFileWatcher": { + "commandName": "Project" + }, + "Container (Dockerfile)": { + "commandName": "Docker" + } + } +} \ No newline at end of file diff --git a/external/JiShe.CollectBus.PluginFileWatcher/appsettings.json b/external/JiShe.CollectBus.PluginFileWatcher/appsettings.json new file mode 100644 index 0000000..f0eedfa --- /dev/null +++ b/external/JiShe.CollectBus.PluginFileWatcher/appsettings.json @@ -0,0 +1,88 @@ +{ + "Serilog": { + "Using": [ "Serilog.Sinks.Console", "Serilog.Sinks.File" ], + "MinimumLevel": { + "Default": "Information", + "Override": { + "Microsoft": "Warning", + "System": "Warning" + } + }, + "WriteTo": [ + { + "Name": "Console", + "Args": { + "outputTemplate": "[{Timestamp:yyyy-MM-dd HH:mm:ss.fff}] [{Level:u3}] {Message:lj}{NewLine}{Exception}" + } + }, + { + "Name": "File", + "Args": { + "path": "Logs/filemonitor-.log", + "rollingInterval": "Day", + "outputTemplate": "[{Timestamp:yyyy-MM-dd HH:mm:ss.fff}] [{Level:u3}] {Message:lj}{NewLine}{Exception}", + "retainedFileCountLimit": 31 + } + } + ], + "Enrich": [ "FromLogContext" ] + }, + "FileMonitorConfig": { + "General": { + "EnableFileFiltering": true, + "MemoryMonitorIntervalMinutes": 1, + "DefaultMonitorPath": "D:\\MonitorFiles" + }, + "FileFilters": { + "AllowedExtensions": [ ".dll" ], + "ExcludedDirectories": [ "bin", "obj", "node_modules" ], + "IncludeSubdirectories": true + }, + "Performance": { + "MemoryCleanupThreshold": 5000, + "ChannelCapacity": 1000, + "EventDebounceTimeSeconds": 3, + "MaxDictionarySize": 10000, + "CleanupIntervalSeconds": 5, + "ProcessingDelayMs": 5 + }, + "Robustness": { + "EnableAutoRecovery": true, + "WatcherHealthCheckIntervalSeconds": 30, + "WatcherTimeoutSeconds": 60, + "MaxRestartAttempts": 3, + "RestartDelaySeconds": 5, + "EnableFileLockDetection": true, + "LockedFileStrategy": "Retry", + "FileLockRetryCount": 3, + "FileLockRetryDelayMs": 500 + }, + "EventStorage": { + "EnableEventStorage": true, + "StorageType": "SQLite", + "StorageDirectory": "D:/EventLogs", + "DatabasePath": "D:/EventLogs/events.db", + "ConnectionString": "Data Source=D:/EventLogs/events.db;Foreign Keys=True", + "CommandTimeout": 30, + "LogFileNameFormat": "FileEvents_{0:yyyy-MM-dd}.json", + "StorageIntervalSeconds": 60, + "BatchSize": 100, + "MaxEventRecords": 100000, + "DataRetentionDays": 30, + "MaxLogFiles": 30, + "CompressLogFiles": true, + "EnableEventReplay": true, + "ReplayIntervalMs": 100, + "ReplaySpeedFactor": 1.0 + }, + "NotifyFilters": [ + "LastWrite", + "FileName", + "DirectoryName", + "CreationTime" + ], + "Logging": { + "LogLevel": "Information" + } + } +} diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs index 5313833..51c3304 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System.Collections.Concurrent; using System.Text; +using Volo.Abp.Timing; namespace JiShe.CollectBus.Kafka.Consumer { diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs index e49d1be..da59a22 100644 --- a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs @@ -1,4 +1,5 @@ -using JiShe.CollectBus.Common.Enums; +using FreeRedis; +using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Protocol.Contracts.Interfaces; @@ -12,6 +13,7 @@ using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.Protocols; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.FreeRedis; namespace JiShe.CollectBus.Protocol.Contracts.Abstracts { @@ -20,6 +22,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts private readonly IProducerService _producerService; private readonly ILogger _logger; private readonly IRepository _protocolInfoRepository; + private readonly IFreeRedisProvider _redisProvider; //头部字节长度 public const int hearderLen = 6; @@ -34,17 +37,17 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts /// The service provider. protected BaseProtocolPlugin(IServiceProvider serviceProvider) { - _logger = serviceProvider.GetRequiredService>(); _protocolInfoRepository = serviceProvider.GetRequiredService>(); _producerService = serviceProvider.GetRequiredService(); + _redisProvider = serviceProvider.GetRequiredService(); } public abstract ProtocolInfo Info { get; } public virtual async Task GetAsync() => await Task.FromResult(Info); - public virtual async Task AddAsync() + public virtual async Task LoadAsync() { if (Info == null) { @@ -53,7 +56,8 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts await _protocolInfoRepository.DeleteDirectAsync(a => a.Name == Info.Name); await _protocolInfoRepository.InsertAsync(Info); - //await _protocolInfoCache.Get() + await _redisProvider.Instance.HDelAsync($"{RedisConst.ProtocolKey}", Info.Name); + await _redisProvider.Instance.HSetAsync($"{RedisConst.ProtocolKey}", Info.Name, Info); } public abstract Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) where T : TB3761; @@ -1168,6 +1172,5 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts } #endregion - } } diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/CollectBusProtocolModule.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/CollectBusProtocolModule.cs new file mode 100644 index 0000000..4f181db --- /dev/null +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/CollectBusProtocolModule.cs @@ -0,0 +1,18 @@ +using JiShe.CollectBus.FreeRedis.Options; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.Modularity; + +namespace JiShe.CollectBus.Protocol.Contracts +{ + public class CollectBusProtocolModule : AbpModule + { + public override void ConfigureServices(ServiceConfigurationContext context) + { + + } + } +} + + + diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs index 2f48cd2..c35dbee 100644 --- a/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs @@ -12,7 +12,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Interfaces { Task GetAsync(); - Task AddAsync(); + Task LoadAsync(); Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) where T : TB3761; diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolService.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolService.cs new file mode 100644 index 0000000..c8ea899 --- /dev/null +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolService.cs @@ -0,0 +1,22 @@ +using JiShe.CollectBus.IotSystems.Protocols; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Volo.Abp; + +namespace JiShe.CollectBus.Protocol.Contracts.Interfaces +{ + public interface IProtocolService + { + /// + /// 通过仪器设备型号获取协议信息 + /// + /// + /// + /// + /// + Task FirstOrDefaultByDeviceAsync(string deviceCode, bool isSpecial = false); + } +} diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj b/protocols/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj index 81469e8..495dbf1 100644 --- a/protocols/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj @@ -19,6 +19,7 @@ + diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Services/ProtocolService.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Services/ProtocolService.cs new file mode 100644 index 0000000..3707e6b --- /dev/null +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Services/ProtocolService.cs @@ -0,0 +1,52 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Common.Extensions; +using JiShe.CollectBus.FreeRedis; +using JiShe.CollectBus.IotSystems.Protocols; +using Volo.Abp.DependencyInjection; +using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using System.Text.RegularExpressions; +using Volo.Abp; + +namespace JiShe.CollectBus.Protocol.Contracts.Services +{ + public class ProtocolService : IProtocolService, ISingletonDependency + { + private readonly IFreeRedisProvider _freeRedisProvider; + + public ProtocolService(IFreeRedisProvider freeRedisProvider) + { + _freeRedisProvider = freeRedisProvider; + } + + /// + /// 通过仪器设备型号获取协议信息 + /// + /// + /// + /// + /// + public async Task FirstOrDefaultByDeviceAsync(string deviceCode, bool isSpecial = false) + { + var protocols = await _freeRedisProvider.Instance.HGetAllAsync(RedisConst.ProtocolKey); + var keyValuePair = protocols.FirstOrDefault(a => ContainsExactPartRegex(deviceCode, a.Value.RegularExpression)); + if (!keyValuePair.Key.IsNullOrWhiteSpace() || keyValuePair.Value != null) return keyValuePair.Value; + if (isSpecial) throw new UserFriendlyException("The device protocol plugin does not exist!", ExceptionCode.NotFound); + var hasStandardProtocolPlugin = protocols.TryGetValue("StandardProtocolPlugin", out var protocolInfo); + if (!hasStandardProtocolPlugin) throw new UserFriendlyException("Standard protocol plugin does not exist!", ExceptionCode.NotFound); + return protocolInfo; + } + + private static bool ContainsExactPartRegex(string searchPattern, string fullString) + { + // 构建正则表达式 - 匹配以逗号或开头为边界,以逗号或结尾为边界的部分 + var pattern = $"(^|,)\\s*{Regex.Escape(searchPattern)}\\s*(,|$)"; + return Regex.IsMatch(fullString, pattern, RegexOptions.IgnoreCase); + } + } +} diff --git a/protocols/JiShe.CollectBus.Protocol.Test/JiShe.CollectBus.Protocol.Test.csproj b/protocols/JiShe.CollectBus.Protocol.Test/JiShe.CollectBus.Protocol.Test.csproj index 8b89e97..bfc186f 100644 --- a/protocols/JiShe.CollectBus.Protocol.Test/JiShe.CollectBus.Protocol.Test.csproj +++ b/protocols/JiShe.CollectBus.Protocol.Test/JiShe.CollectBus.Protocol.Test.csproj @@ -17,7 +17,7 @@ - + diff --git a/protocols/JiShe.CollectBus.Protocol.Test/JiSheCollectBusProtocolModule.cs b/protocols/JiShe.CollectBus.Protocol.Test/JiSheCollectBusTestProtocolModule.cs similarity index 72% rename from protocols/JiShe.CollectBus.Protocol.Test/JiSheCollectBusProtocolModule.cs rename to protocols/JiShe.CollectBus.Protocol.Test/JiSheCollectBusTestProtocolModule.cs index 4abc95b..cdd5253 100644 --- a/protocols/JiShe.CollectBus.Protocol.Test/JiSheCollectBusProtocolModule.cs +++ b/protocols/JiShe.CollectBus.Protocol.Test/JiSheCollectBusTestProtocolModule.cs @@ -5,17 +5,17 @@ using Volo.Abp.Modularity; namespace JiShe.CollectBus.Protocol.Test { - public class JiSheCollectBusProtocolModule : AbpModule + public class JiSheCollectBusTestProtocolModule : AbpModule { public override void ConfigureServices(ServiceConfigurationContext context) { context.Services.AddKeyedSingleton(nameof(TestProtocolPlugin)); } - public override void OnApplicationInitialization(ApplicationInitializationContext context) + public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context) { var protocol = context.ServiceProvider.GetRequiredKeyedService(nameof(TestProtocolPlugin)); - protocol.AddAsync(); + await protocol.LoadAsync(); } } } diff --git a/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs index 2573ab7..dbdfa82 100644 --- a/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs @@ -4,6 +4,7 @@ using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.Protocols; using JiShe.CollectBus.Protocol.Contracts.Abstracts; +using NUglify.JavaScript.Syntax; namespace JiShe.CollectBus.Protocol.Test { @@ -19,137 +20,9 @@ namespace JiShe.CollectBus.Protocol.Test public sealed override ProtocolInfo Info => new(nameof(TestProtocolPlugin), "Test", "TCP", "Test协议", "DTS1980-Test"); - public override async Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) + public override Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) { throw new NotImplementedException(); } - - #region 上行命令 - - //68 - //32 00 - //32 00 - //68 - //C9 1100'1001. 控制域C。 - // D7=1, (终端发送)上行方向。 - // D6=1, 此帧来自启动站。 - // D5=0, (上行方向)要求访问位。表示终端无事件数据等待访问。 - // D4=0, 保留 - // D3~D0=9, 功能码。链路测试 - - //20 32 行政区划码 - //90 26 终端地址 - //00 主站地址和组地址标志。终端为单地址。 //3220 09 87 2 - // 终端启动的发送帧的 MSA 应为 0, 其主站响应帧的 MSA 也应为 0. - //02 应用层功能码。AFN=2, 链路接口检测 - //70 0111'0000. 帧序列域。无时间标签、单帧、需要确认。 - //00 00 信息点。DA1和DA2全为“0”时,表示终端信息点。 - //01 00 信息类。F1, 登录。 - //44 帧尾,包含用户区数据校验和 - //16 帧结束标志 - - /// - /// 解析上行命令 - /// - /// - /// - public CommandReulst? AnalysisCmd(string cmd) - { - CommandReulst? commandReulst = null; - var hexStringList = cmd.StringToPairs(); - - if (hexStringList.Count < hearderLen) - { - return commandReulst; - } - //验证起始字符 - if (!hexStringList[0].IsStartStr() || !hexStringList[5].IsStartStr()) - { - return commandReulst; - } - - var lenHexStr = $"{hexStringList[2]}{hexStringList[1]}"; - var lenBin = lenHexStr.HexToBin(); - var len = lenBin.Remove(lenBin.Length - 2).BinToDec(); - //验证长度 - if (hexStringList.Count - 2 != hearderLen + len) - return commandReulst; - - var userDataIndex = hearderLen; - var c = hexStringList[userDataIndex];//控制域 1字节 - userDataIndex += 1; - - var aHexList = hexStringList.Skip(userDataIndex).Take(5).ToList();//地址域 5字节 - var a = AnalysisA(aHexList); - var a3Bin = aHexList[4].HexToBin().PadLeft(8, '0'); - var mSA = a3Bin.Substring(0, 7).BinToDec(); - userDataIndex += 5; - - var aFN = (AFN)hexStringList[userDataIndex].HexToDec();//1字节 - userDataIndex += 1; - - var seq = hexStringList[userDataIndex].HexToBin().PadLeft(8, '0'); - var tpV = (TpV)Convert.ToInt32(seq.Substring(0, 1)); - var fIRFIN = (FIRFIN)Convert.ToInt32(seq.Substring(1, 2)); - var cON = (CON)Convert.ToInt32(seq.Substring(3, 1)); - var prseqBin = seq.Substring(4, 4); - userDataIndex += 1; - - // (DA2 - 1) * 8 + DA1 = pn - var da1Bin = hexStringList[userDataIndex].HexToBin(); - var da1 = da1Bin == "0" ? 0 : da1Bin.Length; - userDataIndex += 1; - var da2 = hexStringList[userDataIndex].HexToDec(); - var pn = da2 == 0 ? 0 : (da2 - 1) * 8 + da1; - userDataIndex += 1; - //(DT2*8)+DT1=fn - var dt1Bin = hexStringList[userDataIndex].HexToBin(); - var dt1 = dt1Bin != "0" ? dt1Bin.Length : 0; - userDataIndex += 1; - var dt2 = hexStringList[userDataIndex].HexToDec(); - var fn = dt2 * 8 + dt1; - userDataIndex += 1; - - //数据单元 - var datas = hexStringList.Skip(userDataIndex).Take(len + hearderLen - userDataIndex).ToList(); - - //EC - //Tp - commandReulst = new CommandReulst() - { - A = a, - MSA = mSA, - AFN = aFN, - Seq = new Seq() - { - TpV = tpV, - FIRFIN = fIRFIN, - CON = cON, - PRSEQ = prseqBin.BinToDec(), - }, - CmdLength = len, - Pn = pn, - Fn = fn, - HexDatas = datas - }; - - return commandReulst; - } - - /// - /// 解析地址 - /// - /// - /// - private string AnalysisA(List aHexList) - { - var a1 = aHexList[1] + aHexList[0]; - var a2 = aHexList[3] + aHexList[2]; - var a2Dec = a2.HexToDec(); - var a3 = aHexList[4]; - var a = $"{a1}{a2Dec.ToString().PadLeft(5, '0')}"; - return a; - } - #endregion } } diff --git a/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs b/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs index 5cda0c8..b0275a5 100644 --- a/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs +++ b/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs @@ -15,7 +15,7 @@ namespace JiShe.CollectBus.Protocol public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context) { var standardProtocol = context.ServiceProvider.GetRequiredKeyedService(nameof(StandardProtocolPlugin)); - await standardProtocol.AddAsync(); + await standardProtocol.LoadAsync(); } } } diff --git a/services/JiShe.CollectBus.Application/CollectBusAppService.cs b/services/JiShe.CollectBus.Application/CollectBusAppService.cs index e155c65..7c66b95 100644 --- a/services/JiShe.CollectBus.Application/CollectBusAppService.cs +++ b/services/JiShe.CollectBus.Application/CollectBusAppService.cs @@ -11,6 +11,7 @@ using System.Linq; using System.Threading.Tasks; using JiShe.CollectBus.FreeRedis; using Volo.Abp.Application.Services; +using Volo.Abp.Timing; namespace JiShe.CollectBus; @@ -20,7 +21,6 @@ public abstract class CollectBusAppService : ApplicationService public IFreeSqlProvider SqlProvider => LazyServiceProvider.LazyGetRequiredService(); protected IFreeRedisProvider FreeRedisProvider => LazyServiceProvider.LazyGetService()!; - protected CollectBusAppService() { LocalizationResource = typeof(CollectBusResource); diff --git a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index 988c9df..c480b83 100644 --- a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -28,6 +28,7 @@ using Microsoft.Extensions.Options; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Interceptors; using JiShe.CollectBus.Common.Attributes; +using ZstdSharp.Unsafe; namespace JiShe.CollectBus; @@ -42,7 +43,8 @@ namespace JiShe.CollectBus; typeof(CollectBusFreeSqlModule), typeof(CollectBusKafkaModule), typeof(CollectBusIoTDbModule), - typeof(CollectBusCassandraModule) + typeof(CollectBusCassandraModule), + typeof(CollectBusProtocolModule) )] public class CollectBusApplicationModule : AbpModule { @@ -63,13 +65,10 @@ public class CollectBusApplicationModule : AbpModule context.Services.OnRegistered(ctx => { var methods = ctx.ImplementationType.GetMethods(); - foreach (var method in methods) + var any = methods.Any(a=>a.GetCustomAttribute()!=null); + if (any) { - var attr = method.GetCustomAttribute(typeof(LogInterceptAttribute), true); - if (attr != null) - { - ctx.Interceptors.TryAdd(); - } + ctx.Interceptors.TryAdd(); } }); } @@ -84,22 +83,14 @@ public class CollectBusApplicationModule : AbpModule await context.AddBackgroundWorkerAsync(type); } - //默认初始化表计信息 - var dbContext = context.ServiceProvider.GetRequiredService(); - await dbContext.InitAmmeterCacheData(); - //await dbContext.InitWatermeterCacheData(); - - //初始化主题信息 - var kafkaAdminClient = context.ServiceProvider.GetRequiredService(); - var kafkaOptions = context.ServiceProvider.GetRequiredService>(); - - var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); - topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); - - foreach (var item in topics) + Task.Run(() => { - await kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor); - } + //默认初始化表计信息 + var dbContext = context.ServiceProvider.GetRequiredService(); + dbContext.InitAmmeterCacheData(); + //await dbContext.InitWatermeterCacheData(); + }).ConfigureAwait(false); + } } diff --git a/services/JiShe.CollectBus.Application/EnergySystem/CacheAppService.cs b/services/JiShe.CollectBus.Application/EnergySystem/CacheAppService.cs index d4ace87..6e2034a 100644 --- a/services/JiShe.CollectBus.Application/EnergySystem/CacheAppService.cs +++ b/services/JiShe.CollectBus.Application/EnergySystem/CacheAppService.cs @@ -18,7 +18,7 @@ namespace JiShe.CollectBus.EnergySystem { public class CacheAppService : CollectBusAppService, ICacheAppService { - public async Task SetHashByKey(string key) + public async Task SetHashByKey() { var data = await SqlProvider.Instance.Change(DbEnum.EnergyDB).Select().ToListAsync(); diff --git a/services/JiShe.CollectBus.Application/Interceptors/LogInterceptAttribute.cs b/services/JiShe.CollectBus.Application/Interceptors/LogInterceptAttribute.cs index a3969aa..86ddb49 100644 --- a/services/JiShe.CollectBus.Application/Interceptors/LogInterceptAttribute.cs +++ b/services/JiShe.CollectBus.Application/Interceptors/LogInterceptAttribute.cs @@ -5,5 +5,6 @@ namespace JiShe.CollectBus.Interceptors [AttributeUsage(AttributeTargets.Method)] public class LogInterceptAttribute : Attribute { + } } diff --git a/services/JiShe.CollectBus.Application/Samples/TestAppService.cs b/services/JiShe.CollectBus.Application/Samples/TestAppService.cs index 68d367c..7b0500a 100644 --- a/services/JiShe.CollectBus.Application/Samples/TestAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/TestAppService.cs @@ -26,6 +26,7 @@ using System.Diagnostics; using System.Linq; using Cassandra; using JiShe.CollectBus.Interceptors; +using JiShe.CollectBus.IotSystems.Protocols; namespace JiShe.CollectBus.Samples; @@ -35,15 +36,17 @@ public class TestAppService : CollectBusAppService private readonly ILogger _logger; private readonly ICassandraRepository _messageReceivedCassandraRepository; private readonly ICassandraProvider _cassandraProvider; + private readonly IProtocolService _protocolService; public TestAppService( ILogger logger, ICassandraRepository messageReceivedCassandraRepository, - ICassandraProvider cassandraProvider) + ICassandraProvider cassandraProvider, IProtocolService protocolService) { _logger = logger; _messageReceivedCassandraRepository = messageReceivedCassandraRepository; _cassandraProvider = cassandraProvider; + _protocolService = protocolService; } public async Task AddMessageOfCassandra() { @@ -124,9 +127,15 @@ public class TestAppService : CollectBusAppService } [LogIntercept] - public async Task LogInterceptorTest(string str) + public virtual Task LogInterceptorTest(string str) { _logger.LogWarning(str); - return str; + return Task.FromResult(str) ; + } + + public virtual async Task GetProtocol(string deviceCode, bool isSpecial = false) + { + var protocol = await _protocolService.FirstOrDefaultByDeviceAsync(deviceCode, isSpecial); + return protocol; } } diff --git a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index 8db1072..2ef854d 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -19,6 +19,7 @@ using JiShe.CollectBus.IoTDB.Interface; using TouchSocket.Sockets; using Volo.Abp.Domain.Repositories; using System.Collections.Generic; +using JiShe.CollectBus.Interceptors; using JiShe.CollectBus.Kafka.Internal; namespace JiShe.CollectBus.Subscribers @@ -59,8 +60,8 @@ namespace JiShe.CollectBus.Subscribers _dbProvider = dbProvider; } + [LogIntercept] [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName, EnableBatch = true)] - //[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] public async Task LoginIssuedEvent(List issuedEventMessages) { bool isAck = false; @@ -97,7 +98,6 @@ namespace JiShe.CollectBus.Subscribers } [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)] - //[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] public async Task HeartbeatIssuedEvent(List issuedEventMessages) { bool isAck = false; @@ -132,7 +132,6 @@ namespace JiShe.CollectBus.Subscribers } [KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)] - //[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)] public async Task ReceivedEvent(MessageReceived receivedMessage) { var currentTime = Clock.Now; @@ -189,7 +188,6 @@ namespace JiShe.CollectBus.Subscribers } [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName, EnableBatch = true)] - //[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] public async Task ReceivedHeartbeatEvent(List receivedHeartbeatMessages) { foreach (var receivedHeartbeatMessage in receivedHeartbeatMessages) @@ -209,7 +207,6 @@ namespace JiShe.CollectBus.Subscribers } [KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName,EnableBatch =true)] - //[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] public async Task ReceivedLoginEvent(List receivedLoginMessages) { foreach (var receivedLoginMessage in receivedLoginMessages) diff --git a/services/JiShe.CollectBus.Domain/IotSystems/CommunicationLogs/PacketLog.cs b/services/JiShe.CollectBus.Domain/IotSystems/CommunicationLogs/PacketLog.cs new file mode 100644 index 0000000..bd57e22 --- /dev/null +++ b/services/JiShe.CollectBus.Domain/IotSystems/CommunicationLogs/PacketLog.cs @@ -0,0 +1,43 @@ +using System; + +namespace JiShe.CollectBus.IotSystems.CommunicationLogs +{ + public class PacketLog : ICassandraEntity + { + /// + /// 下行报文 + /// + public string IssuedMessage { get; set; } = string.Empty; + + /// + /// 上行报文 + /// + public string ReportMessage { get; set; } = string.Empty; + + public DateTime? IssuedTime { get; set; } + public DateTime? ReportTime { get; set; } + + /// + /// 报文类型(是否需要回复) + /// + public PacketType PacketType { get; set; } + + public Guid Id { get; set; } + } + + public enum PacketType + { + /// + /// 只有下发,不需要回复 + /// + OnlyIssued, + /// + /// 只有上报,不需要下发 + /// + OnlyReport, + /// + /// 下发并且需要回复 + /// + IssuedAndReport + } +} diff --git a/services/JiShe.CollectBus.Domain/IotSystems/Protocols/ProtocolInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/Protocols/ProtocolInfo.cs index c193535..630b717 100644 --- a/services/JiShe.CollectBus.Domain/IotSystems/Protocols/ProtocolInfo.cs +++ b/services/JiShe.CollectBus.Domain/IotSystems/Protocols/ProtocolInfo.cs @@ -16,6 +16,7 @@ namespace JiShe.CollectBus.IotSystems.Protocols /// public ProtocolInfo(string name, string baseProtocol, string type, string description, string regularExpression) { + Code = $"PL-{DateTime.Now:yyyyMMddHHmmss}"; Name = name; Type = type; Description = description; @@ -23,6 +24,11 @@ namespace JiShe.CollectBus.IotSystems.Protocols BaseProtocol = baseProtocol; } + /// + /// 协议编码,唯一识别 + /// + public string Code { get; set; } + /// /// 协议名称 /// diff --git a/services/JiShe.CollectBus.Domain/JiShe.CollectBus.Domain.csproj b/services/JiShe.CollectBus.Domain/JiShe.CollectBus.Domain.csproj index a4dbc5d..a60ffe1 100644 --- a/services/JiShe.CollectBus.Domain/JiShe.CollectBus.Domain.csproj +++ b/services/JiShe.CollectBus.Domain/JiShe.CollectBus.Domain.csproj @@ -9,9 +9,9 @@ - - - + + + @@ -34,9 +34,4 @@ - - - - - diff --git a/shared/JiShe.CollectBus.Common/Consts/ExceptionCode.cs b/shared/JiShe.CollectBus.Common/Consts/ExceptionCode.cs new file mode 100644 index 0000000..8fd63ad --- /dev/null +++ b/shared/JiShe.CollectBus.Common/Consts/ExceptionCode.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Common.Consts +{ + public class ExceptionCode + { + public const string NotFound = "500404"; + } +} diff --git a/shared/JiShe.CollectBus.Common/Consts/RedisConst.cs b/shared/JiShe.CollectBus.Common/Consts/RedisConst.cs index 7ac170b..53ad4f6 100644 --- a/shared/JiShe.CollectBus.Common/Consts/RedisConst.cs +++ b/shared/JiShe.CollectBus.Common/Consts/RedisConst.cs @@ -72,6 +72,11 @@ namespace JiShe.CollectBus.Common.Consts ///// //public const string CacheDeviceBalanceRelationMapResultKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:RelationMap"; - public const string CacheAmmeterFocusKey = "CacheAmmeterFocusKey"; + public const string CacheAmmeterFocusKey = $"{CacheBasicDirectoryKey}CacheAmmeterFocusKey"; + + /// + /// 协议池缓存标识 + /// + public const string ProtocolKey = $"{CacheBasicDirectoryKey}Protocols"; } } diff --git a/shared/JiShe.CollectBus.Common/Extensions/StringExtensions.cs b/shared/JiShe.CollectBus.Common/Extensions/StringExtensions.cs index 94e7421..0c77e37 100644 --- a/shared/JiShe.CollectBus.Common/Extensions/StringExtensions.cs +++ b/shared/JiShe.CollectBus.Common/Extensions/StringExtensions.cs @@ -49,7 +49,7 @@ namespace JiShe.CollectBus.Common.Extensions ///
/// [Description("正则匹配单个")] - public static string RegexMatch(this string str, string pattern) + public static string? RegexMatch(this string str, string pattern) { var reg = new Regex(pattern); var match = reg.Match(str); diff --git a/web/JiShe.CollectBus.Host/CollectBusHostModule.cs b/web/JiShe.CollectBus.Host/CollectBusHostModule.cs index 95e93c7..b038907 100644 --- a/web/JiShe.CollectBus.Host/CollectBusHostModule.cs +++ b/web/JiShe.CollectBus.Host/CollectBusHostModule.cs @@ -13,6 +13,7 @@ using Volo.Abp.BackgroundWorkers.Hangfire; using Volo.Abp.Caching.StackExchangeRedis; using Volo.Abp.Modularity; using Volo.Abp.Swashbuckle; +using Volo.Abp.Timing; namespace JiShe.CollectBus.Host { @@ -24,6 +25,7 @@ namespace JiShe.CollectBus.Host typeof(AbpAspNetCoreAuthenticationJwtBearerModule), typeof(AbpAspNetCoreSerilogModule), typeof(AbpSwashbuckleModule), + typeof(AbpTimingModule), typeof(CollectBusApplicationModule), typeof(CollectBusMongoDbModule), typeof(AbpCachingStackExchangeRedisModule), @@ -46,6 +48,7 @@ namespace JiShe.CollectBus.Host //ConfigureKafkaTopic(context, configuration); ConfigureAuditLog(context); ConfigureCustom(context, configuration); + Configure(options => { options.Kind = DateTimeKind.Local; }); } diff --git a/web/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj b/web/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj index 3e60600..86f49a6 100644 --- a/web/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj +++ b/web/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj @@ -64,6 +64,9 @@ Always + + Always +