using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Dapper;
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.PluginFileWatcher
{
///
/// SQLite数据库管理器,用于管理文件事件的存储和检索
///
public class EventDatabaseManager : IDisposable
{
private readonly FileMonitorConfig _config;
private readonly ILogger _logger;
private readonly string _connectionString;
private readonly string _databasePath;
private readonly int _commandTimeout;
private bool _disposed;
///
/// 初始化数据库管理器
///
/// 配置对象
/// 日志记录器
public EventDatabaseManager(FileMonitorConfig config, ILogger logger)
{
_config = config ?? throw new ArgumentNullException(nameof(config));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
// 确保使用配置中的设置
_databasePath = config.EventStorage.DatabasePath;
_connectionString = config.EventStorage.ConnectionString;
_commandTimeout = config.EventStorage.CommandTimeout;
// 确保数据库目录存在
string dbDirectory = Path.GetDirectoryName(_databasePath);
if (!string.IsNullOrEmpty(dbDirectory) && !Directory.Exists(dbDirectory))
{
Directory.CreateDirectory(dbDirectory);
}
// 初始化数据库
InitializeDatabase().GetAwaiter().GetResult();
}
///
/// 初始化数据库,确保必要的表已创建
///
private async Task InitializeDatabase()
{
try
{
using var connection = new SqliteConnection(_connectionString);
await connection.OpenAsync();
// 启用外键约束
using (var command = connection.CreateCommand())
{
command.CommandText = "PRAGMA foreign_keys = ON;";
await command.ExecuteNonQueryAsync();
}
// 创建文件事件表
string createTableSql = @"
CREATE TABLE IF NOT EXISTS FileEvents (
Id TEXT PRIMARY KEY,
Timestamp TEXT NOT NULL,
EventType INTEGER NOT NULL,
FullPath TEXT NOT NULL,
FileName TEXT NOT NULL,
Directory TEXT NOT NULL,
Extension TEXT NOT NULL,
OldFileName TEXT,
OldFullPath TEXT,
FileSize INTEGER,
CreatedAt TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_timestamp ON FileEvents(Timestamp);
CREATE INDEX IF NOT EXISTS idx_events_eventtype ON FileEvents(EventType);
CREATE INDEX IF NOT EXISTS idx_events_extension ON FileEvents(Extension);";
await connection.ExecuteAsync(createTableSql, commandTimeout: _commandTimeout);
// 创建元数据表
string createMetadataTableSql = @"
CREATE TABLE IF NOT EXISTS EventMetadata (
Id INTEGER PRIMARY KEY AUTOINCREMENT,
EventId TEXT NOT NULL,
MetadataKey TEXT NOT NULL,
MetadataValue TEXT,
FOREIGN KEY (EventId) REFERENCES FileEvents(Id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_metadata_eventid ON EventMetadata(EventId);
CREATE INDEX IF NOT EXISTS idx_metadata_key ON EventMetadata(MetadataKey);";
await connection.ExecuteAsync(createMetadataTableSql, commandTimeout: _commandTimeout);
_logger.LogInformation("数据库初始化成功");
}
catch (Exception ex)
{
_logger.LogError(ex, "初始化数据库失败");
throw;
}
}
///
/// 保存文件事件到数据库
///
/// 要保存的事件列表
public async Task SaveEventsAsync(List events)
{
if (events == null || events.Count == 0)
return;
try
{
using var connection = new SqliteConnection(_connectionString);
await connection.OpenAsync();
// 启用外键约束
using (var command = connection.CreateCommand())
{
command.CommandText = "PRAGMA foreign_keys = ON;";
await command.ExecuteNonQueryAsync();
}
// 开始事务
using var transaction = connection.BeginTransaction();
try
{
foreach (var fileEvent in events)
{
// 插入事件数据
string insertEventSql = @"
INSERT INTO FileEvents (
Id, Timestamp, EventType, FullPath, FileName,
Directory, Extension, OldFileName, OldFullPath,
FileSize, CreatedAt
) VALUES (
@Id, @Timestamp, @EventType, @FullPath, @FileName,
@Directory, @Extension, @OldFileName, @OldFullPath,
@FileSize, @CreatedAt
)";
await connection.ExecuteAsync(insertEventSql, new
{
Id = fileEvent.Id.ToString(), // 确保ID始终以字符串形式保存
Timestamp = fileEvent.Timestamp.ToString("o"),
EventType = (int)fileEvent.EventType,
fileEvent.FullPath,
fileEvent.FileName,
fileEvent.Directory,
fileEvent.Extension,
fileEvent.OldFileName,
fileEvent.OldFullPath,
fileEvent.FileSize,
CreatedAt = DateTime.UtcNow.ToString("o")
}, transaction, _commandTimeout);
// 插入元数据
if (fileEvent.Metadata != null && fileEvent.Metadata.Count > 0)
{
string insertMetadataSql = @"
INSERT INTO EventMetadata (EventId, MetadataKey, MetadataValue)
VALUES (@EventId, @MetadataKey, @MetadataValue)";
foreach (var metadata in fileEvent.Metadata)
{
await connection.ExecuteAsync(insertMetadataSql, new
{
EventId = fileEvent.Id.ToString(), // 确保ID以相同格式保存
MetadataKey = metadata.Key,
MetadataValue = metadata.Value
}, transaction, _commandTimeout);
}
}
}
// 提交事务
transaction.Commit();
_logger.LogInformation($"已成功保存 {events.Count} 个事件到数据库");
}
catch (Exception ex)
{
// 回滚事务
transaction.Rollback();
_logger.LogError(ex, "保存事件到数据库时发生错误");
throw;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "连接数据库失败");
throw;
}
}
///
/// 查询事件
///
/// 查询参数
/// 查询结果
public async Task QueryEventsAsync(EventQueryParams queryParams)
{
if (queryParams == null)
throw new ArgumentNullException(nameof(queryParams));
var result = new EventQueryResult
{
StartTime = queryParams.StartTime ?? DateTime.MinValue,
EndTime = queryParams.EndTime ?? DateTime.MaxValue
};
try
{
using var connection = new SqliteConnection(_connectionString);
await connection.OpenAsync();
// 启用外键约束
using (var command = connection.CreateCommand())
{
command.CommandText = "PRAGMA foreign_keys = ON;";
await command.ExecuteNonQueryAsync();
}
// 构建查询条件
var conditions = new List();
var parameters = new DynamicParameters();
if (queryParams.StartTime.HasValue)
{
conditions.Add("Timestamp >= @StartTime");
parameters.Add("@StartTime", queryParams.StartTime.Value.ToString("o"));
}
if (queryParams.EndTime.HasValue)
{
conditions.Add("Timestamp <= @EndTime");
parameters.Add("@EndTime", queryParams.EndTime.Value.ToString("o"));
}
if (queryParams.EventType.HasValue)
{
conditions.Add("EventType = @EventType");
parameters.Add("@EventType", (int)queryParams.EventType.Value);
}
if (!string.IsNullOrEmpty(queryParams.PathFilter))
{
conditions.Add("FullPath LIKE @PathFilter");
parameters.Add("@PathFilter", $"%{queryParams.PathFilter}%");
}
if (!string.IsNullOrEmpty(queryParams.ExtensionFilter))
{
conditions.Add("Extension = @ExtensionFilter");
parameters.Add("@ExtensionFilter", queryParams.ExtensionFilter);
}
// 构建WHERE子句
string whereClause = conditions.Count > 0
? $"WHERE {string.Join(" AND ", conditions)}"
: string.Empty;
// 构建ORDER BY子句
string orderByClause = queryParams.AscendingOrder
? "ORDER BY Timestamp ASC"
: "ORDER BY Timestamp DESC";
// 获取总记录数
string countSql = $"SELECT COUNT(*) FROM FileEvents {whereClause}";
result.TotalCount = await connection.ExecuteScalarAsync(countSql, parameters, commandTimeout: _commandTimeout);
// 应用分页
string paginationClause = $"LIMIT @PageSize OFFSET @Offset";
parameters.Add("@PageSize", queryParams.PageSize);
parameters.Add("@Offset", queryParams.PageIndex * queryParams.PageSize);
// 查询事件数据
string querySql = $@"
SELECT Id,
Timestamp,
EventType,
FullPath,
FileName,
Directory,
Extension,
OldFileName,
OldFullPath,
FileSize
FROM FileEvents
{whereClause}
{orderByClause}
{paginationClause}";
var events = await connection.QueryAsync(querySql, parameters, commandTimeout: _commandTimeout);
// 处理查询结果
foreach (var eventData in events)
{
var fileEvent = new FileEvent
{
Id = Guid.Parse(eventData.Id),
Timestamp = DateTime.Parse(eventData.Timestamp),
EventType = (FileEventType)eventData.EventType,
FullPath = eventData.FullPath,
FileName = eventData.FileName,
Directory = eventData.Directory,
Extension = eventData.Extension,
OldFileName = eventData.OldFileName,
OldFullPath = eventData.OldFullPath,
FileSize = eventData.FileSize
};
// 获取元数据
string metadataSql = "SELECT MetadataKey, MetadataValue FROM EventMetadata WHERE EventId = @EventId";
var metadata = await connection.QueryAsync(metadataSql, new { EventId = fileEvent.Id.ToString() }, commandTimeout: _commandTimeout);
foreach (var item in metadata)
{
fileEvent.Metadata[item.MetadataKey] = item.MetadataValue;
}
result.Events.Add(fileEvent);
}
result.HasMore = (queryParams.PageIndex + 1) * queryParams.PageSize < result.TotalCount;
return result;
}
catch (Exception ex)
{
_logger.LogError(ex, "查询事件时发生错误");
throw;
}
}
///
/// 清理旧数据
///
/// 数据保留天数
public async Task CleanupOldDataAsync(int retentionDays)
{
if (retentionDays <= 0)
return;
try
{
DateTime cutoffDate = DateTime.UtcNow.AddDays(-retentionDays);
string cutoffDateStr = cutoffDate.ToString("o");
using var connection = new SqliteConnection(_connectionString);
await connection.OpenAsync();
// 启用外键约束
using (var command = connection.CreateCommand())
{
command.CommandText = "PRAGMA foreign_keys = ON;";
await command.ExecuteNonQueryAsync();
}
// 删除旧事件(级联删除元数据)
string deleteSql = "DELETE FROM FileEvents WHERE Timestamp < @CutoffDate";
int deletedCount = await connection.ExecuteAsync(deleteSql, new { CutoffDate = cutoffDateStr }, commandTimeout: _commandTimeout);
_logger.LogInformation($"已清理 {deletedCount} 条旧事件数据({retentionDays}天前)");
}
catch (Exception ex)
{
_logger.LogError(ex, "清理旧数据时发生错误");
throw;
}
}
///
/// 获取数据库统计信息
///
/// 数据库统计信息
public async Task GetDatabaseStatsAsync()
{
try
{
using var connection = new SqliteConnection(_connectionString);
await connection.OpenAsync();
// 启用外键约束
using (var command = connection.CreateCommand())
{
command.CommandText = "PRAGMA foreign_keys = ON;";
await command.ExecuteNonQueryAsync();
}
var stats = new DatabaseStats();
// 获取事件总数
stats.TotalEvents = await connection.ExecuteScalarAsync("SELECT COUNT(*) FROM FileEvents", commandTimeout: _commandTimeout);
// 获取最早和最新事件时间
stats.OldestEventTime = await connection.ExecuteScalarAsync("SELECT Timestamp FROM FileEvents ORDER BY Timestamp ASC LIMIT 1", commandTimeout: _commandTimeout);
stats.NewestEventTime = await connection.ExecuteScalarAsync("SELECT Timestamp FROM FileEvents ORDER BY Timestamp DESC LIMIT 1", commandTimeout: _commandTimeout);
// 获取事件类型分布
var eventTypeCounts = await connection.QueryAsync("SELECT EventType, COUNT(*) AS Count FROM FileEvents GROUP BY EventType", commandTimeout: _commandTimeout);
stats.EventTypeCounts = new Dictionary();
foreach (var item in eventTypeCounts)
{
stats.EventTypeCounts[(FileEventType)item.EventType] = item.Count;
}
// 获取扩展名分布(前10个)
var extensionCounts = await connection.QueryAsync(
"SELECT Extension, COUNT(*) AS Count FROM FileEvents GROUP BY Extension ORDER BY Count DESC LIMIT 10",
commandTimeout: _commandTimeout);
stats.TopExtensions = new Dictionary();
foreach (var item in extensionCounts)
{
stats.TopExtensions[item.Extension] = item.Count;
}
return stats;
}
catch (Exception ex)
{
_logger.LogError(ex, "获取数据库统计信息时发生错误");
throw;
}
}
///
/// 释放资源
///
public void Dispose()
{
if (_disposed)
return;
_disposed = true;
}
}
///
/// 数据库统计信息
///
public class DatabaseStats
{
///
/// 事件总数
///
public int TotalEvents { get; set; }
///
/// 最早事件时间
///
public DateTime? OldestEventTime { get; set; }
///
/// 最新事件时间
///
public DateTime? NewestEventTime { get; set; }
///
/// 事件类型计数
///
public Dictionary EventTypeCounts { get; set; }
///
/// 排名前列的文件扩展名
///
public Dictionary TopExtensions { get; set; }
}
}