diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..fe1152b
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,30 @@
+**/.classpath
+**/.dockerignore
+**/.env
+**/.git
+**/.gitignore
+**/.project
+**/.settings
+**/.toolstarget
+**/.vs
+**/.vscode
+**/*.*proj.user
+**/*.dbmdl
+**/*.jfm
+**/azds.yaml
+**/bin
+**/charts
+**/docker-compose*
+**/Dockerfile*
+**/node_modules
+**/npm-debug.log
+**/obj
+**/secrets.dev.yaml
+**/values.dev.yaml
+LICENSE
+README.md
+!**/.gitignore
+!.git/HEAD
+!.git/config
+!.git/packed-refs
+!.git/refs/heads/**
\ No newline at end of file
diff --git a/JiShe.CollectBus.sln b/JiShe.CollectBus.sln
index fa3fd6c..6bc2bee 100644
--- a/JiShe.CollectBus.sln
+++ b/JiShe.CollectBus.sln
@@ -49,6 +49,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "5.Shared", "5.Shared", "{EB
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "modules\JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}"
EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "6.External", "6.External", "{02EA681E-C7D8-13C7-8484-4AC65E1B71E8}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.PluginFileWatcher", "external\JiShe.CollectBus.PluginFileWatcher\JiShe.CollectBus.PluginFileWatcher.csproj", "{F767D1C3-6807-4AE3-996A-3A28FE8124C2}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -127,6 +131,10 @@ Global
{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Release|Any CPU.Build.0 = Release|Any CPU
+ {F767D1C3-6807-4AE3-996A-3A28FE8124C2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {F767D1C3-6807-4AE3-996A-3A28FE8124C2}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {F767D1C3-6807-4AE3-996A-3A28FE8124C2}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {F767D1C3-6807-4AE3-996A-3A28FE8124C2}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -150,6 +158,7 @@ Global
{A377955E-7EA1-6F29-8CF7-774569E93925} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}
{443B4549-0AC0-4493-8F3E-49C83225DD76} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
+ {F767D1C3-6807-4AE3-996A-3A28FE8124C2} = {02EA681E-C7D8-13C7-8484-4AC65E1B71E8}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}
diff --git a/external/JiShe.CollectBus.PluginFileWatcher/Config.cs b/external/JiShe.CollectBus.PluginFileWatcher/Config.cs
new file mode 100644
index 0000000..8de55f6
--- /dev/null
+++ b/external/JiShe.CollectBus.PluginFileWatcher/Config.cs
@@ -0,0 +1,287 @@
+using System;
+using System.Collections.Generic;
+
+namespace JiShe.CollectBus.PluginFileWatcher
+{
+ ///
+ /// 文件监控程序的配置类
+ ///
+ public class FileMonitorConfig
+ {
+ ///
+ /// 基本配置
+ ///
+ public GeneralConfig General { get; set; } = new GeneralConfig();
+
+ ///
+ /// 文件过滤配置
+ ///
+ public FileFiltersConfig FileFilters { get; set; } = new FileFiltersConfig();
+
+ ///
+ /// 性能相关配置
+ ///
+ public PerformanceConfig Performance { get; set; } = new PerformanceConfig();
+
+ ///
+ /// 健壮性相关配置
+ ///
+ public RobustnessConfig Robustness { get; set; } = new RobustnessConfig();
+
+ ///
+ /// 事件存储和回放配置
+ ///
+ public EventStorageConfig EventStorage { get; set; } = new EventStorageConfig();
+
+ ///
+ /// 文件系统通知过滤器配置
+ ///
+ public List NotifyFilters { get; set; } = new List();
+
+ ///
+ /// 日志配置
+ ///
+ public LoggingConfig Logging { get; set; } = new LoggingConfig();
+ }
+
+ ///
+ /// 常规配置
+ ///
+ public class GeneralConfig
+ {
+ ///
+ /// 是否启用文件过滤
+ ///
+ public bool EnableFileFiltering { get; set; } = true;
+
+ ///
+ /// 内存监控间隔(分钟)
+ ///
+ public int MemoryMonitorIntervalMinutes { get; set; } = 1;
+
+ ///
+ /// 默认监控路径
+ ///
+ public string DefaultMonitorPath { get; set; } = string.Empty;
+ }
+
+ ///
+ /// 文件过滤配置
+ ///
+ public class FileFiltersConfig
+ {
+ ///
+ /// 允许监控的文件扩展名
+ ///
+ public string[] AllowedExtensions { get; set; } = new[] { ".dll" };
+
+ ///
+ /// 排除的目录
+ ///
+ public string[] ExcludedDirectories { get; set; } = new[] { "bin", "obj", "node_modules" };
+
+ ///
+ /// 是否包含子目录
+ ///
+ public bool IncludeSubdirectories { get; set; } = true;
+ }
+
+ ///
+ /// 性能相关配置
+ ///
+ public class PerformanceConfig
+ {
+ ///
+ /// 内存清理阈值(事件数)
+ ///
+ public int MemoryCleanupThreshold { get; set; } = 5000;
+
+ ///
+ /// 通道容量
+ ///
+ public int ChannelCapacity { get; set; } = 1000;
+
+ ///
+ /// 事件去抖时间(秒)
+ ///
+ public int EventDebounceTimeSeconds { get; set; } = 3;
+
+ ///
+ /// 最大字典大小
+ ///
+ public int MaxDictionarySize { get; set; } = 10000;
+
+ ///
+ /// 清理间隔(秒)
+ ///
+ public int CleanupIntervalSeconds { get; set; } = 5;
+
+ ///
+ /// 处理延迟(毫秒)
+ ///
+ public int ProcessingDelayMs { get; set; } = 5;
+ }
+
+ ///
+ /// 健壮性相关配置
+ ///
+ public class RobustnessConfig
+ {
+ ///
+ /// 是否启用自动恢复机制
+ ///
+ public bool EnableAutoRecovery { get; set; } = true;
+
+ ///
+ /// 监控器健康检查间隔(秒)
+ ///
+ public int WatcherHealthCheckIntervalSeconds { get; set; } = 30;
+
+ ///
+ /// 监控器无响应超时时间(秒)
+ ///
+ public int WatcherTimeoutSeconds { get; set; } = 60;
+
+ ///
+ /// 监控器重启尝试最大次数
+ ///
+ public int MaxRestartAttempts { get; set; } = 3;
+
+ ///
+ /// 重启尝试之间的延迟(秒)
+ ///
+ public int RestartDelaySeconds { get; set; } = 5;
+
+ ///
+ /// 是否启用文件锁检测
+ ///
+ public bool EnableFileLockDetection { get; set; } = true;
+
+ ///
+ /// 对锁定文件的处理策略: Skip(跳过), Retry(重试), Log(仅记录)
+ ///
+ public string LockedFileStrategy { get; set; } = "Retry";
+
+ ///
+ /// 文件锁定重试次数
+ ///
+ public int FileLockRetryCount { get; set; } = 3;
+
+ ///
+ /// 文件锁定重试间隔(毫秒)
+ ///
+ public int FileLockRetryDelayMs { get; set; } = 500;
+ }
+
+ ///
+ /// 事件存储和回放配置
+ ///
+ public class EventStorageConfig
+ {
+ ///
+ /// 是否启用事件存储
+ ///
+ public bool EnableEventStorage { get; set; } = true;
+
+ ///
+ /// 存储类型:SQLite 或 File
+ ///
+ public string StorageType { get; set; } = "SQLite";
+
+ ///
+ /// 事件存储目录
+ ///
+ public string StorageDirectory { get; set; } = "D:/EventLogs";
+
+ ///
+ /// SQLite数据库文件路径
+ ///
+ public string DatabasePath { get; set; } = "D:/EventLogs/events.db";
+
+ ///
+ /// SQLite连接字符串
+ ///
+ public string ConnectionString { get; set; } = "Data Source=D:/EventLogs/events.db";
+
+ ///
+ /// 数据库命令超时(秒)
+ ///
+ public int CommandTimeout { get; set; } = 30;
+
+ ///
+ /// 事件日志文件名格式 (使用DateTime.ToString格式)
+ ///
+ public string LogFileNameFormat { get; set; } = "FileEvents_{0:yyyy-MM-dd}.json";
+
+ ///
+ /// 存储间隔(秒),多久将缓存的事件写入一次存储
+ ///
+ public int StorageIntervalSeconds { get; set; } = 60;
+
+ ///
+ /// 事件批量写入大小,达到此数量时立即写入存储
+ ///
+ public int BatchSize { get; set; } = 100;
+
+ ///
+ /// 最大保留事件记录条数
+ ///
+ public int MaxEventRecords { get; set; } = 100000;
+
+ ///
+ /// 数据保留天数
+ ///
+ public int DataRetentionDays { get; set; } = 30;
+
+ ///
+ /// 最大保留日志文件数
+ ///
+ public int MaxLogFiles { get; set; } = 30;
+
+ ///
+ /// 是否压缩存储的事件日志
+ ///
+ public bool CompressLogFiles { get; set; } = true;
+
+ ///
+ /// 是否可以回放事件
+ ///
+ public bool EnableEventReplay { get; set; } = true;
+
+ ///
+ /// 回放时间间隔(毫秒)
+ ///
+ public int ReplayIntervalMs { get; set; } = 100;
+
+ ///
+ /// 回放速度倍率,大于1加速,小于1减速
+ ///
+ public double ReplaySpeedFactor { get; set; } = 1.0;
+ }
+
+ ///
+ /// 日志相关配置
+ ///
+ public class LoggingConfig
+ {
+ ///
+ /// 日志级别:Verbose、Debug、Information、Warning、Error、Fatal
+ ///
+ public string LogLevel { get; set; } = "Information";
+
+ ///
+ /// 是否记录文件事件处理详情
+ ///
+ public bool LogFileEventDetails { get; set; } = false;
+
+ ///
+ /// 日志文件保留天数
+ ///
+ public int RetainedLogDays { get; set; } = 30;
+
+ ///
+ /// 日志文件目录
+ ///
+ public string LogDirectory { get; set; } = "Logs";
+ }
+}
\ No newline at end of file
diff --git a/external/JiShe.CollectBus.PluginFileWatcher/DbUtility.cs b/external/JiShe.CollectBus.PluginFileWatcher/DbUtility.cs
new file mode 100644
index 0000000..c2dd7a5
--- /dev/null
+++ b/external/JiShe.CollectBus.PluginFileWatcher/DbUtility.cs
@@ -0,0 +1,277 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.Logging;
+using Serilog;
+using Serilog.Extensions.Logging;
+using ILogger = Microsoft.Extensions.Logging.ILogger;
+
+namespace JiShe.CollectBus.PluginFileWatcher
+{
+ ///
+ /// 数据库操作工具类,用于命令行测试数据库功能
+ ///
+ public class DbUtility
+ {
+ private readonly EventDatabaseManager _dbManager;
+ private readonly ILogger _logger;
+ private readonly FileMonitorConfig _config;
+
+ ///
+ /// 初始化数据库工具类
+ ///
+ /// 配置文件路径
+ public DbUtility(string configPath = "appsettings.json")
+ {
+ // 从配置文件加载配置
+ var configuration = new ConfigurationBuilder()
+ .AddJsonFile(configPath, optional: false, reloadOnChange: true)
+ .Build();
+
+ // 初始化日志
+ var serilogLogger = new LoggerConfiguration()
+ .ReadFrom.Configuration(configuration)
+ .CreateLogger();
+
+ // 将Serilog适配为Microsoft.Extensions.Logging.ILogger
+ _logger = new SerilogLoggerFactory(serilogLogger).CreateLogger("DbUtility");
+
+ // 创建配置对象
+ _config = new FileMonitorConfig();
+ configuration.GetSection("FileMonitor").Bind(_config);
+
+ // 确保SQLite存储已启用
+ _config.EventStorage.EnableEventStorage = true;
+ _config.EventStorage.StorageType = "SQLite";
+
+ // 创建数据库管理器
+ _dbManager = new EventDatabaseManager(_config, _logger);
+ }
+
+ ///
+ /// 执行数据库维护操作
+ ///
+ /// 命令行参数
+ public async Task ExecuteAsync(string[] args)
+ {
+ if (args.Length == 0)
+ {
+ PrintUsage();
+ return;
+ }
+
+ string command = args[0].ToLower();
+
+ switch (command)
+ {
+ case "stats":
+ await ShowStatisticsAsync();
+ break;
+
+ case "cleanup":
+ int days = args.Length > 1 && int.TryParse(args[1], out int d) ? d : _config.EventStorage.DataRetentionDays;
+ await _dbManager.CleanupOldDataAsync(days);
+ Console.WriteLine($"已清理 {days} 天前的旧数据");
+ break;
+
+ case "query":
+ await QueryEventsAsync(args);
+ break;
+
+ case "test":
+ await GenerateTestDataAsync(args);
+ break;
+
+ default:
+ Console.WriteLine($"未知命令: {command}");
+ PrintUsage();
+ break;
+ }
+ }
+
+ ///
+ /// 显示帮助信息
+ ///
+ private void PrintUsage()
+ {
+ Console.WriteLine("数据库工具使用方法:");
+ Console.WriteLine(" stats - 显示数据库统计信息");
+ Console.WriteLine(" cleanup [days] - 清理指定天数之前的数据(默认使用配置值)");
+ Console.WriteLine(" query [limit] - 查询最近的事件(默认10条)");
+ Console.WriteLine(" query type:X ext:Y - 按类型和扩展名查询事件");
+ Console.WriteLine(" test [count] - 生成测试数据(默认100条)");
+ }
+
+ ///
+ /// 显示数据库统计信息
+ ///
+ private async Task ShowStatisticsAsync()
+ {
+ try
+ {
+ var stats = await _dbManager.GetDatabaseStatsAsync();
+
+ Console.WriteLine("数据库统计信息:");
+ Console.WriteLine($"事件总数: {stats.TotalEvents}");
+ Console.WriteLine($"最早事件时间: {stats.OldestEventTime?.ToLocalTime()}");
+ Console.WriteLine($"最新事件时间: {stats.NewestEventTime?.ToLocalTime()}");
+
+ Console.WriteLine("\n事件类型分布:");
+ if (stats.EventTypeCounts != null)
+ {
+ foreach (var item in stats.EventTypeCounts)
+ {
+ Console.WriteLine($" {item.Key}: {item.Value}");
+ }
+ }
+
+ Console.WriteLine("\n扩展名分布 (Top 10):");
+ if (stats.TopExtensions != null)
+ {
+ foreach (var item in stats.TopExtensions)
+ {
+ Console.WriteLine($" {item.Key}: {item.Value}");
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine($"获取统计信息出错: {ex.Message}");
+ _logger.LogError(ex, "获取数据库统计信息时发生错误");
+ }
+ }
+
+ ///
+ /// 查询事件
+ ///
+ private async Task QueryEventsAsync(string[] args)
+ {
+ try
+ {
+ var queryParams = new EventQueryParams
+ {
+ PageSize = 10,
+ PageIndex = 0
+ };
+
+ // 解析查询参数
+ if (args.Length > 1)
+ {
+ foreach (var arg in args.Skip(1))
+ {
+ if (int.TryParse(arg, out int limit))
+ {
+ queryParams.PageSize = limit;
+ }
+ else if (arg.StartsWith("type:", StringComparison.OrdinalIgnoreCase))
+ {
+ string typeValue = arg.Substring(5);
+ if (Enum.TryParse(typeValue, true, out var eventType))
+ {
+ queryParams.EventType = eventType;
+ }
+ }
+ else if (arg.StartsWith("ext:", StringComparison.OrdinalIgnoreCase))
+ {
+ queryParams.ExtensionFilter = arg.Substring(4);
+ if (!queryParams.ExtensionFilter.StartsWith("."))
+ {
+ queryParams.ExtensionFilter = "." + queryParams.ExtensionFilter;
+ }
+ }
+ else if (arg.StartsWith("path:", StringComparison.OrdinalIgnoreCase))
+ {
+ queryParams.PathFilter = arg.Substring(5);
+ }
+ }
+ }
+
+ // 执行查询
+ var result = await _dbManager.QueryEventsAsync(queryParams);
+
+ Console.WriteLine($"查询结果 (总数: {result.TotalCount}):");
+ foreach (var evt in result.Events)
+ {
+ string typeStr = evt.EventType.ToString();
+ string timestamp = evt.Timestamp.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss");
+ Console.WriteLine($"[{timestamp}] {typeStr,-10} {evt.FileName} ({evt.Extension})");
+ }
+
+ if (result.HasMore)
+ {
+ Console.WriteLine($"... 还有更多结果,共 {result.TotalCount} 条");
+ }
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine($"查询事件出错: {ex.Message}");
+ _logger.LogError(ex, "查询事件时发生错误");
+ }
+ }
+
+ ///
+ /// 生成测试数据
+ ///
+ private async Task GenerateTestDataAsync(string[] args)
+ {
+ try
+ {
+ int count = args.Length > 1 && int.TryParse(args[1], out int c) ? c : 100;
+
+ var events = new List();
+ var rnd = new Random();
+ string[] extensions = { ".dll", ".exe", ".txt", ".cs", ".xml", ".json", ".png", ".jpg" };
+ string[] directories = { "C:\\Temp", "D:\\Work", "C:\\Program Files", "D:\\Projects", "E:\\Data" };
+
+ DateTime startTime = DateTime.Now.AddHours(-24);
+
+ for (int i = 0; i < count; i++)
+ {
+ var eventType = (FileEventType)rnd.Next(0, 5);
+ var ext = extensions[rnd.Next(extensions.Length)];
+ var dir = directories[rnd.Next(directories.Length)];
+ var fileName = $"TestFile_{i:D5}{ext}";
+ var timestamp = startTime.AddMinutes(i);
+
+ var fileEvent = new FileEvent
+ {
+ Id = Guid.NewGuid(),
+ Timestamp = timestamp,
+ EventType = eventType,
+ FullPath = $"{dir}\\{fileName}",
+ FileName = fileName,
+ Directory = dir,
+ Extension = ext,
+ FileSize = rnd.Next(1024, 1024 * 1024)
+ };
+
+ // 如果是重命名事件,添加旧文件名
+ if (eventType == FileEventType.Renamed)
+ {
+ fileEvent.OldFileName = $"OldFile_{i:D5}{ext}";
+ fileEvent.OldFullPath = $"{dir}\\{fileEvent.OldFileName}";
+ }
+
+ // 添加一些元数据
+ fileEvent.Metadata["CreationTime"] = timestamp.AddMinutes(-rnd.Next(1, 60)).ToString("o");
+ fileEvent.Metadata["LastWriteTime"] = timestamp.ToString("o");
+ fileEvent.Metadata["IsReadOnly"] = (rnd.Next(10) < 2).ToString();
+ fileEvent.Metadata["TestData"] = $"测试数据 {i}";
+
+ events.Add(fileEvent);
+ }
+
+ Console.WriteLine($"正在生成 {count} 条测试数据...");
+ await _dbManager.SaveEventsAsync(events);
+ Console.WriteLine("测试数据生成完成!");
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine($"生成测试数据出错: {ex.Message}");
+ _logger.LogError(ex, "生成测试数据时发生错误");
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/external/JiShe.CollectBus.PluginFileWatcher/Dockerfile b/external/JiShe.CollectBus.PluginFileWatcher/Dockerfile
new file mode 100644
index 0000000..88a0552
--- /dev/null
+++ b/external/JiShe.CollectBus.PluginFileWatcher/Dockerfile
@@ -0,0 +1,28 @@
+# 请参阅 https://aka.ms/customizecontainer 以了解如何自定义调试容器,以及 Visual Studio 如何使用此 Dockerfile 生成映像以更快地进行调试。
+
+# 此阶段用于在快速模式(默认为调试配置)下从 VS 运行时
+FROM mcr.microsoft.com/dotnet/runtime:8.0 AS base
+USER $APP_UID
+WORKDIR /app
+
+
+# 此阶段用于生成服务项目
+FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
+ARG BUILD_CONFIGURATION=Release
+WORKDIR /src
+COPY ["external/JiShe.CollectBus.PluginFileWatcher/JiShe.CollectBus.PluginFileWatcher.csproj", "external/JiShe.CollectBus.PluginFileWatcher/"]
+RUN dotnet restore "./external/JiShe.CollectBus.PluginFileWatcher/JiShe.CollectBus.PluginFileWatcher.csproj"
+COPY . .
+WORKDIR "/src/external/JiShe.CollectBus.PluginFileWatcher"
+RUN dotnet build "./JiShe.CollectBus.PluginFileWatcher.csproj" -c $BUILD_CONFIGURATION -o /app/build
+
+# 此阶段用于发布要复制到最终阶段的服务项目
+FROM build AS publish
+ARG BUILD_CONFIGURATION=Release
+RUN dotnet publish "./JiShe.CollectBus.PluginFileWatcher.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false
+
+# 此阶段在生产中使用,或在常规模式下从 VS 运行时使用(在不使用调试配置时为默认值)
+FROM base AS final
+WORKDIR /app
+COPY --from=publish /app/publish .
+ENTRYPOINT ["dotnet", "JiShe.CollectBus.PluginFileWatcher.dll"]
\ No newline at end of file
diff --git a/external/JiShe.CollectBus.PluginFileWatcher/EventDatabaseManager.cs b/external/JiShe.CollectBus.PluginFileWatcher/EventDatabaseManager.cs
new file mode 100644
index 0000000..9b0f257
--- /dev/null
+++ b/external/JiShe.CollectBus.PluginFileWatcher/EventDatabaseManager.cs
@@ -0,0 +1,481 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Threading.Tasks;
+using Dapper;
+using Microsoft.Data.Sqlite;
+using Microsoft.Extensions.Logging;
+
+namespace JiShe.CollectBus.PluginFileWatcher
+{
+ ///
+ /// SQLite数据库管理器,用于管理文件事件的存储和检索
+ ///
+ public class EventDatabaseManager : IDisposable
+ {
+ private readonly FileMonitorConfig _config;
+ private readonly ILogger _logger;
+ private readonly string _connectionString;
+ private readonly string _databasePath;
+ private readonly int _commandTimeout;
+ private bool _disposed;
+
+ ///
+ /// 初始化数据库管理器
+ ///
+ /// 配置对象
+ /// 日志记录器
+ public EventDatabaseManager(FileMonitorConfig config, ILogger logger)
+ {
+ _config = config ?? throw new ArgumentNullException(nameof(config));
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+
+ // 确保使用配置中的设置
+ _databasePath = config.EventStorage.DatabasePath;
+ _connectionString = config.EventStorage.ConnectionString;
+ _commandTimeout = config.EventStorage.CommandTimeout;
+
+ // 确保数据库目录存在
+ string dbDirectory = Path.GetDirectoryName(_databasePath);
+ if (!string.IsNullOrEmpty(dbDirectory) && !Directory.Exists(dbDirectory))
+ {
+ Directory.CreateDirectory(dbDirectory);
+ }
+
+ // 初始化数据库
+ InitializeDatabase().GetAwaiter().GetResult();
+ }
+
+ ///
+ /// 初始化数据库,确保必要的表已创建
+ ///
+ private async Task InitializeDatabase()
+ {
+ try
+ {
+ using var connection = new SqliteConnection(_connectionString);
+ await connection.OpenAsync();
+
+ // 启用外键约束
+ using (var command = connection.CreateCommand())
+ {
+ command.CommandText = "PRAGMA foreign_keys = ON;";
+ await command.ExecuteNonQueryAsync();
+ }
+
+ // 创建文件事件表
+ string createTableSql = @"
+ CREATE TABLE IF NOT EXISTS FileEvents (
+ Id TEXT PRIMARY KEY,
+ Timestamp TEXT NOT NULL,
+ EventType INTEGER NOT NULL,
+ FullPath TEXT NOT NULL,
+ FileName TEXT NOT NULL,
+ Directory TEXT NOT NULL,
+ Extension TEXT NOT NULL,
+ OldFileName TEXT,
+ OldFullPath TEXT,
+ FileSize INTEGER,
+ CreatedAt TEXT NOT NULL
+ );
+
+ CREATE INDEX IF NOT EXISTS idx_events_timestamp ON FileEvents(Timestamp);
+ CREATE INDEX IF NOT EXISTS idx_events_eventtype ON FileEvents(EventType);
+ CREATE INDEX IF NOT EXISTS idx_events_extension ON FileEvents(Extension);";
+
+ await connection.ExecuteAsync(createTableSql, commandTimeout: _commandTimeout);
+
+ // 创建元数据表
+ string createMetadataTableSql = @"
+ CREATE TABLE IF NOT EXISTS EventMetadata (
+ Id INTEGER PRIMARY KEY AUTOINCREMENT,
+ EventId TEXT NOT NULL,
+ MetadataKey TEXT NOT NULL,
+ MetadataValue TEXT,
+ FOREIGN KEY (EventId) REFERENCES FileEvents(Id) ON DELETE CASCADE
+ );
+
+ CREATE INDEX IF NOT EXISTS idx_metadata_eventid ON EventMetadata(EventId);
+ CREATE INDEX IF NOT EXISTS idx_metadata_key ON EventMetadata(MetadataKey);";
+
+ await connection.ExecuteAsync(createMetadataTableSql, commandTimeout: _commandTimeout);
+
+ _logger.LogInformation("数据库初始化成功");
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "初始化数据库失败");
+ throw;
+ }
+ }
+
+ ///
+ /// 保存文件事件到数据库
+ ///
+ /// 要保存的事件列表
+ public async Task SaveEventsAsync(List events)
+ {
+ if (events == null || events.Count == 0)
+ return;
+
+ try
+ {
+ using var connection = new SqliteConnection(_connectionString);
+ await connection.OpenAsync();
+
+ // 启用外键约束
+ using (var command = connection.CreateCommand())
+ {
+ command.CommandText = "PRAGMA foreign_keys = ON;";
+ await command.ExecuteNonQueryAsync();
+ }
+
+ // 开始事务
+ using var transaction = connection.BeginTransaction();
+
+ try
+ {
+ foreach (var fileEvent in events)
+ {
+ // 插入事件数据
+ string insertEventSql = @"
+ INSERT INTO FileEvents (
+ Id, Timestamp, EventType, FullPath, FileName,
+ Directory, Extension, OldFileName, OldFullPath,
+ FileSize, CreatedAt
+ ) VALUES (
+ @Id, @Timestamp, @EventType, @FullPath, @FileName,
+ @Directory, @Extension, @OldFileName, @OldFullPath,
+ @FileSize, @CreatedAt
+ )";
+
+ await connection.ExecuteAsync(insertEventSql, new
+ {
+ Id = fileEvent.Id.ToString(), // 确保ID始终以字符串形式保存
+ Timestamp = fileEvent.Timestamp.ToString("o"),
+ EventType = (int)fileEvent.EventType,
+ fileEvent.FullPath,
+ fileEvent.FileName,
+ fileEvent.Directory,
+ fileEvent.Extension,
+ fileEvent.OldFileName,
+ fileEvent.OldFullPath,
+ fileEvent.FileSize,
+ CreatedAt = DateTime.UtcNow.ToString("o")
+ }, transaction, _commandTimeout);
+
+ // 插入元数据
+ if (fileEvent.Metadata != null && fileEvent.Metadata.Count > 0)
+ {
+ string insertMetadataSql = @"
+ INSERT INTO EventMetadata (EventId, MetadataKey, MetadataValue)
+ VALUES (@EventId, @MetadataKey, @MetadataValue)";
+
+ foreach (var metadata in fileEvent.Metadata)
+ {
+ await connection.ExecuteAsync(insertMetadataSql, new
+ {
+ EventId = fileEvent.Id.ToString(), // 确保ID以相同格式保存
+ MetadataKey = metadata.Key,
+ MetadataValue = metadata.Value
+ }, transaction, _commandTimeout);
+ }
+ }
+ }
+
+ // 提交事务
+ transaction.Commit();
+ _logger.LogInformation($"已成功保存 {events.Count} 个事件到数据库");
+ }
+ catch (Exception ex)
+ {
+ // 回滚事务
+ transaction.Rollback();
+ _logger.LogError(ex, "保存事件到数据库时发生错误");
+ throw;
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "连接数据库失败");
+ throw;
+ }
+ }
+
+ ///
+ /// 查询事件
+ ///
+ /// 查询参数
+ /// 查询结果
+ public async Task QueryEventsAsync(EventQueryParams queryParams)
+ {
+ if (queryParams == null)
+ throw new ArgumentNullException(nameof(queryParams));
+
+ var result = new EventQueryResult
+ {
+ StartTime = queryParams.StartTime ?? DateTime.MinValue,
+ EndTime = queryParams.EndTime ?? DateTime.MaxValue
+ };
+
+ try
+ {
+ using var connection = new SqliteConnection(_connectionString);
+ await connection.OpenAsync();
+
+ // 启用外键约束
+ using (var command = connection.CreateCommand())
+ {
+ command.CommandText = "PRAGMA foreign_keys = ON;";
+ await command.ExecuteNonQueryAsync();
+ }
+
+ // 构建查询条件
+ var conditions = new List();
+ var parameters = new DynamicParameters();
+
+ if (queryParams.StartTime.HasValue)
+ {
+ conditions.Add("Timestamp >= @StartTime");
+ parameters.Add("@StartTime", queryParams.StartTime.Value.ToString("o"));
+ }
+
+ if (queryParams.EndTime.HasValue)
+ {
+ conditions.Add("Timestamp <= @EndTime");
+ parameters.Add("@EndTime", queryParams.EndTime.Value.ToString("o"));
+ }
+
+ if (queryParams.EventType.HasValue)
+ {
+ conditions.Add("EventType = @EventType");
+ parameters.Add("@EventType", (int)queryParams.EventType.Value);
+ }
+
+ if (!string.IsNullOrEmpty(queryParams.PathFilter))
+ {
+ conditions.Add("FullPath LIKE @PathFilter");
+ parameters.Add("@PathFilter", $"%{queryParams.PathFilter}%");
+ }
+
+ if (!string.IsNullOrEmpty(queryParams.ExtensionFilter))
+ {
+ conditions.Add("Extension = @ExtensionFilter");
+ parameters.Add("@ExtensionFilter", queryParams.ExtensionFilter);
+ }
+
+ // 构建WHERE子句
+ string whereClause = conditions.Count > 0
+ ? $"WHERE {string.Join(" AND ", conditions)}"
+ : string.Empty;
+
+ // 构建ORDER BY子句
+ string orderByClause = queryParams.AscendingOrder
+ ? "ORDER BY Timestamp ASC"
+ : "ORDER BY Timestamp DESC";
+
+ // 获取总记录数
+ string countSql = $"SELECT COUNT(*) FROM FileEvents {whereClause}";
+ result.TotalCount = await connection.ExecuteScalarAsync(countSql, parameters, commandTimeout: _commandTimeout);
+
+ // 应用分页
+ string paginationClause = $"LIMIT @PageSize OFFSET @Offset";
+ parameters.Add("@PageSize", queryParams.PageSize);
+ parameters.Add("@Offset", queryParams.PageIndex * queryParams.PageSize);
+
+ // 查询事件数据
+ string querySql = $@"
+ SELECT Id,
+ Timestamp,
+ EventType,
+ FullPath,
+ FileName,
+ Directory,
+ Extension,
+ OldFileName,
+ OldFullPath,
+ FileSize
+ FROM FileEvents
+ {whereClause}
+ {orderByClause}
+ {paginationClause}";
+
+ var events = await connection.QueryAsync(querySql, parameters, commandTimeout: _commandTimeout);
+
+ // 处理查询结果
+ foreach (var eventData in events)
+ {
+ var fileEvent = new FileEvent
+ {
+ Id = Guid.Parse(eventData.Id),
+ Timestamp = DateTime.Parse(eventData.Timestamp),
+ EventType = (FileEventType)eventData.EventType,
+ FullPath = eventData.FullPath,
+ FileName = eventData.FileName,
+ Directory = eventData.Directory,
+ Extension = eventData.Extension,
+ OldFileName = eventData.OldFileName,
+ OldFullPath = eventData.OldFullPath,
+ FileSize = eventData.FileSize
+ };
+
+ // 获取元数据
+ string metadataSql = "SELECT MetadataKey, MetadataValue FROM EventMetadata WHERE EventId = @EventId";
+ var metadata = await connection.QueryAsync(metadataSql, new { EventId = fileEvent.Id.ToString() }, commandTimeout: _commandTimeout);
+
+ foreach (var item in metadata)
+ {
+ fileEvent.Metadata[item.MetadataKey] = item.MetadataValue;
+ }
+
+ result.Events.Add(fileEvent);
+ }
+
+ result.HasMore = (queryParams.PageIndex + 1) * queryParams.PageSize < result.TotalCount;
+
+ return result;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "查询事件时发生错误");
+ throw;
+ }
+ }
+
+ ///
+ /// 清理旧数据
+ ///
+ /// 数据保留天数
+ public async Task CleanupOldDataAsync(int retentionDays)
+ {
+ if (retentionDays <= 0)
+ return;
+
+ try
+ {
+ DateTime cutoffDate = DateTime.UtcNow.AddDays(-retentionDays);
+ string cutoffDateStr = cutoffDate.ToString("o");
+
+ using var connection = new SqliteConnection(_connectionString);
+ await connection.OpenAsync();
+
+ // 启用外键约束
+ using (var command = connection.CreateCommand())
+ {
+ command.CommandText = "PRAGMA foreign_keys = ON;";
+ await command.ExecuteNonQueryAsync();
+ }
+
+ // 删除旧事件(级联删除元数据)
+ string deleteSql = "DELETE FROM FileEvents WHERE Timestamp < @CutoffDate";
+ int deletedCount = await connection.ExecuteAsync(deleteSql, new { CutoffDate = cutoffDateStr }, commandTimeout: _commandTimeout);
+
+ _logger.LogInformation($"已清理 {deletedCount} 条旧事件数据({retentionDays}天前)");
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "清理旧数据时发生错误");
+ throw;
+ }
+ }
+
+ ///
+ /// 获取数据库统计信息
+ ///
+ /// 数据库统计信息
+ public async Task GetDatabaseStatsAsync()
+ {
+ try
+ {
+ using var connection = new SqliteConnection(_connectionString);
+ await connection.OpenAsync();
+
+ // 启用外键约束
+ using (var command = connection.CreateCommand())
+ {
+ command.CommandText = "PRAGMA foreign_keys = ON;";
+ await command.ExecuteNonQueryAsync();
+ }
+
+ var stats = new DatabaseStats();
+
+ // 获取事件总数
+ stats.TotalEvents = await connection.ExecuteScalarAsync("SELECT COUNT(*) FROM FileEvents", commandTimeout: _commandTimeout);
+
+ // 获取最早和最新事件时间
+ stats.OldestEventTime = await connection.ExecuteScalarAsync("SELECT Timestamp FROM FileEvents ORDER BY Timestamp ASC LIMIT 1", commandTimeout: _commandTimeout);
+ stats.NewestEventTime = await connection.ExecuteScalarAsync("SELECT Timestamp FROM FileEvents ORDER BY Timestamp DESC LIMIT 1", commandTimeout: _commandTimeout);
+
+ // 获取事件类型分布
+ var eventTypeCounts = await connection.QueryAsync("SELECT EventType, COUNT(*) AS Count FROM FileEvents GROUP BY EventType", commandTimeout: _commandTimeout);
+ stats.EventTypeCounts = new Dictionary();
+
+ foreach (var item in eventTypeCounts)
+ {
+ stats.EventTypeCounts[(FileEventType)item.EventType] = item.Count;
+ }
+
+ // 获取扩展名分布(前10个)
+ var extensionCounts = await connection.QueryAsync(
+ "SELECT Extension, COUNT(*) AS Count FROM FileEvents GROUP BY Extension ORDER BY Count DESC LIMIT 10",
+ commandTimeout: _commandTimeout);
+ stats.TopExtensions = new Dictionary();
+
+ foreach (var item in extensionCounts)
+ {
+ stats.TopExtensions[item.Extension] = item.Count;
+ }
+
+ return stats;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "获取数据库统计信息时发生错误");
+ throw;
+ }
+ }
+
+ ///
+ /// 释放资源
+ ///
+ public void Dispose()
+ {
+ if (_disposed)
+ return;
+
+ _disposed = true;
+ }
+ }
+
+ ///
+ /// 数据库统计信息
+ ///
+ public class DatabaseStats
+ {
+ ///
+ /// 事件总数
+ ///
+ public int TotalEvents { get; set; }
+
+ ///
+ /// 最早事件时间
+ ///
+ public DateTime? OldestEventTime { get; set; }
+
+ ///
+ /// 最新事件时间
+ ///
+ public DateTime? NewestEventTime { get; set; }
+
+ ///
+ /// 事件类型计数
+ ///
+ public Dictionary EventTypeCounts { get; set; }
+
+ ///
+ /// 排名前列的文件扩展名
+ ///
+ public Dictionary TopExtensions { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/external/JiShe.CollectBus.PluginFileWatcher/EventStorage.cs b/external/JiShe.CollectBus.PluginFileWatcher/EventStorage.cs
new file mode 100644
index 0000000..9bbf619
--- /dev/null
+++ b/external/JiShe.CollectBus.PluginFileWatcher/EventStorage.cs
@@ -0,0 +1,745 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.IO;
+using System.IO.Compression;
+using System.Linq;
+using System.Text.Json;
+using System.Text.Json.Serialization;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+
+namespace JiShe.CollectBus.PluginFileWatcher
+{
+ ///
+ /// 负责文件事件的存储、查询和回放
+ ///
+ public class EventStorage : IDisposable
+ {
+ private readonly FileMonitorConfig _config;
+ private readonly ILogger _logger;
+ private readonly ConcurrentQueue _eventQueue;
+ private readonly Timer _storageTimer;
+ private readonly SemaphoreSlim _storageLock = new SemaphoreSlim(1, 1);
+ private readonly string _storageDirectory;
+ private readonly EventDatabaseManager _dbManager;
+ private bool _disposed;
+
+ ///
+ /// 创建新的事件存储管理器实例
+ ///
+ /// 文件监控配置
+ /// 日志记录器
+ public EventStorage(FileMonitorConfig config, ILogger logger)
+ {
+ _config = config ?? throw new ArgumentNullException(nameof(config));
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ _eventQueue = new ConcurrentQueue();
+
+ // 确保存储目录存在
+ _storageDirectory = !string.IsNullOrEmpty(_config.EventStorage.StorageDirectory)
+ ? _config.EventStorage.StorageDirectory
+ : Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "EventLogs");
+
+ if (!Directory.Exists(_storageDirectory))
+ {
+ Directory.CreateDirectory(_storageDirectory);
+ }
+
+ // 创建数据库管理器(如果配置为SQLite存储类型)
+ if (config.EventStorage.EnableEventStorage &&
+ config.EventStorage.StorageType.Equals("SQLite", StringComparison.OrdinalIgnoreCase))
+ {
+ _dbManager = new EventDatabaseManager(config, logger);
+ _logger.LogInformation("已初始化SQLite事件存储");
+ }
+
+ // 初始化存储定时器(如果启用)
+ if (_config.EventStorage.EnableEventStorage)
+ {
+ var intervalMs = _config.EventStorage.StorageIntervalSeconds * 1000;
+ _storageTimer = new Timer(SaveEventsTimerCallback, null, intervalMs, intervalMs);
+ _logger.LogInformation($"事件存储已初始化,存储目录:{_storageDirectory},存储间隔:{_config.EventStorage.StorageIntervalSeconds}秒");
+ }
+ }
+
+ ///
+ /// 记录一个文件事件
+ ///
+ /// 文件事件
+ public void RecordEvent(FileEvent fileEvent)
+ {
+ if (fileEvent == null || !_config.EventStorage.EnableEventStorage) return;
+
+ _eventQueue.Enqueue(fileEvent);
+ _logger.LogDebug($"文件事件已加入队列:{fileEvent.EventType} - {fileEvent.FullPath}");
+ }
+
+ ///
+ /// 从FileSystemEventArgs记录事件
+ ///
+ /// 文件系统事件参数
+ public void RecordEvent(FileSystemEventArgs e)
+ {
+ if (e == null || !_config.EventStorage.EnableEventStorage) return;
+
+ var fileEvent = FileEvent.FromFileSystemEventArgs(e);
+ RecordEvent(fileEvent);
+ }
+
+ ///
+ /// 定时将事件保存到文件
+ ///
+ private async void SaveEventsTimerCallback(object state)
+ {
+ if (_disposed || _eventQueue.IsEmpty) return;
+
+ try
+ {
+ // 防止多个定时器回调同时执行
+ if (!await _storageLock.WaitAsync(0))
+ {
+ return;
+ }
+
+ try
+ {
+ // 从队列中取出事件
+ List eventsToSave = new List();
+ int batchSize = _config.EventStorage.BatchSize;
+
+ while (eventsToSave.Count < batchSize && !_eventQueue.IsEmpty)
+ {
+ if (_eventQueue.TryDequeue(out var fileEvent))
+ {
+ eventsToSave.Add(fileEvent);
+ }
+ }
+
+ if (eventsToSave.Count > 0)
+ {
+ await SaveEventsToFileAsync(eventsToSave);
+ _logger.LogInformation($"已成功保存 {eventsToSave.Count} 个事件");
+ }
+
+ // 如果有配置,清理旧日志文件
+ if (_config.EventStorage.MaxLogFiles > 0)
+ {
+ await CleanupOldLogFilesAsync();
+ }
+
+ // 如果有配置,清理旧数据库记录
+ if (_dbManager != null && _config.EventStorage.DataRetentionDays > 0)
+ {
+ await _dbManager.CleanupOldDataAsync(_config.EventStorage.DataRetentionDays);
+ }
+ }
+ finally
+ {
+ _storageLock.Release();
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "保存事件时发生错误");
+ }
+ }
+
+ ///
+ /// 将事件保存到文件
+ ///
+ /// 要保存的事件列表
+ private async Task SaveEventsToFileAsync(List events)
+ {
+ if (events == null || events.Count == 0) return;
+
+ try
+ {
+ // 根据存储类型选择保存方式
+ if (_config.EventStorage.StorageType.Equals("SQLite", StringComparison.OrdinalIgnoreCase) && _dbManager != null)
+ {
+ // 保存到SQLite数据库
+ await _dbManager.SaveEventsAsync(events);
+ }
+ else
+ {
+ // 保存到文件
+ string fileName = string.Format(
+ _config.EventStorage.LogFileNameFormat,
+ DateTime.Now);
+
+ string filePath = Path.Combine(_storageDirectory, fileName);
+
+ // 创建事件日志文件对象
+ var logFile = new EventLogFile
+ {
+ CreatedTime = DateTime.UtcNow,
+ Events = events
+ };
+
+ // 序列化为JSON
+ string jsonContent = JsonSerializer.Serialize(logFile, new JsonSerializerOptions
+ {
+ WriteIndented = true
+ });
+
+ // 是否启用压缩
+ if (_config.EventStorage.CompressLogFiles)
+ {
+ string gzFilePath = $"{filePath}.gz";
+ await CompressAndSaveStringAsync(jsonContent, gzFilePath);
+ _logger.LogInformation($"已将事件保存到压缩文件:{gzFilePath}");
+ }
+ else
+ {
+ await File.WriteAllTextAsync(filePath, jsonContent);
+ _logger.LogInformation($"已将事件保存到文件:{filePath}");
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "保存事件到文件时发生错误");
+ throw;
+ }
+ }
+
+ ///
+ /// 压缩并保存字符串到文件
+ ///
+ /// 要保存的内容
+ /// 文件路径
+ private static async Task CompressAndSaveStringAsync(string content, string filePath)
+ {
+ using var fileStream = new FileStream(filePath, FileMode.Create);
+ using var gzipStream = new GZipStream(fileStream, CompressionLevel.Optimal);
+ using var writer = new StreamWriter(gzipStream);
+
+ await writer.WriteAsync(content);
+ }
+
+ ///
+ /// 清理过多的日志文件
+ ///
+ private async Task CleanupOldLogFilesAsync()
+ {
+ try
+ {
+ // 检查是否需要清理
+ if (_config.EventStorage.MaxLogFiles <= 0) return;
+
+ var directory = new DirectoryInfo(_storageDirectory);
+ var logFiles = directory.GetFiles("*.*")
+ .Where(f => f.Name.EndsWith(".json") || f.Name.EndsWith(".gz"))
+ .OrderByDescending(f => f.CreationTime)
+ .ToArray();
+
+ // 如果文件数量超过最大值,删除最旧的文件
+ if (logFiles.Length > _config.EventStorage.MaxLogFiles)
+ {
+ int filesToDelete = logFiles.Length - _config.EventStorage.MaxLogFiles;
+ var filesToRemove = logFiles.Skip(logFiles.Length - filesToDelete).ToArray();
+
+ foreach (var file in filesToRemove)
+ {
+ try
+ {
+ file.Delete();
+ _logger.LogInformation($"已删除旧的事件日志文件:{file.Name}");
+ }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex, $"删除旧日志文件失败:{file.FullName}");
+ }
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "清理旧日志文件时发生错误");
+ }
+
+ await Task.CompletedTask;
+ }
+
+ ///
+ /// 查询历史事件
+ ///
+ /// 查询参数
+ /// 查询结果
+ public async Task QueryEventsAsync(EventQueryParams queryParams)
+ {
+ if (queryParams == null) throw new ArgumentNullException(nameof(queryParams));
+
+ // 如果是SQLite存储且数据库管理器可用,使用数据库查询
+ if (_config.EventStorage.StorageType.Equals("SQLite", StringComparison.OrdinalIgnoreCase) && _dbManager != null)
+ {
+ return await _dbManager.QueryEventsAsync(queryParams);
+ }
+
+ var result = new EventQueryResult
+ {
+ StartTime = queryParams.StartTime ?? DateTime.MinValue,
+ EndTime = queryParams.EndTime ?? DateTime.MaxValue
+ };
+
+ try
+ {
+ await _storageLock.WaitAsync();
+
+ try
+ {
+ // 获取所有日志文件
+ var directory = new DirectoryInfo(_storageDirectory);
+ if (!directory.Exists)
+ {
+ return result;
+ }
+
+ var logFiles = directory.GetFiles("*.*")
+ .Where(f => f.Name.EndsWith(".json") || f.Name.EndsWith(".gz"))
+ .OrderByDescending(f => f.CreationTime)
+ .ToArray();
+
+ List allEvents = new List();
+
+ // 加载所有日志文件中的事件
+ foreach (var file in logFiles)
+ {
+ try
+ {
+ var events = await LoadEventsFromFileAsync(file.FullName);
+ if (events != null && events.Count > 0)
+ {
+ allEvents.AddRange(events);
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex, $"从文件加载事件失败:{file.FullName}");
+ }
+ }
+
+ // 内存中队列的事件也包含在查询中
+ FileEvent[] queuedEvents = _eventQueue.ToArray();
+ allEvents.AddRange(queuedEvents);
+
+ // 应用查询过滤条件
+ var filteredEvents = allEvents
+ .Where(e => (queryParams.StartTime == null || e.Timestamp >= queryParams.StartTime) &&
+ (queryParams.EndTime == null || e.Timestamp <= queryParams.EndTime) &&
+ (queryParams.EventType == null || e.EventType == queryParams.EventType.Value) &&
+ (string.IsNullOrEmpty(queryParams.PathFilter) || e.FullPath.Contains(queryParams.PathFilter, StringComparison.OrdinalIgnoreCase)) &&
+ (string.IsNullOrEmpty(queryParams.ExtensionFilter) || e.Extension.Equals(queryParams.ExtensionFilter, StringComparison.OrdinalIgnoreCase)))
+ .ToList();
+
+ // 应用排序
+ IEnumerable orderedEvents = queryParams.AscendingOrder
+ ? filteredEvents.OrderBy(e => e.Timestamp)
+ : filteredEvents.OrderByDescending(e => e.Timestamp);
+
+ // 计算总数
+ result.TotalCount = filteredEvents.Count;
+
+ // 应用分页
+ int skip = queryParams.PageIndex * queryParams.PageSize;
+ int take = queryParams.PageSize;
+
+ result.Events = orderedEvents.Skip(skip).Take(take).ToList();
+ result.HasMore = (skip + take) < result.TotalCount;
+
+ return result;
+ }
+ finally
+ {
+ _storageLock.Release();
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "查询事件时发生错误");
+ throw;
+ }
+ }
+
+ ///
+ /// 从文件加载事件
+ ///
+ /// 文件路径
+ /// 事件列表
+ private async Task> LoadEventsFromFileAsync(string filePath)
+ {
+ if (string.IsNullOrEmpty(filePath) || !File.Exists(filePath))
+ {
+ return new List();
+ }
+
+ try
+ {
+ string jsonContent;
+
+ // 处理压缩文件
+ if (filePath.EndsWith(".gz"))
+ {
+ using var fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read);
+ using var gzipStream = new GZipStream(fileStream, CompressionMode.Decompress);
+ using var reader = new StreamReader(gzipStream);
+
+ jsonContent = await reader.ReadToEndAsync();
+ }
+ else
+ {
+ jsonContent = await File.ReadAllTextAsync(filePath);
+ }
+
+ var logFile = JsonSerializer.Deserialize(jsonContent);
+ return logFile?.Events ?? new List();
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"从文件加载事件失败:{filePath}");
+ return new List();
+ }
+ }
+
+ ///
+ /// 启动事件回放会话
+ ///
+ /// 查询参数,定义要回放的事件
+ /// 回放处理回调
+ /// 取消标记
+ /// 回放会话控制器
+ public async Task StartReplayAsync(
+ EventQueryParams queryParams,
+ Func replayHandler,
+ CancellationToken cancellationToken = default)
+ {
+ if (replayHandler == null) throw new ArgumentNullException(nameof(replayHandler));
+
+ // 查询要回放的事件
+ var queryResult = await QueryEventsAsync(queryParams);
+
+ // 创建并启动回放会话
+ var session = new EventReplaySession(
+ queryResult.Events,
+ replayHandler,
+ _config.EventStorage.ReplayIntervalMs,
+ _config.EventStorage.ReplaySpeedFactor,
+ _logger,
+ cancellationToken);
+
+ await session.StartAsync();
+ return session;
+ }
+
+ ///
+ /// 释放资源
+ ///
+ public void Dispose()
+ {
+ if (_disposed) return;
+
+ _disposed = true;
+ _storageTimer?.Dispose();
+ _storageLock?.Dispose();
+ _dbManager?.Dispose();
+
+ // 尝试保存剩余事件
+ if (_config.EventStorage.EnableEventStorage && !_eventQueue.IsEmpty)
+ {
+ var remainingEvents = new List();
+ while (!_eventQueue.IsEmpty && _eventQueue.TryDequeue(out var evt))
+ {
+ remainingEvents.Add(evt);
+ }
+
+ if (remainingEvents.Count > 0)
+ {
+ SaveEventsToFileAsync(remainingEvents).GetAwaiter().GetResult();
+ }
+ }
+
+ GC.SuppressFinalize(this);
+ }
+ }
+
+ ///
+ /// 事件回放会话
+ ///
+ public class EventReplaySession : IDisposable
+ {
+ private readonly List _events;
+ private readonly Func _replayHandler;
+ private readonly int _replayIntervalMs;
+ private readonly double _speedFactor;
+ private readonly ILogger _logger;
+ private readonly CancellationToken _cancellationToken;
+ private CancellationTokenSource _linkedCts;
+ private Task _replayTask;
+ private bool _disposed;
+ private bool _isPaused;
+ private readonly SemaphoreSlim _pauseSemaphore = new SemaphoreSlim(1, 1);
+
+ ///
+ /// 回放进度(0-100)
+ ///
+ public int Progress { get; private set; }
+
+ ///
+ /// 当前回放的事件索引
+ ///
+ public int CurrentIndex { get; private set; }
+
+ ///
+ /// 事件总数
+ ///
+ public int TotalEvents => _events?.Count ?? 0;
+
+ ///
+ /// 回放是否已完成
+ ///
+ public bool IsCompleted { get; private set; }
+
+ ///
+ /// 回放是否已暂停
+ ///
+ public bool IsPaused => _isPaused;
+
+ ///
+ /// 回放已处理的事件数
+ ///
+ public int ProcessedEvents { get; private set; }
+
+ ///
+ /// 回放开始时间
+ ///
+ public DateTime StartTime { get; private set; }
+
+ ///
+ /// 回放结束时间(如果已完成)
+ ///
+ public DateTime? EndTime { get; private set; }
+
+ ///
+ /// 创建新的事件回放会话
+ ///
+ /// 要回放的事件
+ /// 回放处理回调
+ /// 回放间隔(毫秒)
+ /// 速度因子
+ /// 日志记录器
+ /// 取消标记
+ public EventReplaySession(
+ List events,
+ Func replayHandler,
+ int replayIntervalMs,
+ double speedFactor,
+ ILogger logger,
+ CancellationToken cancellationToken = default)
+ {
+ _events = events ?? throw new ArgumentNullException(nameof(events));
+ _replayHandler = replayHandler ?? throw new ArgumentNullException(nameof(replayHandler));
+ _replayIntervalMs = Math.Max(1, replayIntervalMs);
+ _speedFactor = Math.Max(0.1, speedFactor);
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ _cancellationToken = cancellationToken;
+
+ _linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ }
+
+ ///
+ /// 启动回放
+ ///
+ public async Task StartAsync()
+ {
+ if (_replayTask != null) return;
+
+ StartTime = DateTime.Now;
+ _replayTask = Task.Run(ReplayEventsAsync, _linkedCts.Token);
+ await Task.CompletedTask;
+ }
+
+ ///
+ /// 暂停回放
+ ///
+ public async Task PauseAsync()
+ {
+ if (_isPaused || IsCompleted) return;
+
+ await _pauseSemaphore.WaitAsync();
+ try
+ {
+ _isPaused = true;
+ }
+ finally
+ {
+ _pauseSemaphore.Release();
+ }
+
+ _logger.LogInformation("事件回放已暂停");
+ }
+
+ ///
+ /// 恢复回放
+ ///
+ public async Task ResumeAsync()
+ {
+ if (!_isPaused || IsCompleted) return;
+
+ await _pauseSemaphore.WaitAsync();
+ try
+ {
+ _isPaused = false;
+ // 释放信号量以允许回放任务继续
+ _pauseSemaphore.Release();
+ }
+ catch
+ {
+ _pauseSemaphore.Release();
+ throw;
+ }
+
+ _logger.LogInformation("事件回放已恢复");
+ }
+
+ ///
+ /// 停止回放
+ ///
+ public async Task StopAsync()
+ {
+ if (IsCompleted) return;
+
+ try
+ {
+ // 取消回放任务
+ _linkedCts?.Cancel();
+
+ // 如果暂停中,先恢复以允许取消
+ if (_isPaused)
+ {
+ await ResumeAsync();
+ }
+
+ // 等待任务完成
+ if (_replayTask != null)
+ {
+ await Task.WhenAny(_replayTask, Task.Delay(1000));
+ }
+
+ IsCompleted = true;
+ EndTime = DateTime.Now;
+
+ _logger.LogInformation("事件回放已手动停止");
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "停止事件回放时发生错误");
+ }
+ }
+
+ ///
+ /// 回放事件处理
+ ///
+ private async Task ReplayEventsAsync()
+ {
+ try
+ {
+ _logger.LogInformation($"开始回放{_events.Count}个事件,速度因子:{_speedFactor}");
+
+ if (_events.Count == 0)
+ {
+ IsCompleted = true;
+ EndTime = DateTime.Now;
+ return;
+ }
+
+ DateTime? lastEventTime = null;
+
+ for (int i = 0; i < _events.Count; i++)
+ {
+ // 检查是否取消
+ if (_linkedCts.Token.IsCancellationRequested)
+ {
+ _logger.LogInformation("事件回放已取消");
+ break;
+ }
+
+ // 检查暂停状态
+ if (_isPaused)
+ {
+ // 等待恢复信号
+ await _pauseSemaphore.WaitAsync(_linkedCts.Token);
+ _pauseSemaphore.Release();
+ }
+
+ var currentEvent = _events[i];
+ CurrentIndex = i;
+
+ // 计算等待时间(根据事件之间的实际时间差和速度因子)
+ if (lastEventTime.HasValue && i > 0)
+ {
+ var actualTimeDiff = currentEvent.Timestamp - lastEventTime.Value;
+ var waitTimeMs = (int)(actualTimeDiff.TotalMilliseconds / _speedFactor);
+
+ // 应用最小等待时间
+ waitTimeMs = Math.Max(_replayIntervalMs, waitTimeMs);
+
+ // 等待指定时间
+ await Task.Delay(waitTimeMs, _linkedCts.Token);
+ }
+
+ // 处理当前事件
+ try
+ {
+ await _replayHandler(currentEvent);
+ ProcessedEvents++;
+
+ // 更新进度
+ Progress = (int)((i + 1) * 100.0 / _events.Count);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"处理回放事件时发生错误:{currentEvent.EventType} - {currentEvent.FullPath}");
+ }
+
+ lastEventTime = currentEvent.Timestamp;
+ }
+
+ // 完成回放
+ IsCompleted = true;
+ Progress = 100;
+ EndTime = DateTime.Now;
+
+ _logger.LogInformation($"事件回放已完成,共处理{ProcessedEvents}个事件,耗时:{(EndTime.Value - StartTime).TotalSeconds:F2}秒");
+ }
+ catch (OperationCanceledException)
+ {
+ _logger.LogInformation("事件回放已取消");
+ IsCompleted = true;
+ EndTime = DateTime.Now;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "事件回放过程中发生错误");
+ IsCompleted = true;
+ EndTime = DateTime.Now;
+ }
+ }
+
+ ///
+ /// 释放资源
+ ///
+ public void Dispose()
+ {
+ if (_disposed) return;
+ _disposed = true;
+
+ _linkedCts?.Cancel();
+ _linkedCts?.Dispose();
+ _pauseSemaphore?.Dispose();
+
+ GC.SuppressFinalize(this);
+ }
+ }
+}
\ No newline at end of file
diff --git a/external/JiShe.CollectBus.PluginFileWatcher/FileEvent.cs b/external/JiShe.CollectBus.PluginFileWatcher/FileEvent.cs
new file mode 100644
index 0000000..4e8d905
--- /dev/null
+++ b/external/JiShe.CollectBus.PluginFileWatcher/FileEvent.cs
@@ -0,0 +1,254 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Text.Json.Serialization;
+
+namespace JiShe.CollectBus.PluginFileWatcher
+{
+ ///
+ /// 表示一个文件系统事件的数据模型,用于序列化和存储
+ ///
+ public class FileEvent
+ {
+ ///
+ /// 事件唯一标识
+ ///
+ public Guid Id { get; set; } = Guid.NewGuid();
+
+ ///
+ /// 事件发生时间
+ ///
+ public DateTime Timestamp { get; set; } = DateTime.UtcNow;
+
+ ///
+ /// 事件类型
+ ///
+ public FileEventType EventType { get; set; }
+
+ ///
+ /// 文件完整路径
+ ///
+ public string FullPath { get; set; }
+
+ ///
+ /// 文件名
+ ///
+ public string FileName { get; set; }
+
+ ///
+ /// 文件所在目录
+ ///
+ public string Directory { get; set; }
+
+ ///
+ /// 文件扩展名
+ ///
+ public string Extension { get; set; }
+
+ ///
+ /// 重命名前的旧文件名(仅在重命名事件中有效)
+ ///
+ public string OldFileName { get; set; }
+
+ ///
+ /// 重命名前的旧路径(仅在重命名事件中有效)
+ ///
+ public string OldFullPath { get; set; }
+
+ ///
+ /// 文件大小(字节),如果可获取
+ ///
+ public long? FileSize { get; set; }
+
+ ///
+ /// 自定义属性,可用于存储其他元数据
+ ///
+ public Dictionary Metadata { get; set; } = new Dictionary();
+
+ ///
+ /// 从FileSystemEventArgs创建FileEvent
+ ///
+ /// FileSystemEventArgs参数
+ /// FileEvent对象
+ public static FileEvent FromFileSystemEventArgs(FileSystemEventArgs e)
+ {
+ var fileEvent = new FileEvent
+ {
+ EventType = GetEventType(e.ChangeType),
+ FullPath = e.FullPath,
+ FileName = e.Name ?? Path.GetFileName(e.FullPath),
+ Directory = Path.GetDirectoryName(e.FullPath),
+ Extension = Path.GetExtension(e.FullPath)
+ };
+
+ // 如果是重命名事件,添加旧文件名信息
+ if (e is RenamedEventArgs renamedEvent)
+ {
+ fileEvent.OldFileName = Path.GetFileName(renamedEvent.OldFullPath);
+ fileEvent.OldFullPath = renamedEvent.OldFullPath;
+ }
+
+ // 尝试获取文件大小(如果文件存在且可访问)
+ try
+ {
+ if (File.Exists(e.FullPath) && e.ChangeType != WatcherChangeTypes.Deleted)
+ {
+ var fileInfo = new FileInfo(e.FullPath);
+ fileEvent.FileSize = fileInfo.Length;
+
+ // 添加一些额外的元数据
+ fileEvent.Metadata["CreationTime"] = fileInfo.CreationTime.ToString("o");
+ fileEvent.Metadata["LastWriteTime"] = fileInfo.LastWriteTime.ToString("o");
+ fileEvent.Metadata["IsReadOnly"] = fileInfo.IsReadOnly.ToString();
+ }
+ }
+ catch
+ {
+ // 忽略任何获取文件信息时的错误
+ }
+
+ return fileEvent;
+ }
+
+ ///
+ /// 将WatcherChangeTypes转换为FileEventType
+ ///
+ /// WatcherChangeTypes枚举值
+ /// 对应的FileEventType
+ public static FileEventType GetEventType(WatcherChangeTypes changeType)
+ {
+ return changeType switch
+ {
+ WatcherChangeTypes.Created => FileEventType.Created,
+ WatcherChangeTypes.Deleted => FileEventType.Deleted,
+ WatcherChangeTypes.Changed => FileEventType.Modified,
+ WatcherChangeTypes.Renamed => FileEventType.Renamed,
+ _ => FileEventType.Other
+ };
+ }
+ }
+
+ ///
+ /// 文件事件类型
+ ///
+ public enum FileEventType
+ {
+ ///
+ /// 文件被创建
+ ///
+ Created,
+
+ ///
+ /// 文件被修改
+ ///
+ Modified,
+
+ ///
+ /// 文件被删除
+ ///
+ Deleted,
+
+ ///
+ /// 文件被重命名
+ ///
+ Renamed,
+
+ ///
+ /// 其他类型事件
+ ///
+ Other
+ }
+
+ ///
+ /// 表示一个事件日志文件
+ ///
+ public class EventLogFile
+ {
+ ///
+ /// 日志文件创建时间
+ ///
+ public DateTime CreatedTime { get; set; } = DateTime.UtcNow;
+
+ ///
+ /// 日志文件包含的事件列表
+ ///
+ public List Events { get; set; } = new List();
+ }
+
+ ///
+ /// 事件查询结果
+ ///
+ public class EventQueryResult
+ {
+ ///
+ /// 查询到的事件列表
+ ///
+ public List Events { get; set; } = new List();
+
+ ///
+ /// 匹配的事件总数
+ ///
+ public int TotalCount { get; set; }
+
+ ///
+ /// 查询是否有更多结果
+ ///
+ public bool HasMore { get; set; }
+
+ ///
+ /// 查询时间范围的开始时间
+ ///
+ public DateTime StartTime { get; set; }
+
+ ///
+ /// 查询时间范围的结束时间
+ ///
+ public DateTime EndTime { get; set; }
+ }
+
+ ///
+ /// 事件查询参数
+ ///
+ public class EventQueryParams
+ {
+ ///
+ /// 查询开始时间
+ ///
+ public DateTime? StartTime { get; set; }
+
+ ///
+ /// 查询结束时间
+ ///
+ public DateTime? EndTime { get; set; }
+
+ ///
+ /// 事件类型过滤
+ ///
+ public FileEventType? EventType { get; set; }
+
+ ///
+ /// 文件路径过滤(支持包含关系)
+ ///
+ public string PathFilter { get; set; }
+
+ ///
+ /// 文件扩展名过滤
+ ///
+ public string ExtensionFilter { get; set; }
+
+ ///
+ /// 分页大小
+ ///
+ public int PageSize { get; set; } = 100;
+
+ ///
+ /// 分页索引(从0开始)
+ ///
+ public int PageIndex { get; set; } = 0;
+
+ ///
+ /// 排序方向,true为升序,false为降序
+ ///
+ public bool AscendingOrder { get; set; } = false;
+ }
+}
\ No newline at end of file
diff --git a/external/JiShe.CollectBus.PluginFileWatcher/FileWatcherUtils.cs b/external/JiShe.CollectBus.PluginFileWatcher/FileWatcherUtils.cs
new file mode 100644
index 0000000..aeb7f90
--- /dev/null
+++ b/external/JiShe.CollectBus.PluginFileWatcher/FileWatcherUtils.cs
@@ -0,0 +1,157 @@
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.PluginFileWatcher
+{
+ ///
+ /// 文件监控相关工具类
+ ///
+ public static class FileWatcherUtils
+ {
+ ///
+ /// 检测文件是否被锁定
+ ///
+ /// 要检查的文件路径
+ /// 如果文件被锁定则返回true,否则返回false
+ public static bool IsFileLocked(string filePath)
+ {
+ if (string.IsNullOrEmpty(filePath) || !File.Exists(filePath))
+ return false;
+
+ try
+ {
+ using (FileStream stream = File.Open(filePath, FileMode.Open, FileAccess.ReadWrite, FileShare.None))
+ {
+ // 文件可以被完全访问,没有被锁定
+ stream.Close();
+ }
+ return false;
+ }
+ catch (IOException)
+ {
+ // 文件被锁定或正在被其他进程使用
+ return true;
+ }
+ catch (Exception)
+ {
+ // 其他错误(权限不足、路径无效等)
+ return true;
+ }
+ }
+
+ ///
+ /// 尝试处理一个可能被锁定的文件
+ ///
+ /// 文件路径
+ /// 成功解锁后要执行的操作
+ /// 健壮性配置
+ /// 处理结果:true表示成功处理,false表示处理失败
+ public static async Task TryHandleLockedFileAsync(string filePath, Func action, RobustnessConfig config)
+ {
+ if (!config.EnableFileLockDetection)
+ {
+ // 如果禁用了锁检测,直接执行操作
+ try
+ {
+ await action();
+ return true;
+ }
+ catch
+ {
+ return false;
+ }
+ }
+
+ // 如果文件不存在或不是锁定状态,直接执行操作
+ if (!File.Exists(filePath) || !IsFileLocked(filePath))
+ {
+ try
+ {
+ await action();
+ return true;
+ }
+ catch
+ {
+ return false;
+ }
+ }
+
+ // 文件被锁定,根据策略处理
+ switch (config.LockedFileStrategy.ToLower())
+ {
+ case "skip":
+ // 跳过这个文件
+ Console.WriteLine($"文件被锁定,已跳过: {filePath}");
+ return false;
+
+ case "retry":
+ // 重试几次
+ for (int i = 0; i < config.FileLockRetryCount; i++)
+ {
+ await Task.Delay(config.FileLockRetryDelayMs);
+
+ if (!IsFileLocked(filePath))
+ {
+ try
+ {
+ await action();
+ Console.WriteLine($"文件锁已释放,成功处理: {filePath}");
+ return true;
+ }
+ catch
+ {
+ // 继续重试
+ }
+ }
+ }
+ Console.WriteLine($"文件仍然被锁定,重试{config.FileLockRetryCount}次后放弃: {filePath}");
+ return false;
+
+ case "log":
+ default:
+ // 只记录不处理
+ Console.WriteLine($"文件被锁定,已记录: {filePath}");
+ return false;
+ }
+ }
+
+ ///
+ /// 检查文件系统监控器是否健康
+ ///
+ /// 要检查的监控器
+ /// 最后一次事件的时间
+ /// 健壮性配置
+ /// 如果监控器健康则返回true,否则返回false
+ public static bool IsWatcherHealthy(FileSystemWatcher watcher, DateTime lastEventTime, RobustnessConfig config)
+ {
+ if (watcher == null || !watcher.EnableRaisingEvents)
+ return false;
+
+ // 如果配置了超时时间,检查是否超时
+ if (config.WatcherTimeoutSeconds > 0)
+ {
+ // 如果最后事件时间超过了超时时间,认为监控器可能已经失效
+ TimeSpan timeSinceLastEvent = DateTime.UtcNow - lastEventTime;
+ if (timeSinceLastEvent.TotalSeconds > config.WatcherTimeoutSeconds)
+ {
+ // 执行一个简单的测试:尝试改变一些属性看是否抛出异常
+ try
+ {
+ var currentFilter = watcher.Filter;
+ watcher.Filter = currentFilter;
+ return true; // 如果没有异常,认为监控器仍然正常
+ }
+ catch
+ {
+ return false; // 抛出异常,认为监控器已经失效
+ }
+ }
+ }
+
+ // 默认情况下认为监控器健康
+ return true;
+ }
+ }
+}
\ No newline at end of file
diff --git a/external/JiShe.CollectBus.PluginFileWatcher/JiShe.CollectBus.PluginFileWatcher.csproj b/external/JiShe.CollectBus.PluginFileWatcher/JiShe.CollectBus.PluginFileWatcher.csproj
new file mode 100644
index 0000000..193c217
--- /dev/null
+++ b/external/JiShe.CollectBus.PluginFileWatcher/JiShe.CollectBus.PluginFileWatcher.csproj
@@ -0,0 +1,39 @@
+
+
+
+ Exe
+ net8.0
+ enable
+ enable
+ Linux
+ ..\..
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Always
+
+
+
+
diff --git a/external/JiShe.CollectBus.PluginFileWatcher/Program.cs b/external/JiShe.CollectBus.PluginFileWatcher/Program.cs
new file mode 100644
index 0000000..92c47b8
--- /dev/null
+++ b/external/JiShe.CollectBus.PluginFileWatcher/Program.cs
@@ -0,0 +1,1110 @@
+using System;
+using System.Collections.Concurrent;
+using System.IO;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using System.Linq;
+using Microsoft.Extensions.Configuration;
+using System.Collections.Generic;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Abstractions;
+using Serilog;
+using Serilog.Events;
+using Serilog.Extensions.Logging;
+using ILogger = Microsoft.Extensions.Logging.ILogger;
+
+namespace JiShe.CollectBus.PluginFileWatcher
+{
+ // 删除配置类的定义
+
+ class Program
+ {
+ // 配置对象,从appsettings.json加载
+ private static FileMonitorConfig _config = new FileMonitorConfig();
+
+ // 全局计数器和动态配置参数
+ private static long _totalEventsProcessed = 0;
+
+ // 监控器健康状态追踪
+ private static DateTime _lastEventTime = DateTime.UtcNow;
+ private static int _watcherRestartAttempts = 0;
+ private static FileSystemWatcher _activeWatcher = null;
+
+ // 事件存储和回放
+ private static EventStorage _eventStorage = null;
+ private static EventReplaySession _currentReplaySession = null;
+ private static ILogger _logger = NullLogger.Instance;
+
+ static async Task Main(string[] args)
+ {
+ Console.OutputEncoding = System.Text.Encoding.UTF8;
+
+ // 检查是否以数据库工具模式运行
+ if (args.Length > 0 && args[0].Equals("db", StringComparison.OrdinalIgnoreCase))
+ {
+ // 运行数据库工具
+ await RunDatabaseUtilityAsync(args.Skip(1).ToArray());
+ return;
+ }
+
+ // 加载配置文件
+ LoadConfiguration();
+
+ // 配置Serilog
+ ConfigureSerilog();
+
+ // 初始化日志记录器
+ _logger = new SerilogLoggerFactory().CreateLogger("FileMonitor");
+
+ Log.Information("高性能文件监控程序启动中...");
+
+ // 初始化事件存储
+ if (_config.EventStorage.EnableEventStorage)
+ {
+ _eventStorage = new EventStorage(_config, _logger);
+ Log.Information("事件存储已启用,存储目录:{Directory}", _config.EventStorage.StorageDirectory);
+ }
+
+ // 打印配置信息
+ LogConfiguration();
+
+ // 获取监控路径:优先命令行参数,其次配置文件,最后使用当前目录
+ string pathToMonitor = args.Length > 0 ? args[0] :
+ !string.IsNullOrEmpty(_config.General.DefaultMonitorPath) ?
+ _config.General.DefaultMonitorPath : Directory.GetCurrentDirectory();
+
+ // 命令行参数可以覆盖配置文件
+ if (args.Length > 1 && args[1].Equals("--no-filter", StringComparison.OrdinalIgnoreCase))
+ {
+ _config.General.EnableFileFiltering = false;
+ Log.Information("通过命令行参数禁用文件类型过滤,将监控所有文件");
+ }
+ else if (_config.General.EnableFileFiltering)
+ {
+ Log.Information("已启用文件类型过滤,仅监控以下类型: {Extensions}", string.Join(", ", _config.FileFilters.AllowedExtensions));
+ }
+
+ if (!Directory.Exists(pathToMonitor))
+ {
+ Log.Warning("错误:监控目录 '{Path}' 不存在。", pathToMonitor);
+ Log.Information("尝试创建该目录...");
+
+ try
+ {
+ Directory.CreateDirectory(pathToMonitor);
+ Log.Information("已成功创建目录: {Path}", pathToMonitor);
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "创建目录失败: {Message}", ex.Message);
+ Log.Information("将使用当前目录作为监控路径。");
+ pathToMonitor = Directory.GetCurrentDirectory();
+ }
+ }
+
+ Log.Information("开始监控目录: {Path}", pathToMonitor);
+ Log.Information("按 'Q' 退出程序,按 'R' 重新加载配置,按 'H' 检查监控器健康状态。");
+ if (_config.EventStorage.EnableEventStorage && _config.EventStorage.EnableEventReplay)
+ {
+ Log.Information("按 'P' 开始/暂停回放,按 'S' 停止回放,按 'B' 查询事件。");
+ }
+
+ // 创建一个容量由配置文件指定的有界通道
+ var channel = Channel.CreateBounded(new BoundedChannelOptions(_config.Performance.ChannelCapacity)
+ {
+ FullMode = BoundedChannelFullMode.DropOldest // 当通道满时丢弃旧事件
+ });
+
+ using var cts = new CancellationTokenSource();
+
+ // 启动内存监控
+ var memoryMonitorTask = StartMemoryMonitorAsync(cts.Token);
+
+ // 启动文件监控器
+ var monitorTask = StartFileMonitorAsync(pathToMonitor, channel.Writer, cts.Token);
+
+ // 启动健康监控任务
+ var healthMonitorTask = _config.Robustness.EnableAutoRecovery ?
+ StartWatcherHealthMonitorAsync(pathToMonitor, channel.Writer, cts.Token) : Task.CompletedTask;
+
+ // 启动事件处理器
+ var processorTask = ProcessEventsAsync(channel.Reader, cts.Token);
+
+ // 等待用户按下键退出或重新加载配置
+ bool needRestart = false;
+ while (true)
+ {
+ var key = Console.ReadKey(true).Key;
+ if (key == ConsoleKey.Q)
+ {
+ break; // 退出循环
+ }
+ else if (key == ConsoleKey.R)
+ {
+ Log.Information("正在重新加载配置...");
+ LoadConfiguration();
+ // 重新配置Serilog
+ ConfigureSerilog();
+ LogConfiguration();
+ Log.Information("配置已重新加载,部分配置需要重启程序才能生效。");
+ }
+ else if (key == ConsoleKey.H)
+ {
+ // 手动检查监控器健康状态
+ bool isHealthy = _activeWatcher != null &&
+ FileWatcherUtils.IsWatcherHealthy(_activeWatcher, _lastEventTime, _config.Robustness);
+ Log.Information("监控器健康状态: {Status}", (isHealthy ? "正常" : "异常"));
+ Log.Information("上次事件时间: {Time}", _lastEventTime);
+ Log.Information("重启次数: {Count}/{MaxCount}", _watcherRestartAttempts, _config.Robustness.MaxRestartAttempts);
+ Log.Information("已处理事件总数: {Count}", _totalEventsProcessed);
+ }
+ else if (key == ConsoleKey.P && _config.EventStorage.EnableEventStorage && _config.EventStorage.EnableEventReplay)
+ {
+ await HandleEventReplayToggleAsync();
+ }
+ else if (key == ConsoleKey.S && _config.EventStorage.EnableEventStorage && _config.EventStorage.EnableEventReplay)
+ {
+ await StopReplayAsync();
+ }
+ else if (key == ConsoleKey.B && _config.EventStorage.EnableEventStorage)
+ {
+ await QueryEventsAsync();
+ }
+
+ await Task.Delay(100);
+ }
+
+ // 取消所有任务
+ cts.Cancel();
+
+ // 停止回放(如果正在进行)
+ await StopReplayAsync();
+
+ // 释放事件存储
+ _eventStorage?.Dispose();
+
+ try
+ {
+ var tasks = new List { monitorTask, processorTask, memoryMonitorTask };
+ if (_config.Robustness.EnableAutoRecovery)
+ {
+ tasks.Add(healthMonitorTask);
+ }
+
+ await Task.WhenAll(tasks);
+ }
+ catch (OperationCanceledException)
+ {
+ Log.Information("程序已正常退出。");
+ }
+
+ // 关闭Serilog
+ Log.CloseAndFlush();
+ }
+
+ // 配置Serilog
+ private static void ConfigureSerilog()
+ {
+ var configuration = new ConfigurationBuilder()
+ .SetBasePath(Directory.GetCurrentDirectory())
+ .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
+ .Build();
+
+ var loggerConfig = new LoggerConfiguration()
+ .ReadFrom.Configuration(configuration);
+
+ // 根据配置设置最小日志级别
+ LogEventLevel minimumLevel = LogEventLevel.Information;
+ if (!string.IsNullOrEmpty(_config.Logging?.LogLevel))
+ {
+ switch (_config.Logging.LogLevel.ToLower())
+ {
+ case "verbose":
+ minimumLevel = LogEventLevel.Verbose;
+ break;
+ case "debug":
+ minimumLevel = LogEventLevel.Debug;
+ break;
+ case "information":
+ minimumLevel = LogEventLevel.Information;
+ break;
+ case "warning":
+ minimumLevel = LogEventLevel.Warning;
+ break;
+ case "error":
+ minimumLevel = LogEventLevel.Error;
+ break;
+ case "fatal":
+ minimumLevel = LogEventLevel.Fatal;
+ break;
+ }
+ }
+
+ // 如果配置文件中没有Serilog节,使用默认配置
+ var hasConfig = configuration.GetSection("Serilog").Exists();
+ if (!hasConfig)
+ {
+ loggerConfig
+ .MinimumLevel.Is(minimumLevel)
+ .WriteTo.Console(
+ outputTemplate: "[{Timestamp:yyyy-MM-dd HH:mm:ss.fff}] [{Level:u3}] {Message:lj}{NewLine}{Exception}"
+ )
+ .WriteTo.File(
+ Path.Combine(_config.Logging?.LogDirectory ?? "Logs", "filemonitor-.log"),
+ rollingInterval: RollingInterval.Day,
+ outputTemplate: "[{Timestamp:yyyy-MM-dd HH:mm:ss.fff}] [{Level:u3}] {Message:lj}{NewLine}{Exception}",
+ retainedFileCountLimit: _config.Logging?.RetainedLogDays ?? 30
+ );
+ }
+
+ Log.Logger = loggerConfig.CreateLogger();
+ }
+
+ // 查询事件
+ private static async Task QueryEventsAsync()
+ {
+ if (_eventStorage == null)
+ {
+ Log.Warning("事件存储未启用");
+ return;
+ }
+
+ try
+ {
+ Log.Information("\n================ 事件查询 ================");
+ Console.WriteLine("请输入查询参数,直接回车使用默认值");
+
+ // 收集查询参数
+ var queryParams = new EventQueryParams();
+
+ Console.Write("开始时间 (yyyy-MM-dd HH:mm:ss),默认24小时前: ");
+ string startTimeStr = Console.ReadLine();
+ if (!string.IsNullOrEmpty(startTimeStr))
+ {
+ if (DateTime.TryParse(startTimeStr, out var startTime))
+ {
+ queryParams.StartTime = startTime;
+ }
+ else
+ {
+ Log.Warning("无效的日期格式,使用默认值");
+ queryParams.StartTime = DateTime.Now.AddDays(-1);
+ }
+ }
+ else
+ {
+ queryParams.StartTime = DateTime.Now.AddDays(-1);
+ }
+
+ Console.Write("结束时间 (yyyy-MM-dd HH:mm:ss),默认当前时间: ");
+ string endTimeStr = Console.ReadLine();
+ if (!string.IsNullOrEmpty(endTimeStr))
+ {
+ if (DateTime.TryParse(endTimeStr, out var endTime))
+ {
+ queryParams.EndTime = endTime;
+ }
+ else
+ {
+ Log.Warning("无效的日期格式,使用默认值");
+ }
+ }
+
+ Console.Write("事件类型 (Created, Modified, Deleted, Renamed, All),默认All: ");
+ string eventTypeStr = Console.ReadLine();
+ if (!string.IsNullOrEmpty(eventTypeStr) && eventTypeStr.ToLower() != "all")
+ {
+ if (Enum.TryParse(eventTypeStr, true, out var eventType))
+ {
+ queryParams.EventType = eventType;
+ }
+ else
+ {
+ Log.Warning("无效的事件类型,使用默认值All");
+ }
+ }
+
+ Console.Write("路径过滤 (包含此文本的路径),默认无: ");
+ string pathFilter = Console.ReadLine();
+ if (!string.IsNullOrEmpty(pathFilter))
+ {
+ queryParams.PathFilter = pathFilter;
+ }
+
+ Console.Write("文件扩展名过滤 (.txt, .exe等),默认无: ");
+ string extensionFilter = Console.ReadLine();
+ if (!string.IsNullOrEmpty(extensionFilter))
+ {
+ queryParams.ExtensionFilter = extensionFilter;
+ }
+
+ Console.Write("每页显示数量,默认20: ");
+ string pageSizeStr = Console.ReadLine();
+ if (!string.IsNullOrEmpty(pageSizeStr) && int.TryParse(pageSizeStr, out var pageSize))
+ {
+ queryParams.PageSize = pageSize;
+ }
+ else
+ {
+ queryParams.PageSize = 20;
+ }
+
+ // 执行查询
+ var result = await _eventStorage.QueryEventsAsync(queryParams);
+
+ Log.Information("查询结果 (共{Count}条记录):", result.TotalCount);
+ Log.Information("时间范围: {StartTime} 至 {EndTime}", result.StartTime, result.EndTime);
+ Console.WriteLine("---------------------------------------------");
+
+ if (result.Events.Count == 0)
+ {
+ Log.Warning("未找到符合条件的事件");
+ }
+ else
+ {
+ foreach (var evt in result.Events)
+ {
+ string eventType = evt.EventType.ToString();
+ string timeStr = evt.Timestamp.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss.fff");
+
+ Console.WriteLine($"[{timeStr}] [{eventType}] {evt.FileName}");
+ Console.WriteLine($" 路径: {evt.Directory}");
+
+ if (evt.EventType == FileEventType.Renamed && !string.IsNullOrEmpty(evt.OldFileName))
+ {
+ Console.WriteLine($" 原名称: {evt.OldFileName}");
+ }
+
+ Console.WriteLine();
+ }
+
+ if (result.HasMore)
+ {
+ Log.Warning("... 还有更多记录未显示 ...");
+ }
+ }
+
+ // 询问是否开始回放
+ if (result.Events.Count > 0 && _config.EventStorage.EnableEventReplay)
+ {
+ Console.Write("\n是否要回放这些事件? (Y/N): ");
+ string replayAnswer = Console.ReadLine();
+ if (!string.IsNullOrEmpty(replayAnswer) && replayAnswer.ToUpper().StartsWith("Y"))
+ {
+ await StartReplayAsync(queryParams);
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "查询事件时出错: {Message}", ex.Message);
+ }
+
+ Console.WriteLine("=============================================\n");
+ }
+
+ // 处理回放切换(开始/暂停)
+ private static async Task HandleEventReplayToggleAsync()
+ {
+ if (_eventStorage == null) return;
+
+ try
+ {
+ if (_currentReplaySession == null)
+ {
+ // 如果没有活动的回放会话,创建一个简单的查询参数并开始回放
+ var queryParams = new EventQueryParams
+ {
+ StartTime = DateTime.Now.AddHours(-1),
+ PageSize = 1000
+ };
+
+ await StartReplayAsync(queryParams);
+ }
+ else if (_currentReplaySession.IsPaused)
+ {
+ // 恢复回放
+ await _currentReplaySession.ResumeAsync();
+ Log.Information("已恢复事件回放");
+ }
+ else
+ {
+ // 暂停回放
+ await _currentReplaySession.PauseAsync();
+ Log.Information("已暂停事件回放");
+ }
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "处理回放时出错: {Message}", ex.Message);
+ }
+ }
+
+ // 开始回放
+ private static async Task StartReplayAsync(EventQueryParams queryParams)
+ {
+ if (_eventStorage == null || !_config.EventStorage.EnableEventReplay) return;
+
+ try
+ {
+ // 停止现有回放
+ await StopReplayAsync();
+
+ // 创建回放处理器
+ Func replayHandler = async (e) =>
+ {
+ // 在控制台上显示回放的事件
+ string eventType = e.EventType.ToString();
+ string timeStr = e.Timestamp.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss.fff");
+
+ Log.Information("[回放] [{Time}] [{Type}] {FileName}", timeStr, eventType, e.FileName);
+
+ if (_config.Logging.LogFileEventDetails)
+ {
+ Log.Debug(" 路径: {Path}", e.Directory);
+
+ if (e.EventType == FileEventType.Renamed && !string.IsNullOrEmpty(e.OldFileName))
+ {
+ Log.Debug(" 原名称: {OldName}", e.OldFileName);
+ }
+ }
+
+ // 这里可以添加实际的文件操作,比如创建、删除文件等
+ // 但在此示例中,我们只是显示事件而不执行实际操作
+
+ await Task.CompletedTask;
+ };
+
+ // 开始回放会话
+ _currentReplaySession = await _eventStorage.StartReplayAsync(
+ queryParams,
+ replayHandler,
+ CancellationToken.None);
+
+ // 如果会话创建成功,开始回放
+ if (_currentReplaySession != null)
+ {
+ await _currentReplaySession.StartAsync();
+ Log.Information("开始回放事件,共 {Count} 个事件", _currentReplaySession.TotalEvents);
+ Log.Information("回放速度: {Speed}", (_config.EventStorage.ReplaySpeedFactor > 1 ?
+ $"{_config.EventStorage.ReplaySpeedFactor}x (加速)" :
+ $"{_config.EventStorage.ReplaySpeedFactor}x (减速)"));
+ }
+ else
+ {
+ Log.Warning("未找到符合条件的事件,回放未开始");
+ }
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "开始回放时出错: {Message}", ex.Message);
+ }
+ }
+
+ // 停止回放
+ private static async Task StopReplayAsync()
+ {
+ if (_currentReplaySession != null)
+ {
+ try
+ {
+ await _currentReplaySession.StopAsync();
+ Log.Information("已停止事件回放");
+
+ // 输出回放统计
+ Log.Information("回放统计: 共处理 {Processed}/{Total} 个事件",
+ _currentReplaySession.ProcessedEvents,
+ _currentReplaySession.TotalEvents);
+
+ TimeSpan duration = (_currentReplaySession.EndTime ?? DateTime.Now) - _currentReplaySession.StartTime;
+ Log.Information("回放持续时间: {Duration:F1} 秒", duration.TotalSeconds);
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "停止回放时出错: {Message}", ex.Message);
+ }
+ finally
+ {
+ _currentReplaySession.Dispose();
+ _currentReplaySession = null;
+ }
+ }
+ }
+
+ // 加载配置文件
+ private static void LoadConfiguration()
+ {
+ try
+ {
+ // 检查配置文件是否存在
+ string configPath = Path.Combine(Directory.GetCurrentDirectory(), "appsettings.json");
+ if (!File.Exists(configPath))
+ {
+ Log.Warning("配置文件不存在,使用默认配置。");
+
+ // 使用默认配置
+ _config = new FileMonitorConfig
+ {
+ General = new GeneralConfig
+ {
+ EnableFileFiltering = true,
+ MemoryMonitorIntervalMinutes = 1,
+ DefaultMonitorPath = Path.Combine(Directory.GetCurrentDirectory(), "MonitorFiles")
+ },
+ FileFilters = new FileFiltersConfig
+ {
+ AllowedExtensions = new[] { ".dll" },
+ ExcludedDirectories = new[] { "bin", "obj", "node_modules" },
+ IncludeSubdirectories = true
+ },
+ Performance = new PerformanceConfig
+ {
+ MemoryCleanupThreshold = 5000,
+ ChannelCapacity = 1000,
+ EventDebounceTimeSeconds = 3,
+ MaxDictionarySize = 10000,
+ CleanupIntervalSeconds = 5,
+ ProcessingDelayMs = 5
+ },
+ Robustness = new RobustnessConfig
+ {
+ EnableAutoRecovery = true,
+ WatcherHealthCheckIntervalSeconds = 30,
+ WatcherTimeoutSeconds = 60,
+ MaxRestartAttempts = 3,
+ RestartDelaySeconds = 5,
+ EnableFileLockDetection = true,
+ LockedFileStrategy = "Retry",
+ FileLockRetryCount = 3,
+ FileLockRetryDelayMs = 500
+ },
+ NotifyFilters = new List { "LastWrite", "FileName", "DirectoryName", "CreationTime" },
+ EventStorage = new EventStorageConfig
+ {
+ EnableEventStorage = false,
+ StorageDirectory = Path.Combine(Directory.GetCurrentDirectory(), "EventStorage"),
+ EnableEventReplay = false,
+ ReplaySpeedFactor = 1
+ },
+ Logging = new LoggingConfig
+ {
+ LogLevel = "Information",
+ LogFileEventDetails = false,
+ RetainedLogDays = 30,
+ LogDirectory = "Logs"
+ }
+ };
+
+ return;
+ }
+
+ var builder = new ConfigurationBuilder()
+ .SetBasePath(Directory.GetCurrentDirectory())
+ .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);
+
+ var configuration = builder.Build();
+
+ // 绑定配置到对象
+ var config = new FileMonitorConfig();
+ configuration.GetSection("FileMonitorConfig").Bind(config);
+
+ // 更新全局配置
+ _config = config;
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "加载配置文件失败: {Message}", ex.Message);
+ Log.Warning("将使用默认配置。");
+
+ // 使用默认配置
+ _config = new FileMonitorConfig
+ {
+ General = new GeneralConfig
+ {
+ EnableFileFiltering = true,
+ MemoryMonitorIntervalMinutes = 1,
+ DefaultMonitorPath = Path.Combine(Directory.GetCurrentDirectory(), "MonitorFiles")
+ },
+ FileFilters = new FileFiltersConfig
+ {
+ AllowedExtensions = new[] { ".dll" },
+ ExcludedDirectories = new[] { "bin", "obj", "node_modules" },
+ IncludeSubdirectories = true
+ },
+ Performance = new PerformanceConfig
+ {
+ MemoryCleanupThreshold = 5000,
+ ChannelCapacity = 1000,
+ EventDebounceTimeSeconds = 3,
+ MaxDictionarySize = 10000,
+ CleanupIntervalSeconds = 5,
+ ProcessingDelayMs = 5
+ },
+ Robustness = new RobustnessConfig
+ {
+ EnableAutoRecovery = true,
+ WatcherHealthCheckIntervalSeconds = 30,
+ WatcherTimeoutSeconds = 60,
+ MaxRestartAttempts = 3,
+ RestartDelaySeconds = 5,
+ EnableFileLockDetection = true,
+ LockedFileStrategy = "Retry",
+ FileLockRetryCount = 3,
+ FileLockRetryDelayMs = 500
+ },
+ NotifyFilters = new List { "LastWrite", "FileName", "DirectoryName", "CreationTime" },
+ EventStorage = new EventStorageConfig
+ {
+ EnableEventStorage = false,
+ StorageDirectory = Path.Combine(Directory.GetCurrentDirectory(), "EventStorage"),
+ EnableEventReplay = false,
+ ReplaySpeedFactor = 1
+ },
+ Logging = new LoggingConfig
+ {
+ LogLevel = "Information",
+ LogFileEventDetails = false,
+ RetainedLogDays = 30,
+ LogDirectory = "Logs"
+ }
+ };
+ }
+ }
+
+ // 打印当前配置信息
+ private static void LogConfiguration()
+ {
+ Log.Information("当前配置信息:");
+ Log.Information(" 默认监控路径: {Path}", _config.General.DefaultMonitorPath);
+ Log.Information(" 文件过滤: {Status}", (_config.General.EnableFileFiltering ? "启用" : "禁用"));
+ if (_config.General.EnableFileFiltering)
+ {
+ Log.Information(" 监控文件类型: {Types}", string.Join(", ", _config.FileFilters.AllowedExtensions));
+ Log.Information(" 排除目录: {Dirs}", string.Join(", ", _config.FileFilters.ExcludedDirectories));
+ }
+ Log.Information(" 内存监控间隔: {Minutes}分钟", _config.General.MemoryMonitorIntervalMinutes);
+ Log.Information(" 内存清理阈值: 每{Threshold}个事件", _config.Performance.MemoryCleanupThreshold);
+ Log.Information(" 通道容量: {Capacity}", _config.Performance.ChannelCapacity);
+ Log.Information(" 事件去抖时间: {Seconds}秒", _config.Performance.EventDebounceTimeSeconds);
+ Log.Information(" 自动恢复: {Status}", (_config.Robustness.EnableAutoRecovery ? "启用" : "禁用"));
+ Log.Information(" 文件锁检测: {Status}", (_config.Robustness.EnableFileLockDetection ? "启用" : "禁用"));
+ if (_config.Robustness.EnableFileLockDetection)
+ {
+ Log.Information(" 锁定文件策略: {Strategy}", _config.Robustness.LockedFileStrategy);
+ }
+ Log.Information(" 日志级别: {Level}", _config.Logging?.LogLevel ?? "Information");
+ }
+
+ // 检查文件是否应该被处理的方法
+ private static bool ShouldProcessFile(string filePath)
+ {
+ if (!_config.General.EnableFileFiltering)
+ return true;
+
+ if (string.IsNullOrEmpty(filePath))
+ return false;
+
+ // 检查是否在排除目录中
+ foreach (var excludedDir in _config.FileFilters.ExcludedDirectories)
+ {
+ if (filePath.Contains($"{Path.DirectorySeparatorChar}{excludedDir}{Path.DirectorySeparatorChar}", StringComparison.OrdinalIgnoreCase) ||
+ filePath.EndsWith($"{Path.DirectorySeparatorChar}{excludedDir}", StringComparison.OrdinalIgnoreCase))
+ {
+ return false;
+ }
+ }
+
+ string extension = Path.GetExtension(filePath);
+ return _config.FileFilters.AllowedExtensions.Contains(extension, StringComparer.OrdinalIgnoreCase);
+ }
+
+ // 添加内存监控方法
+ private static async Task StartMemoryMonitorAsync(CancellationToken cancellationToken)
+ {
+ try
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ // 等待指定时间
+ await Task.Delay(TimeSpan.FromMinutes(_config.General.MemoryMonitorIntervalMinutes), cancellationToken);
+
+ // 获取当前内存使用情况
+ long currentMemory = GC.GetTotalMemory(false) / (1024 * 1024); // MB
+
+ Log.Information("内存使用: {Memory} MB, 已处理事件: {EventCount}", currentMemory, _totalEventsProcessed);
+
+ // 强制执行垃圾回收
+ GC.Collect();
+ GC.WaitForPendingFinalizers();
+
+ // 显示垃圾回收后的内存
+ long afterGCMemory = GC.GetTotalMemory(true) / (1024 * 1024); // MB
+ Log.Debug("内存清理后: {Memory} MB (释放: {Released} MB)", afterGCMemory, currentMemory - afterGCMemory);
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ // 预期的取消异常
+ }
+ }
+
+ // 创建文件监控器
+ private static FileSystemWatcher CreateFileWatcher(string path, ChannelWriter writer, CancellationToken cancellationToken)
+ {
+ // 创建一个新的文件系统监控器
+ var watcher = new FileSystemWatcher(path);
+
+ // 从配置设置NotifyFilters
+ NotifyFilters notifyFilters = NotifyFilters.LastWrite; // 默认值
+ foreach (var filterName in _config.NotifyFilters)
+ {
+ if (Enum.TryParse(filterName, out var filter))
+ {
+ notifyFilters |= filter;
+ }
+ }
+
+ watcher.NotifyFilter = notifyFilters;
+ watcher.IncludeSubdirectories = _config.FileFilters.IncludeSubdirectories;
+ watcher.EnableRaisingEvents = true;
+
+ // 设置文件过滤器,如果启用了过滤
+ if (_config.General.EnableFileFiltering && _config.FileFilters.AllowedExtensions.Length > 0)
+ {
+ // 在FileSystemWatcher级别设置过滤器,这样可以减少系统生成的事件数量
+ // 只能设置一个扩展名作为Filter,所以我们选择第一个
+ string firstExtension = _config.FileFilters.AllowedExtensions[0];
+ watcher.Filter = $"*{firstExtension}";
+ Log.Information("已设置文件系统监控过滤器: *{Filter}", firstExtension);
+
+ if (_config.FileFilters.AllowedExtensions.Length > 1)
+ {
+ Log.Warning("注意: FileSystemWatcher只支持单一过滤器,其他文件类型将在事件处理时过滤。");
+ }
+ }
+
+ // 注册事件处理程序
+ watcher.Created += (sender, e) => HandleFileEvent(sender, e, writer, cancellationToken);
+ watcher.Changed += (sender, e) => HandleFileEvent(sender, e, writer, cancellationToken);
+ watcher.Deleted += (sender, e) => HandleFileEvent(sender, e, writer, cancellationToken);
+ watcher.Renamed += (sender, e) => HandleFileEvent(sender, e, writer, cancellationToken);
+ watcher.Error += (sender, e) =>
+ {
+ Log.Error(e.GetException(), "文件监控错误: {Message}", e.GetException().Message);
+ };
+
+ return watcher;
+ }
+
+ // 文件事件处理
+ private static void HandleFileEvent(object sender, FileSystemEventArgs e, ChannelWriter writer, CancellationToken cancellationToken)
+ {
+ // 更新最后事件时间
+ _lastEventTime = DateTime.UtcNow;
+
+ // 如果启用了事件存储,记录事件
+ _eventStorage?.RecordEvent(e);
+
+ try
+ {
+ // 如果是退出信号,忽略后续处理
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return;
+ }
+
+ // 只处理指定类型的文件,如果启用了文件过滤
+ if (!ShouldProcessFile(e.FullPath))
+ {
+ return;
+ }
+
+ // 尝试将事件写入通道,如果满了就丢弃
+ // 不等待,以免阻塞文件系统事件
+ if (!writer.TryWrite(e))
+ {
+ Log.Warning("警告: 事件处理队列已满,部分事件被丢弃");
+ }
+ }
+ catch (Exception ex)
+ {
+ // 捕获任何异常,防止崩溃
+ Log.Error(ex, "处理文件事件时出错: {Message}", ex.Message);
+ }
+ }
+
+ private static async Task StartFileMonitorAsync(string path, ChannelWriter writer, CancellationToken cancellationToken)
+ {
+ // 使用TaskCompletionSource来控制任务的完成
+ var tcs = new TaskCompletionSource();
+
+ try
+ {
+ // 创建并启动监控器
+ _activeWatcher = CreateFileWatcher(path, writer, cancellationToken);
+
+ // 注册取消回调
+ cancellationToken.Register(() =>
+ {
+ try
+ {
+ if (_activeWatcher != null)
+ {
+ // 确保所有资源正确清理
+ _activeWatcher.EnableRaisingEvents = false;
+ _activeWatcher.Dispose();
+ }
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "关闭文件监控器时出错: {Message}", ex.Message);
+ }
+ finally
+ {
+ tcs.TrySetResult(true);
+ }
+ });
+
+ // 等待任务被取消或完成
+ await tcs.Task;
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "启动文件监控器时出错: {Message}", ex.Message);
+ tcs.TrySetException(ex);
+ throw;
+ }
+ }
+
+ private static async Task ProcessEventsAsync(ChannelReader reader, CancellationToken cancellationToken)
+ {
+ try
+ {
+ // 使用配置的字典大小
+ var recentlyProcessed = new ConcurrentDictionary();
+
+ // 清理计时器,频率由配置决定
+ using var cleanupTimer = new Timer(_ =>
+ {
+ try
+ {
+ var cutoff = DateTime.UtcNow.AddSeconds(-_config.Performance.EventDebounceTimeSeconds);
+ int removedCount = 0;
+
+ // 限制字典大小
+ if (recentlyProcessed.Count > _config.Performance.MaxDictionarySize)
+ {
+ // 找出最旧的条目删除
+ var oldestItems = recentlyProcessed
+ .OrderBy(kv => kv.Value)
+ .Take(recentlyProcessed.Count - _config.Performance.MaxDictionarySize / 2);
+
+ foreach (var item in oldestItems)
+ {
+ DateTime dummy;
+ if (recentlyProcessed.TryRemove(item.Key, out dummy))
+ {
+ removedCount++;
+ }
+ }
+ }
+
+ // 常规清理
+ foreach (var key in recentlyProcessed.Keys)
+ {
+ if (recentlyProcessed.TryGetValue(key, out var time) && time < cutoff)
+ {
+ DateTime dummy;
+ if (recentlyProcessed.TryRemove(key, out dummy))
+ {
+ removedCount++;
+ }
+ }
+ }
+
+ if (removedCount > 0)
+ {
+ Log.Debug("已清理 {Count} 个过期事件记录", removedCount);
+ }
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "清理定时器错误: {Message}", ex.Message);
+ }
+ }, null, TimeSpan.FromSeconds(_config.Performance.CleanupIntervalSeconds),
+ TimeSpan.FromSeconds(_config.Performance.CleanupIntervalSeconds));
+
+ // 从通道读取事件并处理
+ await foreach (var e in reader.ReadAllAsync(cancellationToken))
+ {
+ // 再次确认文件扩展名,双重检查
+ if (!ShouldProcessFile(e.FullPath))
+ {
+ continue;
+ }
+
+ // 增加计数器
+ long count = Interlocked.Increment(ref _totalEventsProcessed);
+
+ // 周期性清理内存
+ if (count % _config.Performance.MemoryCleanupThreshold == 0)
+ {
+ // 在后台线程执行垃圾回收,但直接等待完成避免警告
+ await Task.Run(() =>
+ {
+ GC.Collect();
+ GC.WaitForPendingFinalizers();
+ });
+ }
+
+ // 只使用文件路径作为键,忽略事件类型,这样同一文件的创建和修改事件会被视为同一事件
+ var key = e.FullPath;
+
+ // 对于创建事件,如果之前有相同文件的其他事件,则跳过
+ if (e.ChangeType == WatcherChangeTypes.Changed &&
+ recentlyProcessed.TryGetValue(key, out var _))
+ {
+ // 如果是修改事件且文件最近被处理过,则跳过
+ continue;
+ }
+
+ // 更新或添加文件处理时间,使用原子操作
+ recentlyProcessed[key] = DateTime.UtcNow;
+
+ string eventType = e.ChangeType switch
+ {
+ WatcherChangeTypes.Created => "创建",
+ WatcherChangeTypes.Deleted => "删除",
+ WatcherChangeTypes.Changed => "修改",
+ WatcherChangeTypes.Renamed => "重命名",
+ _ => "未知"
+ };
+
+ string fileName = e.Name ?? Path.GetFileName(e.FullPath);
+ string directoryName = Path.GetDirectoryName(e.FullPath) ?? string.Empty;
+
+ Log.Information("{EventType}: {FileName}", eventType, fileName);
+
+ if (_config.Logging.LogFileEventDetails)
+ {
+ Log.Debug(" 路径: {Path}", directoryName);
+
+ var extension = Path.GetExtension(e.FullPath);
+ Log.Debug(" 类型: {Extension}", extension);
+
+ if (e is RenamedEventArgs renamedEvent)
+ {
+ Log.Debug(" 原名称: {OldName}", Path.GetFileName(renamedEvent.OldFullPath));
+ }
+ }
+
+ // 延迟可配置
+ await Task.Delay(_config.Performance.ProcessingDelayMs, cancellationToken);
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ // 预期的取消异常,正常退出
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "处理事件时发生错误: {Message}", ex.Message);
+ }
+ finally
+ {
+ // 确保资源释放
+ GC.Collect();
+ GC.WaitForPendingFinalizers();
+ }
+ }
+
+ // 健康监控任务,监视文件监控器的状态并在需要时重启它
+ private static async Task StartWatcherHealthMonitorAsync(
+ string path, ChannelWriter writer, CancellationToken cancellationToken)
+ {
+ Log.Information("已启动监控器健康检查,间隔: {Seconds}秒", _config.Robustness.WatcherHealthCheckIntervalSeconds);
+
+ try
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ // 等待指定的健康检查间隔
+ await Task.Delay(TimeSpan.FromSeconds(_config.Robustness.WatcherHealthCheckIntervalSeconds),
+ cancellationToken);
+
+ // 如果监控器不健康且重启次数未超过上限,尝试重启
+ if (_activeWatcher != null &&
+ !FileWatcherUtils.IsWatcherHealthy(_activeWatcher, _lastEventTime, _config.Robustness) &&
+ _watcherRestartAttempts < _config.Robustness.MaxRestartAttempts)
+ {
+ // 停止当前的监控器
+ Log.Warning("检测到监控器异常,准备重启...");
+
+ try
+ {
+ _activeWatcher.EnableRaisingEvents = false;
+ _activeWatcher.Dispose();
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "停止异常监控器时出错: {Message}", ex.Message);
+ }
+
+ _watcherRestartAttempts++;
+ Log.Warning("正在重启监控器,尝试次数: {Current}/{Max}",
+ _watcherRestartAttempts, _config.Robustness.MaxRestartAttempts);
+
+ // 等待指定的重启延迟时间
+ await Task.Delay(TimeSpan.FromSeconds(_config.Robustness.RestartDelaySeconds),
+ cancellationToken);
+
+ // 创建新的监控器
+ try
+ {
+ _activeWatcher = CreateFileWatcher(path, writer, cancellationToken);
+ _lastEventTime = DateTime.UtcNow; // 重置最后事件时间
+ Log.Information("监控器已成功重启");
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "重启监控器失败: {Message}", ex.Message);
+ }
+ }
+ else if (_watcherRestartAttempts >= _config.Robustness.MaxRestartAttempts)
+ {
+ Log.Error("警告: 已达到最大重启次数({Max}),不再尝试重启", _config.Robustness.MaxRestartAttempts);
+ Log.Warning("请检查文件系统或手动重启程序");
+ break;
+ }
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ // 预期的取消异常,正常退出
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "健康监控任务异常: {Message}", ex.Message);
+ }
+ }
+
+ ///
+ /// 运行数据库工具
+ ///
+ private static async Task RunDatabaseUtilityAsync(string[] args)
+ {
+ Console.WriteLine("正在启动SQLite数据库管理工具...");
+
+ try
+ {
+ var dbUtility = new DbUtility();
+ await dbUtility.ExecuteAsync(args);
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine($"运行数据库工具时发生错误: {ex.Message}");
+ }
+ }
+ }
+}
diff --git a/external/JiShe.CollectBus.PluginFileWatcher/Properties/launchSettings.json b/external/JiShe.CollectBus.PluginFileWatcher/Properties/launchSettings.json
new file mode 100644
index 0000000..9293f4d
--- /dev/null
+++ b/external/JiShe.CollectBus.PluginFileWatcher/Properties/launchSettings.json
@@ -0,0 +1,10 @@
+{
+ "profiles": {
+ "JiShe.CollectBus.PluginFileWatcher": {
+ "commandName": "Project"
+ },
+ "Container (Dockerfile)": {
+ "commandName": "Docker"
+ }
+ }
+}
\ No newline at end of file
diff --git a/external/JiShe.CollectBus.PluginFileWatcher/appsettings.json b/external/JiShe.CollectBus.PluginFileWatcher/appsettings.json
new file mode 100644
index 0000000..f0eedfa
--- /dev/null
+++ b/external/JiShe.CollectBus.PluginFileWatcher/appsettings.json
@@ -0,0 +1,88 @@
+{
+ "Serilog": {
+ "Using": [ "Serilog.Sinks.Console", "Serilog.Sinks.File" ],
+ "MinimumLevel": {
+ "Default": "Information",
+ "Override": {
+ "Microsoft": "Warning",
+ "System": "Warning"
+ }
+ },
+ "WriteTo": [
+ {
+ "Name": "Console",
+ "Args": {
+ "outputTemplate": "[{Timestamp:yyyy-MM-dd HH:mm:ss.fff}] [{Level:u3}] {Message:lj}{NewLine}{Exception}"
+ }
+ },
+ {
+ "Name": "File",
+ "Args": {
+ "path": "Logs/filemonitor-.log",
+ "rollingInterval": "Day",
+ "outputTemplate": "[{Timestamp:yyyy-MM-dd HH:mm:ss.fff}] [{Level:u3}] {Message:lj}{NewLine}{Exception}",
+ "retainedFileCountLimit": 31
+ }
+ }
+ ],
+ "Enrich": [ "FromLogContext" ]
+ },
+ "FileMonitorConfig": {
+ "General": {
+ "EnableFileFiltering": true,
+ "MemoryMonitorIntervalMinutes": 1,
+ "DefaultMonitorPath": "D:\\MonitorFiles"
+ },
+ "FileFilters": {
+ "AllowedExtensions": [ ".dll" ],
+ "ExcludedDirectories": [ "bin", "obj", "node_modules" ],
+ "IncludeSubdirectories": true
+ },
+ "Performance": {
+ "MemoryCleanupThreshold": 5000,
+ "ChannelCapacity": 1000,
+ "EventDebounceTimeSeconds": 3,
+ "MaxDictionarySize": 10000,
+ "CleanupIntervalSeconds": 5,
+ "ProcessingDelayMs": 5
+ },
+ "Robustness": {
+ "EnableAutoRecovery": true,
+ "WatcherHealthCheckIntervalSeconds": 30,
+ "WatcherTimeoutSeconds": 60,
+ "MaxRestartAttempts": 3,
+ "RestartDelaySeconds": 5,
+ "EnableFileLockDetection": true,
+ "LockedFileStrategy": "Retry",
+ "FileLockRetryCount": 3,
+ "FileLockRetryDelayMs": 500
+ },
+ "EventStorage": {
+ "EnableEventStorage": true,
+ "StorageType": "SQLite",
+ "StorageDirectory": "D:/EventLogs",
+ "DatabasePath": "D:/EventLogs/events.db",
+ "ConnectionString": "Data Source=D:/EventLogs/events.db;Foreign Keys=True",
+ "CommandTimeout": 30,
+ "LogFileNameFormat": "FileEvents_{0:yyyy-MM-dd}.json",
+ "StorageIntervalSeconds": 60,
+ "BatchSize": 100,
+ "MaxEventRecords": 100000,
+ "DataRetentionDays": 30,
+ "MaxLogFiles": 30,
+ "CompressLogFiles": true,
+ "EnableEventReplay": true,
+ "ReplayIntervalMs": 100,
+ "ReplaySpeedFactor": 1.0
+ },
+ "NotifyFilters": [
+ "LastWrite",
+ "FileName",
+ "DirectoryName",
+ "CreationTime"
+ ],
+ "Logging": {
+ "LogLevel": "Information"
+ }
+ }
+}
diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs
index 5313833..51c3304 100644
--- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs
+++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs
@@ -7,6 +7,7 @@ using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Text;
+using Volo.Abp.Timing;
namespace JiShe.CollectBus.Kafka.Consumer
{
diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
index e49d1be..da59a22 100644
--- a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs
@@ -1,4 +1,5 @@
-using JiShe.CollectBus.Common.Enums;
+using FreeRedis;
+using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
@@ -12,6 +13,7 @@ using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.Protocols;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Common.Consts;
+using JiShe.CollectBus.FreeRedis;
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
{
@@ -20,6 +22,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
private readonly IProducerService _producerService;
private readonly ILogger _logger;
private readonly IRepository _protocolInfoRepository;
+ private readonly IFreeRedisProvider _redisProvider;
//头部字节长度
public const int hearderLen = 6;
@@ -34,17 +37,17 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
/// The service provider.
protected BaseProtocolPlugin(IServiceProvider serviceProvider)
{
-
_logger = serviceProvider.GetRequiredService>();
_protocolInfoRepository = serviceProvider.GetRequiredService>();
_producerService = serviceProvider.GetRequiredService();
+ _redisProvider = serviceProvider.GetRequiredService();
}
public abstract ProtocolInfo Info { get; }
public virtual async Task GetAsync() => await Task.FromResult(Info);
- public virtual async Task AddAsync()
+ public virtual async Task LoadAsync()
{
if (Info == null)
{
@@ -53,7 +56,8 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
await _protocolInfoRepository.DeleteDirectAsync(a => a.Name == Info.Name);
await _protocolInfoRepository.InsertAsync(Info);
- //await _protocolInfoCache.Get()
+ await _redisProvider.Instance.HDelAsync($"{RedisConst.ProtocolKey}", Info.Name);
+ await _redisProvider.Instance.HSetAsync($"{RedisConst.ProtocolKey}", Info.Name, Info);
}
public abstract Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) where T : TB3761;
@@ -1168,6 +1172,5 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
}
#endregion
-
}
}
diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/CollectBusProtocolModule.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/CollectBusProtocolModule.cs
new file mode 100644
index 0000000..4f181db
--- /dev/null
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/CollectBusProtocolModule.cs
@@ -0,0 +1,18 @@
+using JiShe.CollectBus.FreeRedis.Options;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Volo.Abp.Modularity;
+
+namespace JiShe.CollectBus.Protocol.Contracts
+{
+ public class CollectBusProtocolModule : AbpModule
+ {
+ public override void ConfigureServices(ServiceConfigurationContext context)
+ {
+
+ }
+ }
+}
+
+
+
diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
index 2f48cd2..c35dbee 100644
--- a/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
@@ -12,7 +12,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Interfaces
{
Task GetAsync();
- Task AddAsync();
+ Task LoadAsync();
Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) where T : TB3761;
diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolService.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolService.cs
new file mode 100644
index 0000000..c8ea899
--- /dev/null
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolService.cs
@@ -0,0 +1,22 @@
+using JiShe.CollectBus.IotSystems.Protocols;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Volo.Abp;
+
+namespace JiShe.CollectBus.Protocol.Contracts.Interfaces
+{
+ public interface IProtocolService
+ {
+ ///
+ /// 通过仪器设备型号获取协议信息
+ ///
+ ///
+ ///
+ ///
+ ///
+ Task FirstOrDefaultByDeviceAsync(string deviceCode, bool isSpecial = false);
+ }
+}
diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj b/protocols/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj
index 81469e8..495dbf1 100644
--- a/protocols/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj
@@ -19,6 +19,7 @@
+
diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Services/ProtocolService.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Services/ProtocolService.cs
new file mode 100644
index 0000000..3707e6b
--- /dev/null
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Services/ProtocolService.cs
@@ -0,0 +1,52 @@
+using Microsoft.Extensions.DependencyInjection;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using JiShe.CollectBus.Common.Consts;
+using JiShe.CollectBus.Common.Extensions;
+using JiShe.CollectBus.FreeRedis;
+using JiShe.CollectBus.IotSystems.Protocols;
+using Volo.Abp.DependencyInjection;
+using JiShe.CollectBus.Protocol.Contracts.Interfaces;
+using System.Text.RegularExpressions;
+using Volo.Abp;
+
+namespace JiShe.CollectBus.Protocol.Contracts.Services
+{
+ public class ProtocolService : IProtocolService, ISingletonDependency
+ {
+ private readonly IFreeRedisProvider _freeRedisProvider;
+
+ public ProtocolService(IFreeRedisProvider freeRedisProvider)
+ {
+ _freeRedisProvider = freeRedisProvider;
+ }
+
+ ///
+ /// 通过仪器设备型号获取协议信息
+ ///
+ ///
+ ///
+ ///
+ ///
+ public async Task FirstOrDefaultByDeviceAsync(string deviceCode, bool isSpecial = false)
+ {
+ var protocols = await _freeRedisProvider.Instance.HGetAllAsync(RedisConst.ProtocolKey);
+ var keyValuePair = protocols.FirstOrDefault(a => ContainsExactPartRegex(deviceCode, a.Value.RegularExpression));
+ if (!keyValuePair.Key.IsNullOrWhiteSpace() || keyValuePair.Value != null) return keyValuePair.Value;
+ if (isSpecial) throw new UserFriendlyException("The device protocol plugin does not exist!", ExceptionCode.NotFound);
+ var hasStandardProtocolPlugin = protocols.TryGetValue("StandardProtocolPlugin", out var protocolInfo);
+ if (!hasStandardProtocolPlugin) throw new UserFriendlyException("Standard protocol plugin does not exist!", ExceptionCode.NotFound);
+ return protocolInfo;
+ }
+
+ private static bool ContainsExactPartRegex(string searchPattern, string fullString)
+ {
+ // 构建正则表达式 - 匹配以逗号或开头为边界,以逗号或结尾为边界的部分
+ var pattern = $"(^|,)\\s*{Regex.Escape(searchPattern)}\\s*(,|$)";
+ return Regex.IsMatch(fullString, pattern, RegexOptions.IgnoreCase);
+ }
+ }
+}
diff --git a/protocols/JiShe.CollectBus.Protocol.Test/JiShe.CollectBus.Protocol.Test.csproj b/protocols/JiShe.CollectBus.Protocol.Test/JiShe.CollectBus.Protocol.Test.csproj
index 8b89e97..bfc186f 100644
--- a/protocols/JiShe.CollectBus.Protocol.Test/JiShe.CollectBus.Protocol.Test.csproj
+++ b/protocols/JiShe.CollectBus.Protocol.Test/JiShe.CollectBus.Protocol.Test.csproj
@@ -17,7 +17,7 @@
-
+
diff --git a/protocols/JiShe.CollectBus.Protocol.Test/JiSheCollectBusProtocolModule.cs b/protocols/JiShe.CollectBus.Protocol.Test/JiSheCollectBusTestProtocolModule.cs
similarity index 72%
rename from protocols/JiShe.CollectBus.Protocol.Test/JiSheCollectBusProtocolModule.cs
rename to protocols/JiShe.CollectBus.Protocol.Test/JiSheCollectBusTestProtocolModule.cs
index 4abc95b..cdd5253 100644
--- a/protocols/JiShe.CollectBus.Protocol.Test/JiSheCollectBusProtocolModule.cs
+++ b/protocols/JiShe.CollectBus.Protocol.Test/JiSheCollectBusTestProtocolModule.cs
@@ -5,17 +5,17 @@ using Volo.Abp.Modularity;
namespace JiShe.CollectBus.Protocol.Test
{
- public class JiSheCollectBusProtocolModule : AbpModule
+ public class JiSheCollectBusTestProtocolModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
context.Services.AddKeyedSingleton(nameof(TestProtocolPlugin));
}
- public override void OnApplicationInitialization(ApplicationInitializationContext context)
+ public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{
var protocol = context.ServiceProvider.GetRequiredKeyedService(nameof(TestProtocolPlugin));
- protocol.AddAsync();
+ await protocol.LoadAsync();
}
}
}
diff --git a/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs
index 2573ab7..dbdfa82 100644
--- a/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs
+++ b/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs
@@ -4,6 +4,7 @@ using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.Protocols;
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
+using NUglify.JavaScript.Syntax;
namespace JiShe.CollectBus.Protocol.Test
{
@@ -19,137 +20,9 @@ namespace JiShe.CollectBus.Protocol.Test
public sealed override ProtocolInfo Info => new(nameof(TestProtocolPlugin), "Test", "TCP", "Test协议", "DTS1980-Test");
- public override async Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null)
+ public override Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null)
{
throw new NotImplementedException();
}
-
- #region 上行命令
-
- //68
- //32 00
- //32 00
- //68
- //C9 1100'1001. 控制域C。
- // D7=1, (终端发送)上行方向。
- // D6=1, 此帧来自启动站。
- // D5=0, (上行方向)要求访问位。表示终端无事件数据等待访问。
- // D4=0, 保留
- // D3~D0=9, 功能码。链路测试
-
- //20 32 行政区划码
- //90 26 终端地址
- //00 主站地址和组地址标志。终端为单地址。 //3220 09 87 2
- // 终端启动的发送帧的 MSA 应为 0, 其主站响应帧的 MSA 也应为 0.
- //02 应用层功能码。AFN=2, 链路接口检测
- //70 0111'0000. 帧序列域。无时间标签、单帧、需要确认。
- //00 00 信息点。DA1和DA2全为“0”时,表示终端信息点。
- //01 00 信息类。F1, 登录。
- //44 帧尾,包含用户区数据校验和
- //16 帧结束标志
-
- ///
- /// 解析上行命令
- ///
- ///
- ///
- public CommandReulst? AnalysisCmd(string cmd)
- {
- CommandReulst? commandReulst = null;
- var hexStringList = cmd.StringToPairs();
-
- if (hexStringList.Count < hearderLen)
- {
- return commandReulst;
- }
- //验证起始字符
- if (!hexStringList[0].IsStartStr() || !hexStringList[5].IsStartStr())
- {
- return commandReulst;
- }
-
- var lenHexStr = $"{hexStringList[2]}{hexStringList[1]}";
- var lenBin = lenHexStr.HexToBin();
- var len = lenBin.Remove(lenBin.Length - 2).BinToDec();
- //验证长度
- if (hexStringList.Count - 2 != hearderLen + len)
- return commandReulst;
-
- var userDataIndex = hearderLen;
- var c = hexStringList[userDataIndex];//控制域 1字节
- userDataIndex += 1;
-
- var aHexList = hexStringList.Skip(userDataIndex).Take(5).ToList();//地址域 5字节
- var a = AnalysisA(aHexList);
- var a3Bin = aHexList[4].HexToBin().PadLeft(8, '0');
- var mSA = a3Bin.Substring(0, 7).BinToDec();
- userDataIndex += 5;
-
- var aFN = (AFN)hexStringList[userDataIndex].HexToDec();//1字节
- userDataIndex += 1;
-
- var seq = hexStringList[userDataIndex].HexToBin().PadLeft(8, '0');
- var tpV = (TpV)Convert.ToInt32(seq.Substring(0, 1));
- var fIRFIN = (FIRFIN)Convert.ToInt32(seq.Substring(1, 2));
- var cON = (CON)Convert.ToInt32(seq.Substring(3, 1));
- var prseqBin = seq.Substring(4, 4);
- userDataIndex += 1;
-
- // (DA2 - 1) * 8 + DA1 = pn
- var da1Bin = hexStringList[userDataIndex].HexToBin();
- var da1 = da1Bin == "0" ? 0 : da1Bin.Length;
- userDataIndex += 1;
- var da2 = hexStringList[userDataIndex].HexToDec();
- var pn = da2 == 0 ? 0 : (da2 - 1) * 8 + da1;
- userDataIndex += 1;
- //(DT2*8)+DT1=fn
- var dt1Bin = hexStringList[userDataIndex].HexToBin();
- var dt1 = dt1Bin != "0" ? dt1Bin.Length : 0;
- userDataIndex += 1;
- var dt2 = hexStringList[userDataIndex].HexToDec();
- var fn = dt2 * 8 + dt1;
- userDataIndex += 1;
-
- //数据单元
- var datas = hexStringList.Skip(userDataIndex).Take(len + hearderLen - userDataIndex).ToList();
-
- //EC
- //Tp
- commandReulst = new CommandReulst()
- {
- A = a,
- MSA = mSA,
- AFN = aFN,
- Seq = new Seq()
- {
- TpV = tpV,
- FIRFIN = fIRFIN,
- CON = cON,
- PRSEQ = prseqBin.BinToDec(),
- },
- CmdLength = len,
- Pn = pn,
- Fn = fn,
- HexDatas = datas
- };
-
- return commandReulst;
- }
-
- ///
- /// 解析地址
- ///
- ///
- ///
- private string AnalysisA(List aHexList)
- {
- var a1 = aHexList[1] + aHexList[0];
- var a2 = aHexList[3] + aHexList[2];
- var a2Dec = a2.HexToDec();
- var a3 = aHexList[4];
- var a = $"{a1}{a2Dec.ToString().PadLeft(5, '0')}";
- return a;
- }
- #endregion
}
}
diff --git a/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs b/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs
index 5cda0c8..b0275a5 100644
--- a/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs
+++ b/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs
@@ -15,7 +15,7 @@ namespace JiShe.CollectBus.Protocol
public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{
var standardProtocol = context.ServiceProvider.GetRequiredKeyedService(nameof(StandardProtocolPlugin));
- await standardProtocol.AddAsync();
+ await standardProtocol.LoadAsync();
}
}
}
diff --git a/services/JiShe.CollectBus.Application/CollectBusAppService.cs b/services/JiShe.CollectBus.Application/CollectBusAppService.cs
index e155c65..7c66b95 100644
--- a/services/JiShe.CollectBus.Application/CollectBusAppService.cs
+++ b/services/JiShe.CollectBus.Application/CollectBusAppService.cs
@@ -11,6 +11,7 @@ using System.Linq;
using System.Threading.Tasks;
using JiShe.CollectBus.FreeRedis;
using Volo.Abp.Application.Services;
+using Volo.Abp.Timing;
namespace JiShe.CollectBus;
@@ -20,7 +21,6 @@ public abstract class CollectBusAppService : ApplicationService
public IFreeSqlProvider SqlProvider => LazyServiceProvider.LazyGetRequiredService();
protected IFreeRedisProvider FreeRedisProvider => LazyServiceProvider.LazyGetService()!;
-
protected CollectBusAppService()
{
LocalizationResource = typeof(CollectBusResource);
diff --git a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs
index 988c9df..c480b83 100644
--- a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs
+++ b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs
@@ -28,6 +28,7 @@ using Microsoft.Extensions.Options;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.Common.Attributes;
+using ZstdSharp.Unsafe;
namespace JiShe.CollectBus;
@@ -42,7 +43,8 @@ namespace JiShe.CollectBus;
typeof(CollectBusFreeSqlModule),
typeof(CollectBusKafkaModule),
typeof(CollectBusIoTDbModule),
- typeof(CollectBusCassandraModule)
+ typeof(CollectBusCassandraModule),
+ typeof(CollectBusProtocolModule)
)]
public class CollectBusApplicationModule : AbpModule
{
@@ -63,13 +65,10 @@ public class CollectBusApplicationModule : AbpModule
context.Services.OnRegistered(ctx =>
{
var methods = ctx.ImplementationType.GetMethods();
- foreach (var method in methods)
+ var any = methods.Any(a=>a.GetCustomAttribute()!=null);
+ if (any)
{
- var attr = method.GetCustomAttribute(typeof(LogInterceptAttribute), true);
- if (attr != null)
- {
- ctx.Interceptors.TryAdd();
- }
+ ctx.Interceptors.TryAdd();
}
});
}
@@ -84,22 +83,14 @@ public class CollectBusApplicationModule : AbpModule
await context.AddBackgroundWorkerAsync(type);
}
- //默认初始化表计信息
- var dbContext = context.ServiceProvider.GetRequiredService();
- await dbContext.InitAmmeterCacheData();
- //await dbContext.InitWatermeterCacheData();
-
- //初始化主题信息
- var kafkaAdminClient = context.ServiceProvider.GetRequiredService();
- var kafkaOptions = context.ServiceProvider.GetRequiredService>();
-
- var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
- topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
-
- foreach (var item in topics)
+ Task.Run(() =>
{
- await kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor);
- }
+ //默认初始化表计信息
+ var dbContext = context.ServiceProvider.GetRequiredService();
+ dbContext.InitAmmeterCacheData();
+ //await dbContext.InitWatermeterCacheData();
+ }).ConfigureAwait(false);
+
}
}
diff --git a/services/JiShe.CollectBus.Application/EnergySystem/CacheAppService.cs b/services/JiShe.CollectBus.Application/EnergySystem/CacheAppService.cs
index d4ace87..6e2034a 100644
--- a/services/JiShe.CollectBus.Application/EnergySystem/CacheAppService.cs
+++ b/services/JiShe.CollectBus.Application/EnergySystem/CacheAppService.cs
@@ -18,7 +18,7 @@ namespace JiShe.CollectBus.EnergySystem
{
public class CacheAppService : CollectBusAppService, ICacheAppService
{
- public async Task SetHashByKey(string key)
+ public async Task SetHashByKey()
{
var data = await SqlProvider.Instance.Change(DbEnum.EnergyDB).Select().ToListAsync();
diff --git a/services/JiShe.CollectBus.Application/Interceptors/LogInterceptAttribute.cs b/services/JiShe.CollectBus.Application/Interceptors/LogInterceptAttribute.cs
index a3969aa..86ddb49 100644
--- a/services/JiShe.CollectBus.Application/Interceptors/LogInterceptAttribute.cs
+++ b/services/JiShe.CollectBus.Application/Interceptors/LogInterceptAttribute.cs
@@ -5,5 +5,6 @@ namespace JiShe.CollectBus.Interceptors
[AttributeUsage(AttributeTargets.Method)]
public class LogInterceptAttribute : Attribute
{
+
}
}
diff --git a/services/JiShe.CollectBus.Application/Samples/TestAppService.cs b/services/JiShe.CollectBus.Application/Samples/TestAppService.cs
index 68d367c..7b0500a 100644
--- a/services/JiShe.CollectBus.Application/Samples/TestAppService.cs
+++ b/services/JiShe.CollectBus.Application/Samples/TestAppService.cs
@@ -26,6 +26,7 @@ using System.Diagnostics;
using System.Linq;
using Cassandra;
using JiShe.CollectBus.Interceptors;
+using JiShe.CollectBus.IotSystems.Protocols;
namespace JiShe.CollectBus.Samples;
@@ -35,15 +36,17 @@ public class TestAppService : CollectBusAppService
private readonly ILogger _logger;
private readonly ICassandraRepository _messageReceivedCassandraRepository;
private readonly ICassandraProvider _cassandraProvider;
+ private readonly IProtocolService _protocolService;
public TestAppService(
ILogger logger,
ICassandraRepository messageReceivedCassandraRepository,
- ICassandraProvider cassandraProvider)
+ ICassandraProvider cassandraProvider, IProtocolService protocolService)
{
_logger = logger;
_messageReceivedCassandraRepository = messageReceivedCassandraRepository;
_cassandraProvider = cassandraProvider;
+ _protocolService = protocolService;
}
public async Task AddMessageOfCassandra()
{
@@ -124,9 +127,15 @@ public class TestAppService : CollectBusAppService
}
[LogIntercept]
- public async Task LogInterceptorTest(string str)
+ public virtual Task LogInterceptorTest(string str)
{
_logger.LogWarning(str);
- return str;
+ return Task.FromResult(str) ;
+ }
+
+ public virtual async Task GetProtocol(string deviceCode, bool isSpecial = false)
+ {
+ var protocol = await _protocolService.FirstOrDefaultByDeviceAsync(deviceCode, isSpecial);
+ return protocol;
}
}
diff --git a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
index 8db1072..2ef854d 100644
--- a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
+++ b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
@@ -19,6 +19,7 @@ using JiShe.CollectBus.IoTDB.Interface;
using TouchSocket.Sockets;
using Volo.Abp.Domain.Repositories;
using System.Collections.Generic;
+using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.Kafka.Internal;
namespace JiShe.CollectBus.Subscribers
@@ -59,8 +60,8 @@ namespace JiShe.CollectBus.Subscribers
_dbProvider = dbProvider;
}
+ [LogIntercept]
[KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName, EnableBatch = true)]
- //[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
public async Task LoginIssuedEvent(List issuedEventMessages)
{
bool isAck = false;
@@ -97,7 +98,6 @@ namespace JiShe.CollectBus.Subscribers
}
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)]
- //[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
public async Task HeartbeatIssuedEvent(List issuedEventMessages)
{
bool isAck = false;
@@ -132,7 +132,6 @@ namespace JiShe.CollectBus.Subscribers
}
[KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)]
- //[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)]
public async Task ReceivedEvent(MessageReceived receivedMessage)
{
var currentTime = Clock.Now;
@@ -189,7 +188,6 @@ namespace JiShe.CollectBus.Subscribers
}
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName, EnableBatch = true)]
- //[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
public async Task ReceivedHeartbeatEvent(List receivedHeartbeatMessages)
{
foreach (var receivedHeartbeatMessage in receivedHeartbeatMessages)
@@ -209,7 +207,6 @@ namespace JiShe.CollectBus.Subscribers
}
[KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName,EnableBatch =true)]
- //[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
public async Task ReceivedLoginEvent(List receivedLoginMessages)
{
foreach (var receivedLoginMessage in receivedLoginMessages)
diff --git a/services/JiShe.CollectBus.Domain/IotSystems/CommunicationLogs/PacketLog.cs b/services/JiShe.CollectBus.Domain/IotSystems/CommunicationLogs/PacketLog.cs
new file mode 100644
index 0000000..bd57e22
--- /dev/null
+++ b/services/JiShe.CollectBus.Domain/IotSystems/CommunicationLogs/PacketLog.cs
@@ -0,0 +1,43 @@
+using System;
+
+namespace JiShe.CollectBus.IotSystems.CommunicationLogs
+{
+ public class PacketLog : ICassandraEntity
+ {
+ ///
+ /// 下行报文
+ ///
+ public string IssuedMessage { get; set; } = string.Empty;
+
+ ///
+ /// 上行报文
+ ///
+ public string ReportMessage { get; set; } = string.Empty;
+
+ public DateTime? IssuedTime { get; set; }
+ public DateTime? ReportTime { get; set; }
+
+ ///
+ /// 报文类型(是否需要回复)
+ ///
+ public PacketType PacketType { get; set; }
+
+ public Guid Id { get; set; }
+ }
+
+ public enum PacketType
+ {
+ ///
+ /// 只有下发,不需要回复
+ ///
+ OnlyIssued,
+ ///
+ /// 只有上报,不需要下发
+ ///
+ OnlyReport,
+ ///
+ /// 下发并且需要回复
+ ///
+ IssuedAndReport
+ }
+}
diff --git a/services/JiShe.CollectBus.Domain/IotSystems/Protocols/ProtocolInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/Protocols/ProtocolInfo.cs
index c193535..630b717 100644
--- a/services/JiShe.CollectBus.Domain/IotSystems/Protocols/ProtocolInfo.cs
+++ b/services/JiShe.CollectBus.Domain/IotSystems/Protocols/ProtocolInfo.cs
@@ -16,6 +16,7 @@ namespace JiShe.CollectBus.IotSystems.Protocols
///
public ProtocolInfo(string name, string baseProtocol, string type, string description, string regularExpression)
{
+ Code = $"PL-{DateTime.Now:yyyyMMddHHmmss}";
Name = name;
Type = type;
Description = description;
@@ -23,6 +24,11 @@ namespace JiShe.CollectBus.IotSystems.Protocols
BaseProtocol = baseProtocol;
}
+ ///
+ /// 协议编码,唯一识别
+ ///
+ public string Code { get; set; }
+
///
/// 协议名称
///
diff --git a/services/JiShe.CollectBus.Domain/JiShe.CollectBus.Domain.csproj b/services/JiShe.CollectBus.Domain/JiShe.CollectBus.Domain.csproj
index a4dbc5d..a60ffe1 100644
--- a/services/JiShe.CollectBus.Domain/JiShe.CollectBus.Domain.csproj
+++ b/services/JiShe.CollectBus.Domain/JiShe.CollectBus.Domain.csproj
@@ -9,9 +9,9 @@
-
-
-
+
+
+
@@ -34,9 +34,4 @@
-
-
-
-
-
diff --git a/shared/JiShe.CollectBus.Common/Consts/ExceptionCode.cs b/shared/JiShe.CollectBus.Common/Consts/ExceptionCode.cs
new file mode 100644
index 0000000..8fd63ad
--- /dev/null
+++ b/shared/JiShe.CollectBus.Common/Consts/ExceptionCode.cs
@@ -0,0 +1,13 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Common.Consts
+{
+ public class ExceptionCode
+ {
+ public const string NotFound = "500404";
+ }
+}
diff --git a/shared/JiShe.CollectBus.Common/Consts/RedisConst.cs b/shared/JiShe.CollectBus.Common/Consts/RedisConst.cs
index 7ac170b..53ad4f6 100644
--- a/shared/JiShe.CollectBus.Common/Consts/RedisConst.cs
+++ b/shared/JiShe.CollectBus.Common/Consts/RedisConst.cs
@@ -72,6 +72,11 @@ namespace JiShe.CollectBus.Common.Consts
/////
//public const string CacheDeviceBalanceRelationMapResultKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:RelationMap";
- public const string CacheAmmeterFocusKey = "CacheAmmeterFocusKey";
+ public const string CacheAmmeterFocusKey = $"{CacheBasicDirectoryKey}CacheAmmeterFocusKey";
+
+ ///
+ /// 协议池缓存标识
+ ///
+ public const string ProtocolKey = $"{CacheBasicDirectoryKey}Protocols";
}
}
diff --git a/shared/JiShe.CollectBus.Common/Extensions/StringExtensions.cs b/shared/JiShe.CollectBus.Common/Extensions/StringExtensions.cs
index 94e7421..0c77e37 100644
--- a/shared/JiShe.CollectBus.Common/Extensions/StringExtensions.cs
+++ b/shared/JiShe.CollectBus.Common/Extensions/StringExtensions.cs
@@ -49,7 +49,7 @@ namespace JiShe.CollectBus.Common.Extensions
///
///
[Description("正则匹配单个")]
- public static string RegexMatch(this string str, string pattern)
+ public static string? RegexMatch(this string str, string pattern)
{
var reg = new Regex(pattern);
var match = reg.Match(str);
diff --git a/web/JiShe.CollectBus.Host/CollectBusHostModule.cs b/web/JiShe.CollectBus.Host/CollectBusHostModule.cs
index 95e93c7..b038907 100644
--- a/web/JiShe.CollectBus.Host/CollectBusHostModule.cs
+++ b/web/JiShe.CollectBus.Host/CollectBusHostModule.cs
@@ -13,6 +13,7 @@ using Volo.Abp.BackgroundWorkers.Hangfire;
using Volo.Abp.Caching.StackExchangeRedis;
using Volo.Abp.Modularity;
using Volo.Abp.Swashbuckle;
+using Volo.Abp.Timing;
namespace JiShe.CollectBus.Host
{
@@ -24,6 +25,7 @@ namespace JiShe.CollectBus.Host
typeof(AbpAspNetCoreAuthenticationJwtBearerModule),
typeof(AbpAspNetCoreSerilogModule),
typeof(AbpSwashbuckleModule),
+ typeof(AbpTimingModule),
typeof(CollectBusApplicationModule),
typeof(CollectBusMongoDbModule),
typeof(AbpCachingStackExchangeRedisModule),
@@ -46,6 +48,7 @@ namespace JiShe.CollectBus.Host
//ConfigureKafkaTopic(context, configuration);
ConfigureAuditLog(context);
ConfigureCustom(context, configuration);
+ Configure(options => { options.Kind = DateTimeKind.Local; });
}
diff --git a/web/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj b/web/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj
index 3e60600..86f49a6 100644
--- a/web/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj
+++ b/web/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj
@@ -64,6 +64,9 @@
Always
+
+ Always
+