Compare commits

...

2 Commits

37 changed files with 3728 additions and 182 deletions

30
.dockerignore Normal file
View File

@ -0,0 +1,30 @@
**/.classpath
**/.dockerignore
**/.env
**/.git
**/.gitignore
**/.project
**/.settings
**/.toolstarget
**/.vs
**/.vscode
**/*.*proj.user
**/*.dbmdl
**/*.jfm
**/azds.yaml
**/bin
**/charts
**/docker-compose*
**/Dockerfile*
**/node_modules
**/npm-debug.log
**/obj
**/secrets.dev.yaml
**/values.dev.yaml
LICENSE
README.md
!**/.gitignore
!.git/HEAD
!.git/config
!.git/packed-refs
!.git/refs/heads/**

View File

@ -49,6 +49,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "5.Shared", "5.Shared", "{EB
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "modules\JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "modules\JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}"
EndProject EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "6.External", "6.External", "{02EA681E-C7D8-13C7-8484-4AC65E1B71E8}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.PluginFileWatcher", "external\JiShe.CollectBus.PluginFileWatcher\JiShe.CollectBus.PluginFileWatcher.csproj", "{F767D1C3-6807-4AE3-996A-3A28FE8124C2}"
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU
@ -127,6 +131,10 @@ Global
{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Debug|Any CPU.Build.0 = Debug|Any CPU {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Release|Any CPU.ActiveCfg = Release|Any CPU {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Release|Any CPU.Build.0 = Release|Any CPU {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}.Release|Any CPU.Build.0 = Release|Any CPU
{F767D1C3-6807-4AE3-996A-3A28FE8124C2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F767D1C3-6807-4AE3-996A-3A28FE8124C2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F767D1C3-6807-4AE3-996A-3A28FE8124C2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F767D1C3-6807-4AE3-996A-3A28FE8124C2}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE
@ -150,6 +158,7 @@ Global
{A377955E-7EA1-6F29-8CF7-774569E93925} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC} {A377955E-7EA1-6F29-8CF7-774569E93925} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}
{443B4549-0AC0-4493-8F3E-49C83225DD76} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59} {443B4549-0AC0-4493-8F3E-49C83225DD76} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59} {6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B} = {2E0FE301-34C3-4561-9CAE-C7A9E65AEE59}
{F767D1C3-6807-4AE3-996A-3A28FE8124C2} = {02EA681E-C7D8-13C7-8484-4AC65E1B71E8}
EndGlobalSection EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD} SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}

View File

@ -0,0 +1,287 @@
using System;
using System.Collections.Generic;
namespace JiShe.CollectBus.PluginFileWatcher
{
/// <summary>
/// 文件监控程序的配置类
/// </summary>
public class FileMonitorConfig
{
/// <summary>
/// 基本配置
/// </summary>
public GeneralConfig General { get; set; } = new GeneralConfig();
/// <summary>
/// 文件过滤配置
/// </summary>
public FileFiltersConfig FileFilters { get; set; } = new FileFiltersConfig();
/// <summary>
/// 性能相关配置
/// </summary>
public PerformanceConfig Performance { get; set; } = new PerformanceConfig();
/// <summary>
/// 健壮性相关配置
/// </summary>
public RobustnessConfig Robustness { get; set; } = new RobustnessConfig();
/// <summary>
/// 事件存储和回放配置
/// </summary>
public EventStorageConfig EventStorage { get; set; } = new EventStorageConfig();
/// <summary>
/// 文件系统通知过滤器配置
/// </summary>
public List<string> NotifyFilters { get; set; } = new List<string>();
/// <summary>
/// 日志配置
/// </summary>
public LoggingConfig Logging { get; set; } = new LoggingConfig();
}
/// <summary>
/// 常规配置
/// </summary>
public class GeneralConfig
{
/// <summary>
/// 是否启用文件过滤
/// </summary>
public bool EnableFileFiltering { get; set; } = true;
/// <summary>
/// 内存监控间隔(分钟)
/// </summary>
public int MemoryMonitorIntervalMinutes { get; set; } = 1;
/// <summary>
/// 默认监控路径
/// </summary>
public string DefaultMonitorPath { get; set; } = string.Empty;
}
/// <summary>
/// 文件过滤配置
/// </summary>
public class FileFiltersConfig
{
/// <summary>
/// 允许监控的文件扩展名
/// </summary>
public string[] AllowedExtensions { get; set; } = new[] { ".dll" };
/// <summary>
/// 排除的目录
/// </summary>
public string[] ExcludedDirectories { get; set; } = new[] { "bin", "obj", "node_modules" };
/// <summary>
/// 是否包含子目录
/// </summary>
public bool IncludeSubdirectories { get; set; } = true;
}
/// <summary>
/// 性能相关配置
/// </summary>
public class PerformanceConfig
{
/// <summary>
/// 内存清理阈值(事件数)
/// </summary>
public int MemoryCleanupThreshold { get; set; } = 5000;
/// <summary>
/// 通道容量
/// </summary>
public int ChannelCapacity { get; set; } = 1000;
/// <summary>
/// 事件去抖时间(秒)
/// </summary>
public int EventDebounceTimeSeconds { get; set; } = 3;
/// <summary>
/// 最大字典大小
/// </summary>
public int MaxDictionarySize { get; set; } = 10000;
/// <summary>
/// 清理间隔(秒)
/// </summary>
public int CleanupIntervalSeconds { get; set; } = 5;
/// <summary>
/// 处理延迟(毫秒)
/// </summary>
public int ProcessingDelayMs { get; set; } = 5;
}
/// <summary>
/// 健壮性相关配置
/// </summary>
public class RobustnessConfig
{
/// <summary>
/// 是否启用自动恢复机制
/// </summary>
public bool EnableAutoRecovery { get; set; } = true;
/// <summary>
/// 监控器健康检查间隔(秒)
/// </summary>
public int WatcherHealthCheckIntervalSeconds { get; set; } = 30;
/// <summary>
/// 监控器无响应超时时间(秒)
/// </summary>
public int WatcherTimeoutSeconds { get; set; } = 60;
/// <summary>
/// 监控器重启尝试最大次数
/// </summary>
public int MaxRestartAttempts { get; set; } = 3;
/// <summary>
/// 重启尝试之间的延迟(秒)
/// </summary>
public int RestartDelaySeconds { get; set; } = 5;
/// <summary>
/// 是否启用文件锁检测
/// </summary>
public bool EnableFileLockDetection { get; set; } = true;
/// <summary>
/// 对锁定文件的处理策略: Skip(跳过), Retry(重试), Log(仅记录)
/// </summary>
public string LockedFileStrategy { get; set; } = "Retry";
/// <summary>
/// 文件锁定重试次数
/// </summary>
public int FileLockRetryCount { get; set; } = 3;
/// <summary>
/// 文件锁定重试间隔(毫秒)
/// </summary>
public int FileLockRetryDelayMs { get; set; } = 500;
}
/// <summary>
/// 事件存储和回放配置
/// </summary>
public class EventStorageConfig
{
/// <summary>
/// 是否启用事件存储
/// </summary>
public bool EnableEventStorage { get; set; } = true;
/// <summary>
/// 存储类型SQLite 或 File
/// </summary>
public string StorageType { get; set; } = "SQLite";
/// <summary>
/// 事件存储目录
/// </summary>
public string StorageDirectory { get; set; } = "D:/EventLogs";
/// <summary>
/// SQLite数据库文件路径
/// </summary>
public string DatabasePath { get; set; } = "D:/EventLogs/events.db";
/// <summary>
/// SQLite连接字符串
/// </summary>
public string ConnectionString { get; set; } = "Data Source=D:/EventLogs/events.db";
/// <summary>
/// 数据库命令超时(秒)
/// </summary>
public int CommandTimeout { get; set; } = 30;
/// <summary>
/// 事件日志文件名格式 (使用DateTime.ToString格式)
/// </summary>
public string LogFileNameFormat { get; set; } = "FileEvents_{0:yyyy-MM-dd}.json";
/// <summary>
/// 存储间隔(秒),多久将缓存的事件写入一次存储
/// </summary>
public int StorageIntervalSeconds { get; set; } = 60;
/// <summary>
/// 事件批量写入大小,达到此数量时立即写入存储
/// </summary>
public int BatchSize { get; set; } = 100;
/// <summary>
/// 最大保留事件记录条数
/// </summary>
public int MaxEventRecords { get; set; } = 100000;
/// <summary>
/// 数据保留天数
/// </summary>
public int DataRetentionDays { get; set; } = 30;
/// <summary>
/// 最大保留日志文件数
/// </summary>
public int MaxLogFiles { get; set; } = 30;
/// <summary>
/// 是否压缩存储的事件日志
/// </summary>
public bool CompressLogFiles { get; set; } = true;
/// <summary>
/// 是否可以回放事件
/// </summary>
public bool EnableEventReplay { get; set; } = true;
/// <summary>
/// 回放时间间隔(毫秒)
/// </summary>
public int ReplayIntervalMs { get; set; } = 100;
/// <summary>
/// 回放速度倍率大于1加速小于1减速
/// </summary>
public double ReplaySpeedFactor { get; set; } = 1.0;
}
/// <summary>
/// 日志相关配置
/// </summary>
public class LoggingConfig
{
/// <summary>
/// 日志级别Verbose、Debug、Information、Warning、Error、Fatal
/// </summary>
public string LogLevel { get; set; } = "Information";
/// <summary>
/// 是否记录文件事件处理详情
/// </summary>
public bool LogFileEventDetails { get; set; } = false;
/// <summary>
/// 日志文件保留天数
/// </summary>
public int RetainedLogDays { get; set; } = 30;
/// <summary>
/// 日志文件目录
/// </summary>
public string LogDirectory { get; set; } = "Logs";
}
}

View File

@ -0,0 +1,277 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Serilog;
using Serilog.Extensions.Logging;
using ILogger = Microsoft.Extensions.Logging.ILogger;
namespace JiShe.CollectBus.PluginFileWatcher
{
/// <summary>
/// 数据库操作工具类,用于命令行测试数据库功能
/// </summary>
public class DbUtility
{
private readonly EventDatabaseManager _dbManager;
private readonly ILogger _logger;
private readonly FileMonitorConfig _config;
/// <summary>
/// 初始化数据库工具类
/// </summary>
/// <param name="configPath">配置文件路径</param>
public DbUtility(string configPath = "appsettings.json")
{
// 从配置文件加载配置
var configuration = new ConfigurationBuilder()
.AddJsonFile(configPath, optional: false, reloadOnChange: true)
.Build();
// 初始化日志
var serilogLogger = new LoggerConfiguration()
.ReadFrom.Configuration(configuration)
.CreateLogger();
// 将Serilog适配为Microsoft.Extensions.Logging.ILogger
_logger = new SerilogLoggerFactory(serilogLogger).CreateLogger("DbUtility");
// 创建配置对象
_config = new FileMonitorConfig();
configuration.GetSection("FileMonitor").Bind(_config);
// 确保SQLite存储已启用
_config.EventStorage.EnableEventStorage = true;
_config.EventStorage.StorageType = "SQLite";
// 创建数据库管理器
_dbManager = new EventDatabaseManager(_config, _logger);
}
/// <summary>
/// 执行数据库维护操作
/// </summary>
/// <param name="args">命令行参数</param>
public async Task ExecuteAsync(string[] args)
{
if (args.Length == 0)
{
PrintUsage();
return;
}
string command = args[0].ToLower();
switch (command)
{
case "stats":
await ShowStatisticsAsync();
break;
case "cleanup":
int days = args.Length > 1 && int.TryParse(args[1], out int d) ? d : _config.EventStorage.DataRetentionDays;
await _dbManager.CleanupOldDataAsync(days);
Console.WriteLine($"已清理 {days} 天前的旧数据");
break;
case "query":
await QueryEventsAsync(args);
break;
case "test":
await GenerateTestDataAsync(args);
break;
default:
Console.WriteLine($"未知命令: {command}");
PrintUsage();
break;
}
}
/// <summary>
/// 显示帮助信息
/// </summary>
private void PrintUsage()
{
Console.WriteLine("数据库工具使用方法:");
Console.WriteLine(" stats - 显示数据库统计信息");
Console.WriteLine(" cleanup [days] - 清理指定天数之前的数据(默认使用配置值)");
Console.WriteLine(" query [limit] - 查询最近的事件默认10条");
Console.WriteLine(" query type:X ext:Y - 按类型和扩展名查询事件");
Console.WriteLine(" test [count] - 生成测试数据默认100条");
}
/// <summary>
/// 显示数据库统计信息
/// </summary>
private async Task ShowStatisticsAsync()
{
try
{
var stats = await _dbManager.GetDatabaseStatsAsync();
Console.WriteLine("数据库统计信息:");
Console.WriteLine($"事件总数: {stats.TotalEvents}");
Console.WriteLine($"最早事件时间: {stats.OldestEventTime?.ToLocalTime()}");
Console.WriteLine($"最新事件时间: {stats.NewestEventTime?.ToLocalTime()}");
Console.WriteLine("\n事件类型分布:");
if (stats.EventTypeCounts != null)
{
foreach (var item in stats.EventTypeCounts)
{
Console.WriteLine($" {item.Key}: {item.Value}");
}
}
Console.WriteLine("\n扩展名分布 (Top 10):");
if (stats.TopExtensions != null)
{
foreach (var item in stats.TopExtensions)
{
Console.WriteLine($" {item.Key}: {item.Value}");
}
}
}
catch (Exception ex)
{
Console.WriteLine($"获取统计信息出错: {ex.Message}");
_logger.LogError(ex, "获取数据库统计信息时发生错误");
}
}
/// <summary>
/// 查询事件
/// </summary>
private async Task QueryEventsAsync(string[] args)
{
try
{
var queryParams = new EventQueryParams
{
PageSize = 10,
PageIndex = 0
};
// 解析查询参数
if (args.Length > 1)
{
foreach (var arg in args.Skip(1))
{
if (int.TryParse(arg, out int limit))
{
queryParams.PageSize = limit;
}
else if (arg.StartsWith("type:", StringComparison.OrdinalIgnoreCase))
{
string typeValue = arg.Substring(5);
if (Enum.TryParse<FileEventType>(typeValue, true, out var eventType))
{
queryParams.EventType = eventType;
}
}
else if (arg.StartsWith("ext:", StringComparison.OrdinalIgnoreCase))
{
queryParams.ExtensionFilter = arg.Substring(4);
if (!queryParams.ExtensionFilter.StartsWith("."))
{
queryParams.ExtensionFilter = "." + queryParams.ExtensionFilter;
}
}
else if (arg.StartsWith("path:", StringComparison.OrdinalIgnoreCase))
{
queryParams.PathFilter = arg.Substring(5);
}
}
}
// 执行查询
var result = await _dbManager.QueryEventsAsync(queryParams);
Console.WriteLine($"查询结果 (总数: {result.TotalCount}):");
foreach (var evt in result.Events)
{
string typeStr = evt.EventType.ToString();
string timestamp = evt.Timestamp.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss");
Console.WriteLine($"[{timestamp}] {typeStr,-10} {evt.FileName} ({evt.Extension})");
}
if (result.HasMore)
{
Console.WriteLine($"... 还有更多结果,共 {result.TotalCount} 条");
}
}
catch (Exception ex)
{
Console.WriteLine($"查询事件出错: {ex.Message}");
_logger.LogError(ex, "查询事件时发生错误");
}
}
/// <summary>
/// 生成测试数据
/// </summary>
private async Task GenerateTestDataAsync(string[] args)
{
try
{
int count = args.Length > 1 && int.TryParse(args[1], out int c) ? c : 100;
var events = new List<FileEvent>();
var rnd = new Random();
string[] extensions = { ".dll", ".exe", ".txt", ".cs", ".xml", ".json", ".png", ".jpg" };
string[] directories = { "C:\\Temp", "D:\\Work", "C:\\Program Files", "D:\\Projects", "E:\\Data" };
DateTime startTime = DateTime.Now.AddHours(-24);
for (int i = 0; i < count; i++)
{
var eventType = (FileEventType)rnd.Next(0, 5);
var ext = extensions[rnd.Next(extensions.Length)];
var dir = directories[rnd.Next(directories.Length)];
var fileName = $"TestFile_{i:D5}{ext}";
var timestamp = startTime.AddMinutes(i);
var fileEvent = new FileEvent
{
Id = Guid.NewGuid(),
Timestamp = timestamp,
EventType = eventType,
FullPath = $"{dir}\\{fileName}",
FileName = fileName,
Directory = dir,
Extension = ext,
FileSize = rnd.Next(1024, 1024 * 1024)
};
// 如果是重命名事件,添加旧文件名
if (eventType == FileEventType.Renamed)
{
fileEvent.OldFileName = $"OldFile_{i:D5}{ext}";
fileEvent.OldFullPath = $"{dir}\\{fileEvent.OldFileName}";
}
// 添加一些元数据
fileEvent.Metadata["CreationTime"] = timestamp.AddMinutes(-rnd.Next(1, 60)).ToString("o");
fileEvent.Metadata["LastWriteTime"] = timestamp.ToString("o");
fileEvent.Metadata["IsReadOnly"] = (rnd.Next(10) < 2).ToString();
fileEvent.Metadata["TestData"] = $"测试数据 {i}";
events.Add(fileEvent);
}
Console.WriteLine($"正在生成 {count} 条测试数据...");
await _dbManager.SaveEventsAsync(events);
Console.WriteLine("测试数据生成完成!");
}
catch (Exception ex)
{
Console.WriteLine($"生成测试数据出错: {ex.Message}");
_logger.LogError(ex, "生成测试数据时发生错误");
}
}
}
}

View File

@ -0,0 +1,28 @@
# 请参阅 https://aka.ms/customizecontainer 以了解如何自定义调试容器,以及 Visual Studio 如何使用此 Dockerfile 生成映像以更快地进行调试。
# 此阶段用于在快速模式(默认为调试配置)下从 VS 运行时
FROM mcr.microsoft.com/dotnet/runtime:8.0 AS base
USER $APP_UID
WORKDIR /app
# 此阶段用于生成服务项目
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
ARG BUILD_CONFIGURATION=Release
WORKDIR /src
COPY ["external/JiShe.CollectBus.PluginFileWatcher/JiShe.CollectBus.PluginFileWatcher.csproj", "external/JiShe.CollectBus.PluginFileWatcher/"]
RUN dotnet restore "./external/JiShe.CollectBus.PluginFileWatcher/JiShe.CollectBus.PluginFileWatcher.csproj"
COPY . .
WORKDIR "/src/external/JiShe.CollectBus.PluginFileWatcher"
RUN dotnet build "./JiShe.CollectBus.PluginFileWatcher.csproj" -c $BUILD_CONFIGURATION -o /app/build
# 此阶段用于发布要复制到最终阶段的服务项目
FROM build AS publish
ARG BUILD_CONFIGURATION=Release
RUN dotnet publish "./JiShe.CollectBus.PluginFileWatcher.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false
# 此阶段在生产中使用,或在常规模式下从 VS 运行时使用(在不使用调试配置时为默认值)
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "JiShe.CollectBus.PluginFileWatcher.dll"]

View File

@ -0,0 +1,481 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Dapper;
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.PluginFileWatcher
{
/// <summary>
/// SQLite数据库管理器用于管理文件事件的存储和检索
/// </summary>
public class EventDatabaseManager : IDisposable
{
private readonly FileMonitorConfig _config;
private readonly ILogger _logger;
private readonly string _connectionString;
private readonly string _databasePath;
private readonly int _commandTimeout;
private bool _disposed;
/// <summary>
/// 初始化数据库管理器
/// </summary>
/// <param name="config">配置对象</param>
/// <param name="logger">日志记录器</param>
public EventDatabaseManager(FileMonitorConfig config, ILogger logger)
{
_config = config ?? throw new ArgumentNullException(nameof(config));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
// 确保使用配置中的设置
_databasePath = config.EventStorage.DatabasePath;
_connectionString = config.EventStorage.ConnectionString;
_commandTimeout = config.EventStorage.CommandTimeout;
// 确保数据库目录存在
string dbDirectory = Path.GetDirectoryName(_databasePath);
if (!string.IsNullOrEmpty(dbDirectory) && !Directory.Exists(dbDirectory))
{
Directory.CreateDirectory(dbDirectory);
}
// 初始化数据库
InitializeDatabase().GetAwaiter().GetResult();
}
/// <summary>
/// 初始化数据库,确保必要的表已创建
/// </summary>
private async Task InitializeDatabase()
{
try
{
using var connection = new SqliteConnection(_connectionString);
await connection.OpenAsync();
// 启用外键约束
using (var command = connection.CreateCommand())
{
command.CommandText = "PRAGMA foreign_keys = ON;";
await command.ExecuteNonQueryAsync();
}
// 创建文件事件表
string createTableSql = @"
CREATE TABLE IF NOT EXISTS FileEvents (
Id TEXT PRIMARY KEY,
Timestamp TEXT NOT NULL,
EventType INTEGER NOT NULL,
FullPath TEXT NOT NULL,
FileName TEXT NOT NULL,
Directory TEXT NOT NULL,
Extension TEXT NOT NULL,
OldFileName TEXT,
OldFullPath TEXT,
FileSize INTEGER,
CreatedAt TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_timestamp ON FileEvents(Timestamp);
CREATE INDEX IF NOT EXISTS idx_events_eventtype ON FileEvents(EventType);
CREATE INDEX IF NOT EXISTS idx_events_extension ON FileEvents(Extension);";
await connection.ExecuteAsync(createTableSql, commandTimeout: _commandTimeout);
// 创建元数据表
string createMetadataTableSql = @"
CREATE TABLE IF NOT EXISTS EventMetadata (
Id INTEGER PRIMARY KEY AUTOINCREMENT,
EventId TEXT NOT NULL,
MetadataKey TEXT NOT NULL,
MetadataValue TEXT,
FOREIGN KEY (EventId) REFERENCES FileEvents(Id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_metadata_eventid ON EventMetadata(EventId);
CREATE INDEX IF NOT EXISTS idx_metadata_key ON EventMetadata(MetadataKey);";
await connection.ExecuteAsync(createMetadataTableSql, commandTimeout: _commandTimeout);
_logger.LogInformation("数据库初始化成功");
}
catch (Exception ex)
{
_logger.LogError(ex, "初始化数据库失败");
throw;
}
}
/// <summary>
/// 保存文件事件到数据库
/// </summary>
/// <param name="events">要保存的事件列表</param>
public async Task SaveEventsAsync(List<FileEvent> events)
{
if (events == null || events.Count == 0)
return;
try
{
using var connection = new SqliteConnection(_connectionString);
await connection.OpenAsync();
// 启用外键约束
using (var command = connection.CreateCommand())
{
command.CommandText = "PRAGMA foreign_keys = ON;";
await command.ExecuteNonQueryAsync();
}
// 开始事务
using var transaction = connection.BeginTransaction();
try
{
foreach (var fileEvent in events)
{
// 插入事件数据
string insertEventSql = @"
INSERT INTO FileEvents (
Id, Timestamp, EventType, FullPath, FileName,
Directory, Extension, OldFileName, OldFullPath,
FileSize, CreatedAt
) VALUES (
@Id, @Timestamp, @EventType, @FullPath, @FileName,
@Directory, @Extension, @OldFileName, @OldFullPath,
@FileSize, @CreatedAt
)";
await connection.ExecuteAsync(insertEventSql, new
{
Id = fileEvent.Id.ToString(), // 确保ID始终以字符串形式保存
Timestamp = fileEvent.Timestamp.ToString("o"),
EventType = (int)fileEvent.EventType,
fileEvent.FullPath,
fileEvent.FileName,
fileEvent.Directory,
fileEvent.Extension,
fileEvent.OldFileName,
fileEvent.OldFullPath,
fileEvent.FileSize,
CreatedAt = DateTime.UtcNow.ToString("o")
}, transaction, _commandTimeout);
// 插入元数据
if (fileEvent.Metadata != null && fileEvent.Metadata.Count > 0)
{
string insertMetadataSql = @"
INSERT INTO EventMetadata (EventId, MetadataKey, MetadataValue)
VALUES (@EventId, @MetadataKey, @MetadataValue)";
foreach (var metadata in fileEvent.Metadata)
{
await connection.ExecuteAsync(insertMetadataSql, new
{
EventId = fileEvent.Id.ToString(), // 确保ID以相同格式保存
MetadataKey = metadata.Key,
MetadataValue = metadata.Value
}, transaction, _commandTimeout);
}
}
}
// 提交事务
transaction.Commit();
_logger.LogInformation($"已成功保存 {events.Count} 个事件到数据库");
}
catch (Exception ex)
{
// 回滚事务
transaction.Rollback();
_logger.LogError(ex, "保存事件到数据库时发生错误");
throw;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "连接数据库失败");
throw;
}
}
/// <summary>
/// 查询事件
/// </summary>
/// <param name="queryParams">查询参数</param>
/// <returns>查询结果</returns>
public async Task<EventQueryResult> QueryEventsAsync(EventQueryParams queryParams)
{
if (queryParams == null)
throw new ArgumentNullException(nameof(queryParams));
var result = new EventQueryResult
{
StartTime = queryParams.StartTime ?? DateTime.MinValue,
EndTime = queryParams.EndTime ?? DateTime.MaxValue
};
try
{
using var connection = new SqliteConnection(_connectionString);
await connection.OpenAsync();
// 启用外键约束
using (var command = connection.CreateCommand())
{
command.CommandText = "PRAGMA foreign_keys = ON;";
await command.ExecuteNonQueryAsync();
}
// 构建查询条件
var conditions = new List<string>();
var parameters = new DynamicParameters();
if (queryParams.StartTime.HasValue)
{
conditions.Add("Timestamp >= @StartTime");
parameters.Add("@StartTime", queryParams.StartTime.Value.ToString("o"));
}
if (queryParams.EndTime.HasValue)
{
conditions.Add("Timestamp <= @EndTime");
parameters.Add("@EndTime", queryParams.EndTime.Value.ToString("o"));
}
if (queryParams.EventType.HasValue)
{
conditions.Add("EventType = @EventType");
parameters.Add("@EventType", (int)queryParams.EventType.Value);
}
if (!string.IsNullOrEmpty(queryParams.PathFilter))
{
conditions.Add("FullPath LIKE @PathFilter");
parameters.Add("@PathFilter", $"%{queryParams.PathFilter}%");
}
if (!string.IsNullOrEmpty(queryParams.ExtensionFilter))
{
conditions.Add("Extension = @ExtensionFilter");
parameters.Add("@ExtensionFilter", queryParams.ExtensionFilter);
}
// 构建WHERE子句
string whereClause = conditions.Count > 0
? $"WHERE {string.Join(" AND ", conditions)}"
: string.Empty;
// 构建ORDER BY子句
string orderByClause = queryParams.AscendingOrder
? "ORDER BY Timestamp ASC"
: "ORDER BY Timestamp DESC";
// 获取总记录数
string countSql = $"SELECT COUNT(*) FROM FileEvents {whereClause}";
result.TotalCount = await connection.ExecuteScalarAsync<int>(countSql, parameters, commandTimeout: _commandTimeout);
// 应用分页
string paginationClause = $"LIMIT @PageSize OFFSET @Offset";
parameters.Add("@PageSize", queryParams.PageSize);
parameters.Add("@Offset", queryParams.PageIndex * queryParams.PageSize);
// 查询事件数据
string querySql = $@"
SELECT Id,
Timestamp,
EventType,
FullPath,
FileName,
Directory,
Extension,
OldFileName,
OldFullPath,
FileSize
FROM FileEvents
{whereClause}
{orderByClause}
{paginationClause}";
var events = await connection.QueryAsync<dynamic>(querySql, parameters, commandTimeout: _commandTimeout);
// 处理查询结果
foreach (var eventData in events)
{
var fileEvent = new FileEvent
{
Id = Guid.Parse(eventData.Id),
Timestamp = DateTime.Parse(eventData.Timestamp),
EventType = (FileEventType)eventData.EventType,
FullPath = eventData.FullPath,
FileName = eventData.FileName,
Directory = eventData.Directory,
Extension = eventData.Extension,
OldFileName = eventData.OldFileName,
OldFullPath = eventData.OldFullPath,
FileSize = eventData.FileSize
};
// 获取元数据
string metadataSql = "SELECT MetadataKey, MetadataValue FROM EventMetadata WHERE EventId = @EventId";
var metadata = await connection.QueryAsync<dynamic>(metadataSql, new { EventId = fileEvent.Id.ToString() }, commandTimeout: _commandTimeout);
foreach (var item in metadata)
{
fileEvent.Metadata[item.MetadataKey] = item.MetadataValue;
}
result.Events.Add(fileEvent);
}
result.HasMore = (queryParams.PageIndex + 1) * queryParams.PageSize < result.TotalCount;
return result;
}
catch (Exception ex)
{
_logger.LogError(ex, "查询事件时发生错误");
throw;
}
}
/// <summary>
/// 清理旧数据
/// </summary>
/// <param name="retentionDays">数据保留天数</param>
public async Task CleanupOldDataAsync(int retentionDays)
{
if (retentionDays <= 0)
return;
try
{
DateTime cutoffDate = DateTime.UtcNow.AddDays(-retentionDays);
string cutoffDateStr = cutoffDate.ToString("o");
using var connection = new SqliteConnection(_connectionString);
await connection.OpenAsync();
// 启用外键约束
using (var command = connection.CreateCommand())
{
command.CommandText = "PRAGMA foreign_keys = ON;";
await command.ExecuteNonQueryAsync();
}
// 删除旧事件(级联删除元数据)
string deleteSql = "DELETE FROM FileEvents WHERE Timestamp < @CutoffDate";
int deletedCount = await connection.ExecuteAsync(deleteSql, new { CutoffDate = cutoffDateStr }, commandTimeout: _commandTimeout);
_logger.LogInformation($"已清理 {deletedCount} 条旧事件数据({retentionDays}天前)");
}
catch (Exception ex)
{
_logger.LogError(ex, "清理旧数据时发生错误");
throw;
}
}
/// <summary>
/// 获取数据库统计信息
/// </summary>
/// <returns>数据库统计信息</returns>
public async Task<DatabaseStats> GetDatabaseStatsAsync()
{
try
{
using var connection = new SqliteConnection(_connectionString);
await connection.OpenAsync();
// 启用外键约束
using (var command = connection.CreateCommand())
{
command.CommandText = "PRAGMA foreign_keys = ON;";
await command.ExecuteNonQueryAsync();
}
var stats = new DatabaseStats();
// 获取事件总数
stats.TotalEvents = await connection.ExecuteScalarAsync<int>("SELECT COUNT(*) FROM FileEvents", commandTimeout: _commandTimeout);
// 获取最早和最新事件时间
stats.OldestEventTime = await connection.ExecuteScalarAsync<DateTime?>("SELECT Timestamp FROM FileEvents ORDER BY Timestamp ASC LIMIT 1", commandTimeout: _commandTimeout);
stats.NewestEventTime = await connection.ExecuteScalarAsync<DateTime?>("SELECT Timestamp FROM FileEvents ORDER BY Timestamp DESC LIMIT 1", commandTimeout: _commandTimeout);
// 获取事件类型分布
var eventTypeCounts = await connection.QueryAsync<dynamic>("SELECT EventType, COUNT(*) AS Count FROM FileEvents GROUP BY EventType", commandTimeout: _commandTimeout);
stats.EventTypeCounts = new Dictionary<FileEventType, int>();
foreach (var item in eventTypeCounts)
{
stats.EventTypeCounts[(FileEventType)item.EventType] = item.Count;
}
// 获取扩展名分布前10个
var extensionCounts = await connection.QueryAsync<dynamic>(
"SELECT Extension, COUNT(*) AS Count FROM FileEvents GROUP BY Extension ORDER BY Count DESC LIMIT 10",
commandTimeout: _commandTimeout);
stats.TopExtensions = new Dictionary<string, int>();
foreach (var item in extensionCounts)
{
stats.TopExtensions[item.Extension] = item.Count;
}
return stats;
}
catch (Exception ex)
{
_logger.LogError(ex, "获取数据库统计信息时发生错误");
throw;
}
}
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
if (_disposed)
return;
_disposed = true;
}
}
/// <summary>
/// 数据库统计信息
/// </summary>
public class DatabaseStats
{
/// <summary>
/// 事件总数
/// </summary>
public int TotalEvents { get; set; }
/// <summary>
/// 最早事件时间
/// </summary>
public DateTime? OldestEventTime { get; set; }
/// <summary>
/// 最新事件时间
/// </summary>
public DateTime? NewestEventTime { get; set; }
/// <summary>
/// 事件类型计数
/// </summary>
public Dictionary<FileEventType, int> EventTypeCounts { get; set; }
/// <summary>
/// 排名前列的文件扩展名
/// </summary>
public Dictionary<string, int> TopExtensions { get; set; }
}
}

View File

@ -0,0 +1,745 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.PluginFileWatcher
{
/// <summary>
/// 负责文件事件的存储、查询和回放
/// </summary>
public class EventStorage : IDisposable
{
private readonly FileMonitorConfig _config;
private readonly ILogger _logger;
private readonly ConcurrentQueue<FileEvent> _eventQueue;
private readonly Timer _storageTimer;
private readonly SemaphoreSlim _storageLock = new SemaphoreSlim(1, 1);
private readonly string _storageDirectory;
private readonly EventDatabaseManager _dbManager;
private bool _disposed;
/// <summary>
/// 创建新的事件存储管理器实例
/// </summary>
/// <param name="config">文件监控配置</param>
/// <param name="logger">日志记录器</param>
public EventStorage(FileMonitorConfig config, ILogger logger)
{
_config = config ?? throw new ArgumentNullException(nameof(config));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_eventQueue = new ConcurrentQueue<FileEvent>();
// 确保存储目录存在
_storageDirectory = !string.IsNullOrEmpty(_config.EventStorage.StorageDirectory)
? _config.EventStorage.StorageDirectory
: Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "EventLogs");
if (!Directory.Exists(_storageDirectory))
{
Directory.CreateDirectory(_storageDirectory);
}
// 创建数据库管理器如果配置为SQLite存储类型
if (config.EventStorage.EnableEventStorage &&
config.EventStorage.StorageType.Equals("SQLite", StringComparison.OrdinalIgnoreCase))
{
_dbManager = new EventDatabaseManager(config, logger);
_logger.LogInformation("已初始化SQLite事件存储");
}
// 初始化存储定时器(如果启用)
if (_config.EventStorage.EnableEventStorage)
{
var intervalMs = _config.EventStorage.StorageIntervalSeconds * 1000;
_storageTimer = new Timer(SaveEventsTimerCallback, null, intervalMs, intervalMs);
_logger.LogInformation($"事件存储已初始化,存储目录:{_storageDirectory},存储间隔:{_config.EventStorage.StorageIntervalSeconds}秒");
}
}
/// <summary>
/// 记录一个文件事件
/// </summary>
/// <param name="fileEvent">文件事件</param>
public void RecordEvent(FileEvent fileEvent)
{
if (fileEvent == null || !_config.EventStorage.EnableEventStorage) return;
_eventQueue.Enqueue(fileEvent);
_logger.LogDebug($"文件事件已加入队列:{fileEvent.EventType} - {fileEvent.FullPath}");
}
/// <summary>
/// 从FileSystemEventArgs记录事件
/// </summary>
/// <param name="e">文件系统事件参数</param>
public void RecordEvent(FileSystemEventArgs e)
{
if (e == null || !_config.EventStorage.EnableEventStorage) return;
var fileEvent = FileEvent.FromFileSystemEventArgs(e);
RecordEvent(fileEvent);
}
/// <summary>
/// 定时将事件保存到文件
/// </summary>
private async void SaveEventsTimerCallback(object state)
{
if (_disposed || _eventQueue.IsEmpty) return;
try
{
// 防止多个定时器回调同时执行
if (!await _storageLock.WaitAsync(0))
{
return;
}
try
{
// 从队列中取出事件
List<FileEvent> eventsToSave = new List<FileEvent>();
int batchSize = _config.EventStorage.BatchSize;
while (eventsToSave.Count < batchSize && !_eventQueue.IsEmpty)
{
if (_eventQueue.TryDequeue(out var fileEvent))
{
eventsToSave.Add(fileEvent);
}
}
if (eventsToSave.Count > 0)
{
await SaveEventsToFileAsync(eventsToSave);
_logger.LogInformation($"已成功保存 {eventsToSave.Count} 个事件");
}
// 如果有配置,清理旧日志文件
if (_config.EventStorage.MaxLogFiles > 0)
{
await CleanupOldLogFilesAsync();
}
// 如果有配置,清理旧数据库记录
if (_dbManager != null && _config.EventStorage.DataRetentionDays > 0)
{
await _dbManager.CleanupOldDataAsync(_config.EventStorage.DataRetentionDays);
}
}
finally
{
_storageLock.Release();
}
}
catch (Exception ex)
{
_logger.LogError(ex, "保存事件时发生错误");
}
}
/// <summary>
/// 将事件保存到文件
/// </summary>
/// <param name="events">要保存的事件列表</param>
private async Task SaveEventsToFileAsync(List<FileEvent> events)
{
if (events == null || events.Count == 0) return;
try
{
// 根据存储类型选择保存方式
if (_config.EventStorage.StorageType.Equals("SQLite", StringComparison.OrdinalIgnoreCase) && _dbManager != null)
{
// 保存到SQLite数据库
await _dbManager.SaveEventsAsync(events);
}
else
{
// 保存到文件
string fileName = string.Format(
_config.EventStorage.LogFileNameFormat,
DateTime.Now);
string filePath = Path.Combine(_storageDirectory, fileName);
// 创建事件日志文件对象
var logFile = new EventLogFile
{
CreatedTime = DateTime.UtcNow,
Events = events
};
// 序列化为JSON
string jsonContent = JsonSerializer.Serialize(logFile, new JsonSerializerOptions
{
WriteIndented = true
});
// 是否启用压缩
if (_config.EventStorage.CompressLogFiles)
{
string gzFilePath = $"{filePath}.gz";
await CompressAndSaveStringAsync(jsonContent, gzFilePath);
_logger.LogInformation($"已将事件保存到压缩文件:{gzFilePath}");
}
else
{
await File.WriteAllTextAsync(filePath, jsonContent);
_logger.LogInformation($"已将事件保存到文件:{filePath}");
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "保存事件到文件时发生错误");
throw;
}
}
/// <summary>
/// 压缩并保存字符串到文件
/// </summary>
/// <param name="content">要保存的内容</param>
/// <param name="filePath">文件路径</param>
private static async Task CompressAndSaveStringAsync(string content, string filePath)
{
using var fileStream = new FileStream(filePath, FileMode.Create);
using var gzipStream = new GZipStream(fileStream, CompressionLevel.Optimal);
using var writer = new StreamWriter(gzipStream);
await writer.WriteAsync(content);
}
/// <summary>
/// 清理过多的日志文件
/// </summary>
private async Task CleanupOldLogFilesAsync()
{
try
{
// 检查是否需要清理
if (_config.EventStorage.MaxLogFiles <= 0) return;
var directory = new DirectoryInfo(_storageDirectory);
var logFiles = directory.GetFiles("*.*")
.Where(f => f.Name.EndsWith(".json") || f.Name.EndsWith(".gz"))
.OrderByDescending(f => f.CreationTime)
.ToArray();
// 如果文件数量超过最大值,删除最旧的文件
if (logFiles.Length > _config.EventStorage.MaxLogFiles)
{
int filesToDelete = logFiles.Length - _config.EventStorage.MaxLogFiles;
var filesToRemove = logFiles.Skip(logFiles.Length - filesToDelete).ToArray();
foreach (var file in filesToRemove)
{
try
{
file.Delete();
_logger.LogInformation($"已删除旧的事件日志文件:{file.Name}");
}
catch (Exception ex)
{
_logger.LogWarning(ex, $"删除旧日志文件失败:{file.FullName}");
}
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "清理旧日志文件时发生错误");
}
await Task.CompletedTask;
}
/// <summary>
/// 查询历史事件
/// </summary>
/// <param name="queryParams">查询参数</param>
/// <returns>查询结果</returns>
public async Task<EventQueryResult> QueryEventsAsync(EventQueryParams queryParams)
{
if (queryParams == null) throw new ArgumentNullException(nameof(queryParams));
// 如果是SQLite存储且数据库管理器可用使用数据库查询
if (_config.EventStorage.StorageType.Equals("SQLite", StringComparison.OrdinalIgnoreCase) && _dbManager != null)
{
return await _dbManager.QueryEventsAsync(queryParams);
}
var result = new EventQueryResult
{
StartTime = queryParams.StartTime ?? DateTime.MinValue,
EndTime = queryParams.EndTime ?? DateTime.MaxValue
};
try
{
await _storageLock.WaitAsync();
try
{
// 获取所有日志文件
var directory = new DirectoryInfo(_storageDirectory);
if (!directory.Exists)
{
return result;
}
var logFiles = directory.GetFiles("*.*")
.Where(f => f.Name.EndsWith(".json") || f.Name.EndsWith(".gz"))
.OrderByDescending(f => f.CreationTime)
.ToArray();
List<FileEvent> allEvents = new List<FileEvent>();
// 加载所有日志文件中的事件
foreach (var file in logFiles)
{
try
{
var events = await LoadEventsFromFileAsync(file.FullName);
if (events != null && events.Count > 0)
{
allEvents.AddRange(events);
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, $"从文件加载事件失败:{file.FullName}");
}
}
// 内存中队列的事件也包含在查询中
FileEvent[] queuedEvents = _eventQueue.ToArray();
allEvents.AddRange(queuedEvents);
// 应用查询过滤条件
var filteredEvents = allEvents
.Where(e => (queryParams.StartTime == null || e.Timestamp >= queryParams.StartTime) &&
(queryParams.EndTime == null || e.Timestamp <= queryParams.EndTime) &&
(queryParams.EventType == null || e.EventType == queryParams.EventType.Value) &&
(string.IsNullOrEmpty(queryParams.PathFilter) || e.FullPath.Contains(queryParams.PathFilter, StringComparison.OrdinalIgnoreCase)) &&
(string.IsNullOrEmpty(queryParams.ExtensionFilter) || e.Extension.Equals(queryParams.ExtensionFilter, StringComparison.OrdinalIgnoreCase)))
.ToList();
// 应用排序
IEnumerable<FileEvent> orderedEvents = queryParams.AscendingOrder
? filteredEvents.OrderBy(e => e.Timestamp)
: filteredEvents.OrderByDescending(e => e.Timestamp);
// 计算总数
result.TotalCount = filteredEvents.Count;
// 应用分页
int skip = queryParams.PageIndex * queryParams.PageSize;
int take = queryParams.PageSize;
result.Events = orderedEvents.Skip(skip).Take(take).ToList();
result.HasMore = (skip + take) < result.TotalCount;
return result;
}
finally
{
_storageLock.Release();
}
}
catch (Exception ex)
{
_logger.LogError(ex, "查询事件时发生错误");
throw;
}
}
/// <summary>
/// 从文件加载事件
/// </summary>
/// <param name="filePath">文件路径</param>
/// <returns>事件列表</returns>
private async Task<List<FileEvent>> LoadEventsFromFileAsync(string filePath)
{
if (string.IsNullOrEmpty(filePath) || !File.Exists(filePath))
{
return new List<FileEvent>();
}
try
{
string jsonContent;
// 处理压缩文件
if (filePath.EndsWith(".gz"))
{
using var fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read);
using var gzipStream = new GZipStream(fileStream, CompressionMode.Decompress);
using var reader = new StreamReader(gzipStream);
jsonContent = await reader.ReadToEndAsync();
}
else
{
jsonContent = await File.ReadAllTextAsync(filePath);
}
var logFile = JsonSerializer.Deserialize<EventLogFile>(jsonContent);
return logFile?.Events ?? new List<FileEvent>();
}
catch (Exception ex)
{
_logger.LogError(ex, $"从文件加载事件失败:{filePath}");
return new List<FileEvent>();
}
}
/// <summary>
/// 启动事件回放会话
/// </summary>
/// <param name="queryParams">查询参数,定义要回放的事件</param>
/// <param name="replayHandler">回放处理回调</param>
/// <param name="cancellationToken">取消标记</param>
/// <returns>回放会话控制器</returns>
public async Task<EventReplaySession> StartReplayAsync(
EventQueryParams queryParams,
Func<FileEvent, Task> replayHandler,
CancellationToken cancellationToken = default)
{
if (replayHandler == null) throw new ArgumentNullException(nameof(replayHandler));
// 查询要回放的事件
var queryResult = await QueryEventsAsync(queryParams);
// 创建并启动回放会话
var session = new EventReplaySession(
queryResult.Events,
replayHandler,
_config.EventStorage.ReplayIntervalMs,
_config.EventStorage.ReplaySpeedFactor,
_logger,
cancellationToken);
await session.StartAsync();
return session;
}
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
if (_disposed) return;
_disposed = true;
_storageTimer?.Dispose();
_storageLock?.Dispose();
_dbManager?.Dispose();
// 尝试保存剩余事件
if (_config.EventStorage.EnableEventStorage && !_eventQueue.IsEmpty)
{
var remainingEvents = new List<FileEvent>();
while (!_eventQueue.IsEmpty && _eventQueue.TryDequeue(out var evt))
{
remainingEvents.Add(evt);
}
if (remainingEvents.Count > 0)
{
SaveEventsToFileAsync(remainingEvents).GetAwaiter().GetResult();
}
}
GC.SuppressFinalize(this);
}
}
/// <summary>
/// 事件回放会话
/// </summary>
public class EventReplaySession : IDisposable
{
private readonly List<FileEvent> _events;
private readonly Func<FileEvent, Task> _replayHandler;
private readonly int _replayIntervalMs;
private readonly double _speedFactor;
private readonly ILogger _logger;
private readonly CancellationToken _cancellationToken;
private CancellationTokenSource _linkedCts;
private Task _replayTask;
private bool _disposed;
private bool _isPaused;
private readonly SemaphoreSlim _pauseSemaphore = new SemaphoreSlim(1, 1);
/// <summary>
/// 回放进度0-100
/// </summary>
public int Progress { get; private set; }
/// <summary>
/// 当前回放的事件索引
/// </summary>
public int CurrentIndex { get; private set; }
/// <summary>
/// 事件总数
/// </summary>
public int TotalEvents => _events?.Count ?? 0;
/// <summary>
/// 回放是否已完成
/// </summary>
public bool IsCompleted { get; private set; }
/// <summary>
/// 回放是否已暂停
/// </summary>
public bool IsPaused => _isPaused;
/// <summary>
/// 回放已处理的事件数
/// </summary>
public int ProcessedEvents { get; private set; }
/// <summary>
/// 回放开始时间
/// </summary>
public DateTime StartTime { get; private set; }
/// <summary>
/// 回放结束时间(如果已完成)
/// </summary>
public DateTime? EndTime { get; private set; }
/// <summary>
/// 创建新的事件回放会话
/// </summary>
/// <param name="events">要回放的事件</param>
/// <param name="replayHandler">回放处理回调</param>
/// <param name="replayIntervalMs">回放间隔(毫秒)</param>
/// <param name="speedFactor">速度因子</param>
/// <param name="logger">日志记录器</param>
/// <param name="cancellationToken">取消标记</param>
public EventReplaySession(
List<FileEvent> events,
Func<FileEvent, Task> replayHandler,
int replayIntervalMs,
double speedFactor,
ILogger logger,
CancellationToken cancellationToken = default)
{
_events = events ?? throw new ArgumentNullException(nameof(events));
_replayHandler = replayHandler ?? throw new ArgumentNullException(nameof(replayHandler));
_replayIntervalMs = Math.Max(1, replayIntervalMs);
_speedFactor = Math.Max(0.1, speedFactor);
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_cancellationToken = cancellationToken;
_linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
}
/// <summary>
/// 启动回放
/// </summary>
public async Task StartAsync()
{
if (_replayTask != null) return;
StartTime = DateTime.Now;
_replayTask = Task.Run(ReplayEventsAsync, _linkedCts.Token);
await Task.CompletedTask;
}
/// <summary>
/// 暂停回放
/// </summary>
public async Task PauseAsync()
{
if (_isPaused || IsCompleted) return;
await _pauseSemaphore.WaitAsync();
try
{
_isPaused = true;
}
finally
{
_pauseSemaphore.Release();
}
_logger.LogInformation("事件回放已暂停");
}
/// <summary>
/// 恢复回放
/// </summary>
public async Task ResumeAsync()
{
if (!_isPaused || IsCompleted) return;
await _pauseSemaphore.WaitAsync();
try
{
_isPaused = false;
// 释放信号量以允许回放任务继续
_pauseSemaphore.Release();
}
catch
{
_pauseSemaphore.Release();
throw;
}
_logger.LogInformation("事件回放已恢复");
}
/// <summary>
/// 停止回放
/// </summary>
public async Task StopAsync()
{
if (IsCompleted) return;
try
{
// 取消回放任务
_linkedCts?.Cancel();
// 如果暂停中,先恢复以允许取消
if (_isPaused)
{
await ResumeAsync();
}
// 等待任务完成
if (_replayTask != null)
{
await Task.WhenAny(_replayTask, Task.Delay(1000));
}
IsCompleted = true;
EndTime = DateTime.Now;
_logger.LogInformation("事件回放已手动停止");
}
catch (Exception ex)
{
_logger.LogError(ex, "停止事件回放时发生错误");
}
}
/// <summary>
/// 回放事件处理
/// </summary>
private async Task ReplayEventsAsync()
{
try
{
_logger.LogInformation($"开始回放{_events.Count}个事件,速度因子:{_speedFactor}");
if (_events.Count == 0)
{
IsCompleted = true;
EndTime = DateTime.Now;
return;
}
DateTime? lastEventTime = null;
for (int i = 0; i < _events.Count; i++)
{
// 检查是否取消
if (_linkedCts.Token.IsCancellationRequested)
{
_logger.LogInformation("事件回放已取消");
break;
}
// 检查暂停状态
if (_isPaused)
{
// 等待恢复信号
await _pauseSemaphore.WaitAsync(_linkedCts.Token);
_pauseSemaphore.Release();
}
var currentEvent = _events[i];
CurrentIndex = i;
// 计算等待时间(根据事件之间的实际时间差和速度因子)
if (lastEventTime.HasValue && i > 0)
{
var actualTimeDiff = currentEvent.Timestamp - lastEventTime.Value;
var waitTimeMs = (int)(actualTimeDiff.TotalMilliseconds / _speedFactor);
// 应用最小等待时间
waitTimeMs = Math.Max(_replayIntervalMs, waitTimeMs);
// 等待指定时间
await Task.Delay(waitTimeMs, _linkedCts.Token);
}
// 处理当前事件
try
{
await _replayHandler(currentEvent);
ProcessedEvents++;
// 更新进度
Progress = (int)((i + 1) * 100.0 / _events.Count);
}
catch (Exception ex)
{
_logger.LogError(ex, $"处理回放事件时发生错误:{currentEvent.EventType} - {currentEvent.FullPath}");
}
lastEventTime = currentEvent.Timestamp;
}
// 完成回放
IsCompleted = true;
Progress = 100;
EndTime = DateTime.Now;
_logger.LogInformation($"事件回放已完成,共处理{ProcessedEvents}个事件,耗时:{(EndTime.Value - StartTime).TotalSeconds:F2}秒");
}
catch (OperationCanceledException)
{
_logger.LogInformation("事件回放已取消");
IsCompleted = true;
EndTime = DateTime.Now;
}
catch (Exception ex)
{
_logger.LogError(ex, "事件回放过程中发生错误");
IsCompleted = true;
EndTime = DateTime.Now;
}
}
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
if (_disposed) return;
_disposed = true;
_linkedCts?.Cancel();
_linkedCts?.Dispose();
_pauseSemaphore?.Dispose();
GC.SuppressFinalize(this);
}
}
}

View File

@ -0,0 +1,254 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Text.Json.Serialization;
namespace JiShe.CollectBus.PluginFileWatcher
{
/// <summary>
/// 表示一个文件系统事件的数据模型,用于序列化和存储
/// </summary>
public class FileEvent
{
/// <summary>
/// 事件唯一标识
/// </summary>
public Guid Id { get; set; } = Guid.NewGuid();
/// <summary>
/// 事件发生时间
/// </summary>
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
/// <summary>
/// 事件类型
/// </summary>
public FileEventType EventType { get; set; }
/// <summary>
/// 文件完整路径
/// </summary>
public string FullPath { get; set; }
/// <summary>
/// 文件名
/// </summary>
public string FileName { get; set; }
/// <summary>
/// 文件所在目录
/// </summary>
public string Directory { get; set; }
/// <summary>
/// 文件扩展名
/// </summary>
public string Extension { get; set; }
/// <summary>
/// 重命名前的旧文件名(仅在重命名事件中有效)
/// </summary>
public string OldFileName { get; set; }
/// <summary>
/// 重命名前的旧路径(仅在重命名事件中有效)
/// </summary>
public string OldFullPath { get; set; }
/// <summary>
/// 文件大小(字节),如果可获取
/// </summary>
public long? FileSize { get; set; }
/// <summary>
/// 自定义属性,可用于存储其他元数据
/// </summary>
public Dictionary<string, string> Metadata { get; set; } = new Dictionary<string, string>();
/// <summary>
/// 从FileSystemEventArgs创建FileEvent
/// </summary>
/// <param name="e">FileSystemEventArgs参数</param>
/// <returns>FileEvent对象</returns>
public static FileEvent FromFileSystemEventArgs(FileSystemEventArgs e)
{
var fileEvent = new FileEvent
{
EventType = GetEventType(e.ChangeType),
FullPath = e.FullPath,
FileName = e.Name ?? Path.GetFileName(e.FullPath),
Directory = Path.GetDirectoryName(e.FullPath),
Extension = Path.GetExtension(e.FullPath)
};
// 如果是重命名事件,添加旧文件名信息
if (e is RenamedEventArgs renamedEvent)
{
fileEvent.OldFileName = Path.GetFileName(renamedEvent.OldFullPath);
fileEvent.OldFullPath = renamedEvent.OldFullPath;
}
// 尝试获取文件大小(如果文件存在且可访问)
try
{
if (File.Exists(e.FullPath) && e.ChangeType != WatcherChangeTypes.Deleted)
{
var fileInfo = new FileInfo(e.FullPath);
fileEvent.FileSize = fileInfo.Length;
// 添加一些额外的元数据
fileEvent.Metadata["CreationTime"] = fileInfo.CreationTime.ToString("o");
fileEvent.Metadata["LastWriteTime"] = fileInfo.LastWriteTime.ToString("o");
fileEvent.Metadata["IsReadOnly"] = fileInfo.IsReadOnly.ToString();
}
}
catch
{
// 忽略任何获取文件信息时的错误
}
return fileEvent;
}
/// <summary>
/// 将WatcherChangeTypes转换为FileEventType
/// </summary>
/// <param name="changeType">WatcherChangeTypes枚举值</param>
/// <returns>对应的FileEventType</returns>
public static FileEventType GetEventType(WatcherChangeTypes changeType)
{
return changeType switch
{
WatcherChangeTypes.Created => FileEventType.Created,
WatcherChangeTypes.Deleted => FileEventType.Deleted,
WatcherChangeTypes.Changed => FileEventType.Modified,
WatcherChangeTypes.Renamed => FileEventType.Renamed,
_ => FileEventType.Other
};
}
}
/// <summary>
/// 文件事件类型
/// </summary>
public enum FileEventType
{
/// <summary>
/// 文件被创建
/// </summary>
Created,
/// <summary>
/// 文件被修改
/// </summary>
Modified,
/// <summary>
/// 文件被删除
/// </summary>
Deleted,
/// <summary>
/// 文件被重命名
/// </summary>
Renamed,
/// <summary>
/// 其他类型事件
/// </summary>
Other
}
/// <summary>
/// 表示一个事件日志文件
/// </summary>
public class EventLogFile
{
/// <summary>
/// 日志文件创建时间
/// </summary>
public DateTime CreatedTime { get; set; } = DateTime.UtcNow;
/// <summary>
/// 日志文件包含的事件列表
/// </summary>
public List<FileEvent> Events { get; set; } = new List<FileEvent>();
}
/// <summary>
/// 事件查询结果
/// </summary>
public class EventQueryResult
{
/// <summary>
/// 查询到的事件列表
/// </summary>
public List<FileEvent> Events { get; set; } = new List<FileEvent>();
/// <summary>
/// 匹配的事件总数
/// </summary>
public int TotalCount { get; set; }
/// <summary>
/// 查询是否有更多结果
/// </summary>
public bool HasMore { get; set; }
/// <summary>
/// 查询时间范围的开始时间
/// </summary>
public DateTime StartTime { get; set; }
/// <summary>
/// 查询时间范围的结束时间
/// </summary>
public DateTime EndTime { get; set; }
}
/// <summary>
/// 事件查询参数
/// </summary>
public class EventQueryParams
{
/// <summary>
/// 查询开始时间
/// </summary>
public DateTime? StartTime { get; set; }
/// <summary>
/// 查询结束时间
/// </summary>
public DateTime? EndTime { get; set; }
/// <summary>
/// 事件类型过滤
/// </summary>
public FileEventType? EventType { get; set; }
/// <summary>
/// 文件路径过滤(支持包含关系)
/// </summary>
public string PathFilter { get; set; }
/// <summary>
/// 文件扩展名过滤
/// </summary>
public string ExtensionFilter { get; set; }
/// <summary>
/// 分页大小
/// </summary>
public int PageSize { get; set; } = 100;
/// <summary>
/// 分页索引从0开始
/// </summary>
public int PageIndex { get; set; } = 0;
/// <summary>
/// 排序方向true为升序false为降序
/// </summary>
public bool AscendingOrder { get; set; } = false;
}
}

View File

@ -0,0 +1,157 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace JiShe.CollectBus.PluginFileWatcher
{
/// <summary>
/// 文件监控相关工具类
/// </summary>
public static class FileWatcherUtils
{
/// <summary>
/// 检测文件是否被锁定
/// </summary>
/// <param name="filePath">要检查的文件路径</param>
/// <returns>如果文件被锁定则返回true否则返回false</returns>
public static bool IsFileLocked(string filePath)
{
if (string.IsNullOrEmpty(filePath) || !File.Exists(filePath))
return false;
try
{
using (FileStream stream = File.Open(filePath, FileMode.Open, FileAccess.ReadWrite, FileShare.None))
{
// 文件可以被完全访问,没有被锁定
stream.Close();
}
return false;
}
catch (IOException)
{
// 文件被锁定或正在被其他进程使用
return true;
}
catch (Exception)
{
// 其他错误(权限不足、路径无效等)
return true;
}
}
/// <summary>
/// 尝试处理一个可能被锁定的文件
/// </summary>
/// <param name="filePath">文件路径</param>
/// <param name="action">成功解锁后要执行的操作</param>
/// <param name="config">健壮性配置</param>
/// <returns>处理结果true表示成功处理false表示处理失败</returns>
public static async Task<bool> TryHandleLockedFileAsync(string filePath, Func<Task> action, RobustnessConfig config)
{
if (!config.EnableFileLockDetection)
{
// 如果禁用了锁检测,直接执行操作
try
{
await action();
return true;
}
catch
{
return false;
}
}
// 如果文件不存在或不是锁定状态,直接执行操作
if (!File.Exists(filePath) || !IsFileLocked(filePath))
{
try
{
await action();
return true;
}
catch
{
return false;
}
}
// 文件被锁定,根据策略处理
switch (config.LockedFileStrategy.ToLower())
{
case "skip":
// 跳过这个文件
Console.WriteLine($"文件被锁定,已跳过: {filePath}");
return false;
case "retry":
// 重试几次
for (int i = 0; i < config.FileLockRetryCount; i++)
{
await Task.Delay(config.FileLockRetryDelayMs);
if (!IsFileLocked(filePath))
{
try
{
await action();
Console.WriteLine($"文件锁已释放,成功处理: {filePath}");
return true;
}
catch
{
// 继续重试
}
}
}
Console.WriteLine($"文件仍然被锁定,重试{config.FileLockRetryCount}次后放弃: {filePath}");
return false;
case "log":
default:
// 只记录不处理
Console.WriteLine($"文件被锁定,已记录: {filePath}");
return false;
}
}
/// <summary>
/// 检查文件系统监控器是否健康
/// </summary>
/// <param name="watcher">要检查的监控器</param>
/// <param name="lastEventTime">最后一次事件的时间</param>
/// <param name="config">健壮性配置</param>
/// <returns>如果监控器健康则返回true否则返回false</returns>
public static bool IsWatcherHealthy(FileSystemWatcher watcher, DateTime lastEventTime, RobustnessConfig config)
{
if (watcher == null || !watcher.EnableRaisingEvents)
return false;
// 如果配置了超时时间,检查是否超时
if (config.WatcherTimeoutSeconds > 0)
{
// 如果最后事件时间超过了超时时间,认为监控器可能已经失效
TimeSpan timeSinceLastEvent = DateTime.UtcNow - lastEventTime;
if (timeSinceLastEvent.TotalSeconds > config.WatcherTimeoutSeconds)
{
// 执行一个简单的测试:尝试改变一些属性看是否抛出异常
try
{
var currentFilter = watcher.Filter;
watcher.Filter = currentFilter;
return true; // 如果没有异常,认为监控器仍然正常
}
catch
{
return false; // 抛出异常,认为监控器已经失效
}
}
}
// 默认情况下认为监控器健康
return true;
}
}
}

View File

@ -0,0 +1,39 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
<DockerfileContext>..\..</DockerfileContext>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="2.1.66" />
<PackageReference Include="Microsoft.Data.Sqlite" Version="9.0.4" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="8.0.0" />
<!--<PackageReference Include="Microsoft.Extensions.Logging" Version="8.0.0" />-->
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.21.0" />
<PackageReference Include="Serilog" Version="4.2.0" />
<PackageReference Include="Serilog.Extensions.Logging" Version="8.0.0" />
<PackageReference Include="Serilog.Settings.Configuration" Version="8.0.4" />
<PackageReference Include="Serilog.Sinks.Console" Version="6.0.0" />
<PackageReference Include="Serilog.Sinks.File" Version="6.0.0" />
</ItemGroup>
<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,10 @@
{
"profiles": {
"JiShe.CollectBus.PluginFileWatcher": {
"commandName": "Project"
},
"Container (Dockerfile)": {
"commandName": "Docker"
}
}
}

View File

@ -0,0 +1,88 @@
{
"Serilog": {
"Using": [ "Serilog.Sinks.Console", "Serilog.Sinks.File" ],
"MinimumLevel": {
"Default": "Information",
"Override": {
"Microsoft": "Warning",
"System": "Warning"
}
},
"WriteTo": [
{
"Name": "Console",
"Args": {
"outputTemplate": "[{Timestamp:yyyy-MM-dd HH:mm:ss.fff}] [{Level:u3}] {Message:lj}{NewLine}{Exception}"
}
},
{
"Name": "File",
"Args": {
"path": "Logs/filemonitor-.log",
"rollingInterval": "Day",
"outputTemplate": "[{Timestamp:yyyy-MM-dd HH:mm:ss.fff}] [{Level:u3}] {Message:lj}{NewLine}{Exception}",
"retainedFileCountLimit": 31
}
}
],
"Enrich": [ "FromLogContext" ]
},
"FileMonitorConfig": {
"General": {
"EnableFileFiltering": true,
"MemoryMonitorIntervalMinutes": 1,
"DefaultMonitorPath": "D:\\MonitorFiles"
},
"FileFilters": {
"AllowedExtensions": [ ".dll" ],
"ExcludedDirectories": [ "bin", "obj", "node_modules" ],
"IncludeSubdirectories": true
},
"Performance": {
"MemoryCleanupThreshold": 5000,
"ChannelCapacity": 1000,
"EventDebounceTimeSeconds": 3,
"MaxDictionarySize": 10000,
"CleanupIntervalSeconds": 5,
"ProcessingDelayMs": 5
},
"Robustness": {
"EnableAutoRecovery": true,
"WatcherHealthCheckIntervalSeconds": 30,
"WatcherTimeoutSeconds": 60,
"MaxRestartAttempts": 3,
"RestartDelaySeconds": 5,
"EnableFileLockDetection": true,
"LockedFileStrategy": "Retry",
"FileLockRetryCount": 3,
"FileLockRetryDelayMs": 500
},
"EventStorage": {
"EnableEventStorage": true,
"StorageType": "SQLite",
"StorageDirectory": "D:/EventLogs",
"DatabasePath": "D:/EventLogs/events.db",
"ConnectionString": "Data Source=D:/EventLogs/events.db;Foreign Keys=True",
"CommandTimeout": 30,
"LogFileNameFormat": "FileEvents_{0:yyyy-MM-dd}.json",
"StorageIntervalSeconds": 60,
"BatchSize": 100,
"MaxEventRecords": 100000,
"DataRetentionDays": 30,
"MaxLogFiles": 30,
"CompressLogFiles": true,
"EnableEventReplay": true,
"ReplayIntervalMs": 100,
"ReplaySpeedFactor": 1.0
},
"NotifyFilters": [
"LastWrite",
"FileName",
"DirectoryName",
"CreationTime"
],
"Logging": {
"LogLevel": "Information"
}
}
}

View File

@ -7,6 +7,7 @@ using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Text; using System.Text;
using Volo.Abp.Timing;
namespace JiShe.CollectBus.Kafka.Consumer namespace JiShe.CollectBus.Kafka.Consumer
{ {

View File

@ -1,4 +1,5 @@
using JiShe.CollectBus.Common.Enums; using FreeRedis;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Interfaces;
@ -12,6 +13,7 @@ using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.Protocols; using JiShe.CollectBus.IotSystems.Protocols;
using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.FreeRedis;
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
{ {
@ -20,6 +22,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
private readonly IProducerService _producerService; private readonly IProducerService _producerService;
private readonly ILogger<BaseProtocolPlugin_bak> _logger; private readonly ILogger<BaseProtocolPlugin_bak> _logger;
private readonly IRepository<ProtocolInfo, Guid> _protocolInfoRepository; private readonly IRepository<ProtocolInfo, Guid> _protocolInfoRepository;
private readonly IFreeRedisProvider _redisProvider;
//头部字节长度 //头部字节长度
public const int hearderLen = 6; public const int hearderLen = 6;
@ -38,13 +41,14 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
_logger = serviceProvider.GetRequiredService<ILogger<BaseProtocolPlugin_bak>>(); _logger = serviceProvider.GetRequiredService<ILogger<BaseProtocolPlugin_bak>>();
_protocolInfoRepository = serviceProvider.GetRequiredService<IRepository<ProtocolInfo, Guid>>(); _protocolInfoRepository = serviceProvider.GetRequiredService<IRepository<ProtocolInfo, Guid>>();
_producerService = serviceProvider.GetRequiredService<IProducerService>(); _producerService = serviceProvider.GetRequiredService<IProducerService>();
_redisProvider = serviceProvider.GetRequiredService<IFreeRedisProvider>();
} }
public abstract ProtocolInfo Info { get; } public abstract ProtocolInfo Info { get; }
public virtual async Task<ProtocolInfo> GetAsync() => await Task.FromResult(Info); public virtual async Task<ProtocolInfo> GetAsync() => await Task.FromResult(Info);
public virtual async Task AddAsync() public virtual async Task LoadAsync()
{ {
if (Info == null) if (Info == null)
{ {
@ -53,7 +57,8 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
await _protocolInfoRepository.DeleteDirectAsync(a => a.Name == Info.Name); await _protocolInfoRepository.DeleteDirectAsync(a => a.Name == Info.Name);
await _protocolInfoRepository.InsertAsync(Info); await _protocolInfoRepository.InsertAsync(Info);
//await _protocolInfoCache.Get() await _redisProvider.Instance.HDelAsync($"{RedisConst.ProtocolKey}", Info.Name);
await _redisProvider.Instance.HSetAsync($"{RedisConst.ProtocolKey}", Info.Name, Info);
} }
public abstract Task<T> AnalyzeAsync<T>(MessageReceived messageReceived, Action<byte[]>? sendAction = null) where T : TB3761; public abstract Task<T> AnalyzeAsync<T>(MessageReceived messageReceived, Action<byte[]>? sendAction = null) where T : TB3761;
@ -1168,6 +1173,5 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
} }
#endregion #endregion
} }
} }

View File

@ -0,0 +1,18 @@
using JiShe.CollectBus.FreeRedis.Options;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.Modularity;
namespace JiShe.CollectBus.Protocol.Contracts
{
public class CollectBusProtocolModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
}
}
}

View File

@ -12,7 +12,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Interfaces
{ {
Task<ProtocolInfo> GetAsync(); Task<ProtocolInfo> GetAsync();
Task AddAsync(); Task LoadAsync();
Task<T> AnalyzeAsync<T>(ITcpSessionClient client, string messageReceived, Action<T>? sendAction = null) where T : class; Task<T> AnalyzeAsync<T>(ITcpSessionClient client, string messageReceived, Action<T>? sendAction = null) where T : class;

View File

@ -0,0 +1,22 @@
using JiShe.CollectBus.IotSystems.Protocols;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp;
namespace JiShe.CollectBus.Protocol.Contracts.Interfaces
{
public interface IProtocolService
{
/// <summary>
/// 通过仪器设备型号获取协议信息
/// </summary>
/// <param name="deviceCode"></param>
/// <param name="isSpecial"></param>
/// <returns></returns>
/// <exception cref="UserFriendlyException"></exception>
Task<ProtocolInfo> FirstOrDefaultByDeviceAsync(string deviceCode, bool isSpecial = false);
}
}

View File

@ -19,6 +19,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\modules\JiShe.CollectBus.FreeRedis\JiShe.CollectBus.FreeRedis.csproj" />
<ProjectReference Include="..\..\modules\JiShe.CollectBus.Kafka\JiShe.CollectBus.Kafka.csproj" /> <ProjectReference Include="..\..\modules\JiShe.CollectBus.Kafka\JiShe.CollectBus.Kafka.csproj" />
<ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" /> <ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" /> <ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />

View File

@ -0,0 +1,52 @@
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.FreeRedis;
using JiShe.CollectBus.IotSystems.Protocols;
using Volo.Abp.DependencyInjection;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using System.Text.RegularExpressions;
using Volo.Abp;
namespace JiShe.CollectBus.Protocol.Contracts.Services
{
public class ProtocolService : IProtocolService, ISingletonDependency
{
private readonly IFreeRedisProvider _freeRedisProvider;
public ProtocolService(IFreeRedisProvider freeRedisProvider)
{
_freeRedisProvider = freeRedisProvider;
}
/// <summary>
/// 通过仪器设备型号获取协议信息
/// </summary>
/// <param name="deviceCode"></param>
/// <param name="isSpecial"></param>
/// <returns></returns>
/// <exception cref="UserFriendlyException"></exception>
public async Task<ProtocolInfo> FirstOrDefaultByDeviceAsync(string deviceCode, bool isSpecial = false)
{
var protocols = await _freeRedisProvider.Instance.HGetAllAsync<ProtocolInfo>(RedisConst.ProtocolKey);
var keyValuePair = protocols.FirstOrDefault(a => ContainsExactPartRegex(deviceCode, a.Value.RegularExpression));
if (!keyValuePair.Key.IsNullOrWhiteSpace() || keyValuePair.Value != null) return keyValuePair.Value;
if (isSpecial) throw new UserFriendlyException("The device protocol plugin does not exist!", ExceptionCode.NotFound);
var hasStandardProtocolPlugin = protocols.TryGetValue("StandardProtocolPlugin", out var protocolInfo);
if (!hasStandardProtocolPlugin) throw new UserFriendlyException("Standard protocol plugin does not exist!", ExceptionCode.NotFound);
return protocolInfo;
}
private static bool ContainsExactPartRegex(string searchPattern, string fullString)
{
// 构建正则表达式 - 匹配以逗号或开头为边界,以逗号或结尾为边界的部分
var pattern = $"(^|,)\\s*{Regex.Escape(searchPattern)}\\s*(,|$)";
return Regex.IsMatch(fullString, pattern, RegexOptions.IgnoreCase);
}
}
}

View File

@ -17,7 +17,7 @@
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" /> <ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" /> <ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" /> <ProjectReference Include="..\..\protocols\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
</ItemGroup> </ItemGroup>
<Target Name="PostBuild" AfterTargets="PostBuildEvent"> <Target Name="PostBuild" AfterTargets="PostBuildEvent">

View File

@ -5,17 +5,17 @@ using Volo.Abp.Modularity;
namespace JiShe.CollectBus.Protocol.Test namespace JiShe.CollectBus.Protocol.Test
{ {
public class JiSheCollectBusProtocolModule : AbpModule public class JiSheCollectBusTestProtocolModule : AbpModule
{ {
public override void ConfigureServices(ServiceConfigurationContext context) public override void ConfigureServices(ServiceConfigurationContext context)
{ {
context.Services.AddKeyedSingleton<IProtocolPlugin, TestProtocolPlugin>(nameof(TestProtocolPlugin)); context.Services.AddKeyedSingleton<IProtocolPlugin, TestProtocolPlugin>(nameof(TestProtocolPlugin));
} }
public override void OnApplicationInitialization(ApplicationInitializationContext context) public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{ {
var protocol = context.ServiceProvider.GetRequiredKeyedService<IProtocolPlugin>(nameof(TestProtocolPlugin)); var protocol = context.ServiceProvider.GetRequiredKeyedService<IProtocolPlugin>(nameof(TestProtocolPlugin));
protocol.AddAsync(); await protocol.LoadAsync();
} }
} }
} }

View File

@ -26,135 +26,5 @@ namespace JiShe.CollectBus.Protocol.Test
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }
#region
//68
//32 00
//32 00
//68
//C9 1100'1001. 控制域C。
// D7=1, (终端发送)上行方向。
// D6=1, 此帧来自启动站。
// D5=0, (上行方向)要求访问位。表示终端无事件数据等待访问。
// D4=0, 保留
// D3~D0=9, 功能码。链路测试
//20 32 行政区划码
//90 26 终端地址
//00 主站地址和组地址标志。终端为单地址。 //3220 09 87 2
// 终端启动的发送帧的 MSA 应为 0, 其主站响应帧的 MSA 也应为 0.
//02 应用层功能码。AFN=2, 链路接口检测
//70 0111'0000. 帧序列域。无时间标签、单帧、需要确认。
//00 00 信息点。DA1和DA2全为“0”时表示终端信息点。
//01 00 信息类。F1, 登录。
//44 帧尾,包含用户区数据校验和
//16 帧结束标志
/// <summary>
/// 解析上行命令
/// </summary>
/// <param name="cmd"></param>
/// <returns></returns>
public CommandReulst? AnalysisCmd(string cmd)
{
CommandReulst? commandReulst = null;
var hexStringList = cmd.StringToPairs();
if (hexStringList.Count < hearderLen)
{
return commandReulst;
}
//验证起始字符
if (!hexStringList[0].IsStartStr() || !hexStringList[5].IsStartStr())
{
return commandReulst;
}
var lenHexStr = $"{hexStringList[2]}{hexStringList[1]}";
var lenBin = lenHexStr.HexToBin();
var len = lenBin.Remove(lenBin.Length - 2).BinToDec();
//验证长度
if (hexStringList.Count - 2 != hearderLen + len)
return commandReulst;
var userDataIndex = hearderLen;
var c = hexStringList[userDataIndex];//控制域 1字节
userDataIndex += 1;
var aHexList = hexStringList.Skip(userDataIndex).Take(5).ToList();//地址域 5字节
var a = AnalysisA(aHexList);
var a3Bin = aHexList[4].HexToBin().PadLeft(8, '0');
var mSA = a3Bin.Substring(0, 7).BinToDec();
userDataIndex += 5;
var aFN = (AFN)hexStringList[userDataIndex].HexToDec();//1字节
userDataIndex += 1;
var seq = hexStringList[userDataIndex].HexToBin().PadLeft(8, '0');
var tpV = (TpV)Convert.ToInt32(seq.Substring(0, 1));
var fIRFIN = (FIRFIN)Convert.ToInt32(seq.Substring(1, 2));
var cON = (CON)Convert.ToInt32(seq.Substring(3, 1));
var prseqBin = seq.Substring(4, 4);
userDataIndex += 1;
// (DA2 - 1) * 8 + DA1 = pn
var da1Bin = hexStringList[userDataIndex].HexToBin();
var da1 = da1Bin == "0" ? 0 : da1Bin.Length;
userDataIndex += 1;
var da2 = hexStringList[userDataIndex].HexToDec();
var pn = da2 == 0 ? 0 : (da2 - 1) * 8 + da1;
userDataIndex += 1;
//(DT2*8)+DT1=fn
var dt1Bin = hexStringList[userDataIndex].HexToBin();
var dt1 = dt1Bin != "0" ? dt1Bin.Length : 0;
userDataIndex += 1;
var dt2 = hexStringList[userDataIndex].HexToDec();
var fn = dt2 * 8 + dt1;
userDataIndex += 1;
//数据单元
var datas = hexStringList.Skip(userDataIndex).Take(len + hearderLen - userDataIndex).ToList();
//EC
//Tp
commandReulst = new CommandReulst()
{
A = a,
MSA = mSA,
AFN = aFN,
Seq = new Seq()
{
TpV = tpV,
FIRFIN = fIRFIN,
CON = cON,
PRSEQ = prseqBin.BinToDec(),
},
CmdLength = len,
Pn = pn,
Fn = fn,
HexDatas = datas
};
return commandReulst;
}
/// <summary>
/// 解析地址
/// </summary>
/// <param name="aHexList"></param>
/// <returns></returns>
private string AnalysisA(List<string> aHexList)
{
var a1 = aHexList[1] + aHexList[0];
var a2 = aHexList[3] + aHexList[2];
var a2Dec = a2.HexToDec();
var a3 = aHexList[4];
var a = $"{a1}{a2Dec.ToString().PadLeft(5, '0')}";
return a;
}
#endregion
} }
} }

View File

@ -25,7 +25,7 @@ namespace JiShe.CollectBus.Protocol
public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context) public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{ {
var standardProtocol = context.ServiceProvider.GetRequiredKeyedService<IProtocolPlugin>(nameof(StandardProtocolPlugin)); var standardProtocol = context.ServiceProvider.GetRequiredKeyedService<IProtocolPlugin>(nameof(StandardProtocolPlugin));
await standardProtocol.AddAsync(); await standardProtocol.LoadAsync();
} }
public void RegisterProtocolAnalysis(IServiceCollection services) public void RegisterProtocolAnalysis(IServiceCollection services)

View File

@ -11,6 +11,7 @@ using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using JiShe.CollectBus.FreeRedis; using JiShe.CollectBus.FreeRedis;
using Volo.Abp.Application.Services; using Volo.Abp.Application.Services;
using Volo.Abp.Timing;
namespace JiShe.CollectBus; namespace JiShe.CollectBus;
@ -20,7 +21,6 @@ public abstract class CollectBusAppService : ApplicationService
public IFreeSqlProvider SqlProvider => LazyServiceProvider.LazyGetRequiredService<IFreeSqlProvider>(); public IFreeSqlProvider SqlProvider => LazyServiceProvider.LazyGetRequiredService<IFreeSqlProvider>();
protected IFreeRedisProvider FreeRedisProvider => LazyServiceProvider.LazyGetService<IFreeRedisProvider>()!; protected IFreeRedisProvider FreeRedisProvider => LazyServiceProvider.LazyGetService<IFreeRedisProvider>()!;
protected CollectBusAppService() protected CollectBusAppService()
{ {
LocalizationResource = typeof(CollectBusResource); LocalizationResource = typeof(CollectBusResource);

View File

@ -28,6 +28,7 @@ using Microsoft.Extensions.Options;
using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Interceptors; using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.Common.Attributes; using JiShe.CollectBus.Common.Attributes;
using ZstdSharp.Unsafe;
namespace JiShe.CollectBus; namespace JiShe.CollectBus;
@ -42,7 +43,8 @@ namespace JiShe.CollectBus;
typeof(CollectBusFreeSqlModule), typeof(CollectBusFreeSqlModule),
typeof(CollectBusKafkaModule), typeof(CollectBusKafkaModule),
typeof(CollectBusIoTDbModule), typeof(CollectBusIoTDbModule),
typeof(CollectBusCassandraModule) typeof(CollectBusCassandraModule),
typeof(CollectBusProtocolModule)
)] )]
public class CollectBusApplicationModule : AbpModule public class CollectBusApplicationModule : AbpModule
{ {
@ -63,14 +65,11 @@ public class CollectBusApplicationModule : AbpModule
context.Services.OnRegistered(ctx => context.Services.OnRegistered(ctx =>
{ {
var methods = ctx.ImplementationType.GetMethods(); var methods = ctx.ImplementationType.GetMethods();
foreach (var method in methods) var any = methods.Any(a=>a.GetCustomAttribute<LogInterceptAttribute>()!=null);
{ if (any)
var attr = method.GetCustomAttribute(typeof(LogInterceptAttribute), true);
if (attr != null)
{ {
ctx.Interceptors.TryAdd<LogInterceptor>(); ctx.Interceptors.TryAdd<LogInterceptor>();
} }
}
}); });
} }
@ -84,22 +83,14 @@ public class CollectBusApplicationModule : AbpModule
await context.AddBackgroundWorkerAsync(type); await context.AddBackgroundWorkerAsync(type);
} }
Task.Run(() =>
{
//默认初始化表计信息 //默认初始化表计信息
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>(); var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
await dbContext.InitAmmeterCacheData(); dbContext.InitAmmeterCacheData();
//await dbContext.InitWatermeterCacheData(); //await dbContext.InitWatermeterCacheData();
}).ConfigureAwait(false);
//初始化主题信息
var kafkaAdminClient = context.ServiceProvider.GetRequiredService<IAdminClientService>();
var kafkaOptions = context.ServiceProvider.GetRequiredService<IOptions<KafkaOptionConfig>>();
var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
foreach (var item in topics)
{
await kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor);
}
} }
} }

View File

@ -18,7 +18,7 @@ namespace JiShe.CollectBus.EnergySystem
{ {
public class CacheAppService : CollectBusAppService, ICacheAppService public class CacheAppService : CollectBusAppService, ICacheAppService
{ {
public async Task SetHashByKey(string key) public async Task SetHashByKey()
{ {
var data = await SqlProvider.Instance.Change(DbEnum.EnergyDB).Select<V_FocusAmmeter>().ToListAsync(); var data = await SqlProvider.Instance.Change(DbEnum.EnergyDB).Select<V_FocusAmmeter>().ToListAsync();

View File

@ -5,5 +5,6 @@ namespace JiShe.CollectBus.Interceptors
[AttributeUsage(AttributeTargets.Method)] [AttributeUsage(AttributeTargets.Method)]
public class LogInterceptAttribute : Attribute public class LogInterceptAttribute : Attribute
{ {
} }
} }

View File

@ -25,6 +25,7 @@ using System.Diagnostics;
using System.Linq; using System.Linq;
using Cassandra; using Cassandra;
using JiShe.CollectBus.Interceptors; using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IotSystems.Protocols;
namespace JiShe.CollectBus.Samples; namespace JiShe.CollectBus.Samples;
@ -34,15 +35,17 @@ public class TestAppService : CollectBusAppService
private readonly ILogger<TestAppService> _logger; private readonly ILogger<TestAppService> _logger;
private readonly ICassandraRepository<MessageIssued, string> _messageReceivedCassandraRepository; private readonly ICassandraRepository<MessageIssued, string> _messageReceivedCassandraRepository;
private readonly ICassandraProvider _cassandraProvider; private readonly ICassandraProvider _cassandraProvider;
private readonly IProtocolService _protocolService;
public TestAppService( public TestAppService(
ILogger<TestAppService> logger, ILogger<TestAppService> logger,
ICassandraRepository<MessageIssued, string> messageReceivedCassandraRepository, ICassandraRepository<MessageIssued, string> messageReceivedCassandraRepository,
ICassandraProvider cassandraProvider) ICassandraProvider cassandraProvider, IProtocolService protocolService)
{ {
_logger = logger; _logger = logger;
_messageReceivedCassandraRepository = messageReceivedCassandraRepository; _messageReceivedCassandraRepository = messageReceivedCassandraRepository;
_cassandraProvider = cassandraProvider; _cassandraProvider = cassandraProvider;
_protocolService = protocolService;
} }
public async Task AddMessageOfCassandra() public async Task AddMessageOfCassandra()
{ {
@ -123,9 +126,15 @@ public class TestAppService : CollectBusAppService
} }
[LogIntercept] [LogIntercept]
public async Task<string> LogInterceptorTest(string str) public virtual Task<string> LogInterceptorTest(string str)
{ {
_logger.LogWarning(str); _logger.LogWarning(str);
return str; return Task.FromResult(str) ;
}
public virtual async Task<ProtocolInfo> GetProtocol(string deviceCode, bool isSpecial = false)
{
var protocol = await _protocolService.FirstOrDefaultByDeviceAsync(deviceCode, isSpecial);
return protocol;
} }
} }

View File

@ -19,6 +19,7 @@ using JiShe.CollectBus.IoTDB.Interface;
using TouchSocket.Sockets; using TouchSocket.Sockets;
using Volo.Abp.Domain.Repositories; using Volo.Abp.Domain.Repositories;
using System.Collections.Generic; using System.Collections.Generic;
using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.IoTDB.Provider; using JiShe.CollectBus.IoTDB.Provider;
using JiShe.CollectBus.Protocol.Dto; using JiShe.CollectBus.Protocol.Dto;
@ -62,8 +63,8 @@ namespace JiShe.CollectBus.Subscribers
_dbProvider = dbProvider; _dbProvider = dbProvider;
} }
[LogIntercept]
[KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName, EnableBatch = true)] [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName, EnableBatch = true)]
//[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
public async Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessages) public async Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
{ {
bool isAck = false; bool isAck = false;
@ -90,7 +91,6 @@ namespace JiShe.CollectBus.Subscribers
} }
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)] [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)]
//[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
public async Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessages) public async Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
{ {
bool isAck = false; bool isAck = false;
@ -120,7 +120,6 @@ namespace JiShe.CollectBus.Subscribers
} }
[KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)] [KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)]
//[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)]
public async Task<ISubscribeAck> ReceivedEvent(MessageReceived receivedMessage) public async Task<ISubscribeAck> ReceivedEvent(MessageReceived receivedMessage)
{ {
var currentTime = Clock.Now; var currentTime = Clock.Now;
@ -175,7 +174,6 @@ namespace JiShe.CollectBus.Subscribers
} }
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName, EnableBatch = true)] [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName, EnableBatch = true)]
//[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
public async Task<ISubscribeAck> ReceivedHeartbeatEvent(List<MessageReceivedHeartbeat> receivedHeartbeatMessages) public async Task<ISubscribeAck> ReceivedHeartbeatEvent(List<MessageReceivedHeartbeat> receivedHeartbeatMessages)
{ {
//foreach (var receivedHeartbeatMessage in receivedHeartbeatMessages) //foreach (var receivedHeartbeatMessage in receivedHeartbeatMessages)
@ -197,7 +195,6 @@ namespace JiShe.CollectBus.Subscribers
} }
[KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName,EnableBatch =true)] [KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName,EnableBatch =true)]
//[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
public async Task<ISubscribeAck> ReceivedLoginEvent(List<MessageReceivedLogin> receivedLoginMessages) public async Task<ISubscribeAck> ReceivedLoginEvent(List<MessageReceivedLogin> receivedLoginMessages)
{ {
//foreach (var receivedLoginMessage in receivedLoginMessages) //foreach (var receivedLoginMessage in receivedLoginMessages)

View File

@ -0,0 +1,43 @@
using System;
namespace JiShe.CollectBus.IotSystems.CommunicationLogs
{
public class PacketLog : ICassandraEntity<Guid>
{
/// <summary>
/// 下行报文
/// </summary>
public string IssuedMessage { get; set; } = string.Empty;
/// <summary>
/// 上行报文
/// </summary>
public string ReportMessage { get; set; } = string.Empty;
public DateTime? IssuedTime { get; set; }
public DateTime? ReportTime { get; set; }
/// <summary>
/// 报文类型(是否需要回复)
/// </summary>
public PacketType PacketType { get; set; }
public Guid Id { get; set; }
}
public enum PacketType
{
/// <summary>
/// 只有下发,不需要回复
/// </summary>
OnlyIssued,
/// <summary>
/// 只有上报,不需要下发
/// </summary>
OnlyReport,
/// <summary>
/// 下发并且需要回复
/// </summary>
IssuedAndReport
}
}

View File

@ -16,6 +16,7 @@ namespace JiShe.CollectBus.IotSystems.Protocols
/// <param name="regularExpression"></param> /// <param name="regularExpression"></param>
public ProtocolInfo(string name, string baseProtocol, string type, string description, string regularExpression) public ProtocolInfo(string name, string baseProtocol, string type, string description, string regularExpression)
{ {
Code = $"PL-{DateTime.Now:yyyyMMddHHmmss}";
Name = name; Name = name;
Type = type; Type = type;
Description = description; Description = description;
@ -23,6 +24,11 @@ namespace JiShe.CollectBus.IotSystems.Protocols
BaseProtocol = baseProtocol; BaseProtocol = baseProtocol;
} }
/// <summary>
/// 协议编码,唯一识别
/// </summary>
public string Code { get; set; }
/// <summary> /// <summary>
/// 协议名称 /// 协议名称
/// </summary> /// </summary>

View File

@ -9,9 +9,9 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<Compile Remove="PrepaymentSystems\新文件夹\**" /> <Compile Remove="PrepaymentSystems\**" />
<EmbeddedResource Remove="PrepaymentSystems\新文件夹\**" /> <EmbeddedResource Remove="PrepaymentSystems\**" />
<None Remove="PrepaymentSystems\新文件夹\**" /> <None Remove="PrepaymentSystems\**" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
@ -34,9 +34,4 @@
<PackageReference Include="Volo.Abp.AuditLogging.Domain" Version="8.3.3" /> <PackageReference Include="Volo.Abp.AuditLogging.Domain" Version="8.3.3" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<Folder Include="PrepaymentSystems\Entities\" />
<Folder Include="PrepaymentSystems\TableViews\" />
</ItemGroup>
</Project> </Project>

View File

@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Consts
{
public class ExceptionCode
{
public const string NotFound = "500404";
}
}

View File

@ -72,6 +72,11 @@ namespace JiShe.CollectBus.Common.Consts
///// </summary> ///// </summary>
//public const string CacheDeviceBalanceRelationMapResultKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:RelationMap"; //public const string CacheDeviceBalanceRelationMapResultKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:RelationMap";
public const string CacheAmmeterFocusKey = "CacheAmmeterFocusKey"; public const string CacheAmmeterFocusKey = $"{CacheBasicDirectoryKey}CacheAmmeterFocusKey";
/// <summary>
/// 协议池缓存标识
/// </summary>
public const string ProtocolKey = $"{CacheBasicDirectoryKey}Protocols";
} }
} }

View File

@ -49,7 +49,7 @@ namespace JiShe.CollectBus.Common.Extensions
/// <br /> /// <br />
/// </returns> /// </returns>
[Description("正则匹配单个")] [Description("正则匹配单个")]
public static string RegexMatch(this string str, string pattern) public static string? RegexMatch(this string str, string pattern)
{ {
var reg = new Regex(pattern); var reg = new Regex(pattern);
var match = reg.Match(str); var match = reg.Match(str);

View File

@ -13,6 +13,7 @@ using Volo.Abp.BackgroundWorkers.Hangfire;
using Volo.Abp.Caching.StackExchangeRedis; using Volo.Abp.Caching.StackExchangeRedis;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;
using Volo.Abp.Swashbuckle; using Volo.Abp.Swashbuckle;
using Volo.Abp.Timing;
namespace JiShe.CollectBus.Host namespace JiShe.CollectBus.Host
{ {
@ -24,6 +25,7 @@ namespace JiShe.CollectBus.Host
typeof(AbpAspNetCoreAuthenticationJwtBearerModule), typeof(AbpAspNetCoreAuthenticationJwtBearerModule),
typeof(AbpAspNetCoreSerilogModule), typeof(AbpAspNetCoreSerilogModule),
typeof(AbpSwashbuckleModule), typeof(AbpSwashbuckleModule),
typeof(AbpTimingModule),
typeof(CollectBusApplicationModule), typeof(CollectBusApplicationModule),
typeof(CollectBusMongoDbModule), typeof(CollectBusMongoDbModule),
typeof(AbpCachingStackExchangeRedisModule), typeof(AbpCachingStackExchangeRedisModule),
@ -46,6 +48,7 @@ namespace JiShe.CollectBus.Host
//ConfigureKafkaTopic(context, configuration); //ConfigureKafkaTopic(context, configuration);
ConfigureAuditLog(context); ConfigureAuditLog(context);
ConfigureCustom(context, configuration); ConfigureCustom(context, configuration);
Configure<AbpClockOptions>(options => { options.Kind = DateTimeKind.Local; });
} }