using IdentityModel.Client;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.DynamicModule;
using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.Protocols;
using JiShe.CollectBus.Protocol;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol3761;
using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using TouchSocket.Sockets;
using Volo.Abp;
using Volo.Abp.Modularity;

namespace JiShe.CollectBus.Samples;

/// <summary>
/// Sample application service used to exercise logging interception, protocol lookup,
/// dynamic plugin loading and analysis-strategy registration.
/// </summary>
// NOTE(review): several generic type arguments appear to be missing from this file as received
// (the parameterless GetRequiredService()/AddTransient() calls, "AddTransient>(",
// "GetRequiredService>()", and the Task-returning methods that return a value). The intended
// types cannot be determined from this file, so those tokens are reproduced exactly as found.
// Restore them from the original source before building.
// NOTE(review): the class is [AllowAnonymous] while exposing plugin loading and service
// registration; confirm this service is not reachable outside development environments.
[AllowAnonymous]
public class TestAppService : CollectBusAppService
{
    /// <summary>Name of the folder, next to the entry assembly, that holds plugin assemblies.</summary>
    private const string PluginDirectoryName = "Plugins";

    /// <summary>File name of the test protocol plugin loaded by <see cref="ReloadPluginsAsync"/>.</summary>
    private const string TestPluginFileName = "JiShe.CollectBus.Protocol.Test.dll";

    // NOTE(review): possibly a typed logger originally (generic argument lost) - confirm.
    private readonly ILogger _logger;
    //private readonly ICassandraRepository _messageReceivedCassandraRepository;
    //private readonly ICassandraProvider _cassandraProvider;
    private readonly IProtocolService _protocolService;
    private readonly IServiceProvider _serviceProvider;
    private readonly IDynamicModuleManager _dynamicModuleManager;
    private readonly IServiceCollection _serviceDescriptors;

    /// <summary>
    /// Creates the service and stores the injected dependencies.
    /// </summary>
    /// <param name="logger">Logger used by <see cref="LogInterceptorTest"/>.</param>
    /// <param name="protocolService">Service used to look up a protocol by device code.</param>
    /// <param name="serviceProvider">Provider used to resolve the plugin executor in <see cref="TestPlugins"/>.</param>
    /// <param name="dynamicModuleManager">Manager used to load an ABP module at runtime.</param>
    /// <param name="serviceDescriptors">Service collection that <see cref="RegisterProtocolAnalysis"/> adds registrations to.</param>
    public TestAppService(
        ILogger logger,
        //ICassandraRepository messageReceivedCassandraRepository,
        //ICassandraProvider cassandraProvider,
        IProtocolService protocolService,
        IServiceProvider serviceProvider,
        IDynamicModuleManager dynamicModuleManager,
        IServiceCollection serviceDescriptors)
    {
        _logger = logger;
        //_messageReceivedCassandraRepository = messageReceivedCassandraRepository;
        //_cassandraProvider = cassandraProvider;
        _protocolService = protocolService;
        _serviceProvider = serviceProvider;
        _dynamicModuleManager = dynamicModuleManager;
        _serviceDescriptors = serviceDescriptors;
    }

    /// <summary>
    /// Currently a no-op: the single-row Cassandra insert benchmark below is commented out,
    /// as are the Cassandra dependencies of this service.
    /// </summary>
    public async Task AddMessageOfCassandra()
    {
        //var stopwatch = Stopwatch.StartNew();
        //for (int i = 1; i <= 10000; i++)
        //{
        //    var str = Guid.NewGuid().ToString();
        //    await _messageReceivedCassandraRepository.InsertAsync(new MessageIssued
        //    {
        //        ClientId = str,
        //        DeviceNo = i.ToString(),
        //        MessageId = str,
        //        Type = IssuedEventType.Data,
        //        Id = str,
        //        Message = str.GetBytes()
        //    });
        //}
        //stopwatch.Stop();
        //_logger.LogWarning($"插入 {10000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒");
    }

    /// <summary>
    /// Currently a no-op: the batched Cassandra insert benchmark below is commented out,
    /// as are the Cassandra dependencies of this service.
    /// </summary>
    public async Task AddMessageOfBulkInsertCassandra()
    {
        //var records = new List();
        //var prepared = await _cassandraProvider.Session.PrepareAsync(
        //    $"INSERT INTO {_cassandraProvider.CassandraConfig.Keyspace}.{nameof(MessageIssued)} (id, clientid, message, deviceno,type,messageid) VALUES (?, ?, ?, ?, ?, ?)");
        //for (int i = 1; i <= 100000; i++)
        //{
        //    var str = Guid.NewGuid().ToString();
        //    records.Add(new MessageIssued
        //    {
        //        ClientId = str,
        //        DeviceNo = i.ToString(),
        //        MessageId = str,
        //        Type = IssuedEventType.Data,
        //        Id = str,
        //        Message = str.GetBytes()
        //    });
        //}
        //var stopwatch = Stopwatch.StartNew();
        //await BulkInsertAsync(_cassandraProvider.Session, prepared, records);
        //stopwatch.Stop();
        //_logger.LogWarning($"插入 {100000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒");
    }

    //private static async Task BulkInsertAsync(ISession session, PreparedStatement prepared, List records)
    //{
    //    var tasks = new List();
    //    var batch = new BatchStatement();
    //    for (int i = 0; i < records.Count; i++)
    //    {
    //        var record = records[i];
    //        var boundStatement = prepared.Bind(
    //            record.Id,
    //            record.ClientId,
    //            record.Message,
    //            record.DeviceNo,
    //            (int)record.Type,
    //            record.MessageId);
    //        // Use consistency level ONE to improve performance
    //        boundStatement.SetConsistencyLevel(ConsistencyLevel.One);
    //        batch.Add(boundStatement);
    //        // Execute once the batch size has been reached
    //        if (batch.Statements.Count() >= 1000 || i == records.Count - 1)
    //        {
    //            tasks.Add(session.ExecuteAsync(batch));
    //            batch = new BatchStatement();
    //        }
    //    }
    //    // Wait for all batches to complete
    //    await Task.WhenAll(tasks);
    //}

    /// <summary>
    /// Writes <paramref name="str"/> to the log at warning level and returns a completed task
    /// created from the same string.
    /// </summary>
    /// <param name="str">Text to log.</param>
    /// <returns>A completed task wrapping <paramref name="str"/>.</returns>
    // NOTE(review): the declared return type is the non-generic Task, so callers cannot read the
    // wrapped string through it; the generic argument was presumably lost - confirm.
    [LogIntercept]
    public virtual Task LogInterceptorTest(string str)
    {
        _logger.LogWarning(str);
        return Task.FromResult(str);
    }

    /// <summary>
    /// Looks up the protocol for a device through the protocol service.
    /// </summary>
    /// <param name="deviceCode">Device code passed to the protocol service.</param>
    /// <param name="isSpecial">Flag forwarded unchanged to the protocol service.</param>
    /// <returns>Whatever the protocol service returns for the device.</returns>
    // NOTE(review): returns a value from a method declared as non-generic Task; the generic
    // return type argument appears to be missing and must be restored.
    public virtual async Task GetProtocol(string deviceCode, bool isSpecial = false)
    {
        var protocol = await _protocolService.FirstOrDefaultByDeviceAsync(deviceCode, isSpecial);
        return protocol;
    }

    /// <summary>
    /// Loads the test protocol plugin assembly from the plugin folder next to the entry assembly
    /// and hands its first ABP module type to the dynamic module manager.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The entry assembly directory cannot be determined, or the plugin assembly contains no type
    /// assignable to <see cref="IAbpModule"/>.
    /// </exception>
    public async Task ReloadPluginsAsync()
    {
        // Resolve the plugin relative to the running host instead of a machine-specific absolute
        // path, matching how RegisterProtocolAnalysis locates the same folder.
        if (!TryGetPluginDirectory(out var pluginDirectory))
        {
            throw new InvalidOperationException(
                "Unable to determine the entry assembly directory; cannot locate the plugin folder.");
        }

        var pluginPath = Path.Combine(pluginDirectory, TestPluginFileName);
        var pluginAssembly = Assembly.LoadFile(pluginPath);
        var module = pluginAssembly.GetTypes().First(a => typeof(IAbpModule).IsAssignableFrom(a));
        await _dynamicModuleManager.LoadModuleAsync(module);
    }

    /// <summary>
    /// Resolves an executor from the service provider and invokes it with the fixed
    /// arguments "TestServer" / "TestServer".
    /// </summary>
    /// <returns>The result of the executor call.</returns>
    // NOTE(review): the generic arguments of GetRequiredService and of the Task return type
    // appear to be missing; restore them from the original source.
    public async Task TestPlugins()
    {
        var executor = _serviceProvider.GetRequiredService();
        return await executor.ExecuteAsync("TestServer", "TestServer");
    }

    /// <summary>
    /// Scans the plugin folder next to the entry assembly for concrete implementations of the
    /// generic analysis-strategy interface and registers them, a lookup dictionary keyed by
    /// (strategy class name, input type), and a resolver factory in the service collection.
    /// </summary>
    /// <remarks>
    /// Returns without scanning when the entry assembly directory cannot be determined.
    /// A missing plugin folder is treated the same as an empty one.
    /// </remarks>
    // NOTE(review): registrations are added to the injected IServiceCollection at call time;
    // confirm that a service provider is built from this collection afterwards, otherwise
    // they may never take effect.
    public void RegisterProtocolAnalysis()
    {
        // Maps (strategy class name, strategy input type) to the concrete strategy type.
        var strategyMetadata = new Dictionary<(string, Type), Type>();

        // NOTE(review): generic argument missing here; reproduced as received.
        _serviceDescriptors.AddTransient();

        if (!TryGetPluginDirectory(out var pluginDirectory))
        {
            return;
        }

        // Directory.GetFiles throws when the folder is absent; treat that as "no plugins".
        var dllFiles = Directory.Exists(pluginDirectory)
            ? Directory.GetFiles(pluginDirectory, "*.dll")
            : Array.Empty<string>();

        foreach (var file in dllFiles)
        {
            // Reuse an assembly that is already loaded instead of loading it a second time.
            var assemblyName = AssemblyName.GetAssemblyName(file);
            var existingAssembly = AppDomain.CurrentDomain.GetAssemblies()
                .FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName);
            var assembly = existingAssembly ?? Assembly.LoadFrom(file);

            foreach (var candidateType in assembly.GetTypes())
            {
                if (candidateType.IsAbstract || candidateType.IsInterface)
                {
                    continue;
                }

                // Collect every closed form of the strategy interface the type implements.
                // Matching on the generic type definition (rather than looking the interface up
                // by name) stays unambiguous when a type implements it for several input types.
                var strategyInterfaces = candidateType.GetInterfaces()
                    .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IAnalysisStrategy<>))
                    .ToList();
                if (strategyInterfaces.Count == 0)
                {
                    continue;
                }

                // Register the strategy implementation itself.
                _serviceDescriptors.AddTransient(candidateType);

                foreach (var strategyInterface in strategyInterfaces)
                {
                    var inputType = strategyInterface.GetGenericArguments()[0];
                    strategyMetadata[(candidateType.Name, inputType)] = candidateType;
                }
            }
        }

        // Register the metadata dictionary.
        _serviceDescriptors.AddSingleton(strategyMetadata);

        // Register the strategy resolver factory: given a strategy name and input type it returns
        // the resolved strategy instance, or logs a warning and returns null when none is known.
        // NOTE(review): the generic arguments of AddTransient and of both GetRequiredService
        // calls below appear to be missing; reproduced as received.
        _serviceDescriptors.AddTransient>(provider => (name, inputType) =>
        {
            var metadata = provider.GetRequiredService>();
            if (metadata.TryGetValue((name, inputType), out var strategyType))
            {
                return provider.GetRequiredService(strategyType);
            }
            else
            {
                var logger = provider.GetRequiredService>();
                logger.LogWarning($"未能找到解析策略:{name}-{inputType}");
                return null;
            }
        });
    }

    /// <summary>
    /// Builds the path of the plugin folder located next to the entry assembly.
    /// </summary>
    /// <param name="pluginDirectory">
    /// The plugin folder path, or an empty string when it cannot be determined.
    /// The folder is not checked for existence.
    /// </param>
    /// <returns>
    /// <c>true</c> when the entry assembly directory could be determined; otherwise <c>false</c>.
    /// </returns>
    private static bool TryGetPluginDirectory(out string pluginDirectory)
    {
        var entryDirectory = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location);
        if (string.IsNullOrWhiteSpace(entryDirectory))
        {
            pluginDirectory = string.Empty;
            return false;
        }

        pluginDirectory = Path.Combine(entryDirectory, PluginDirectoryName);
        return true;
    }
}