using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.DynamicModule;
using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.Protocols;
using JiShe.CollectBus.Protocol.Interfaces;
using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Volo.Abp.Modularity;

namespace JiShe.CollectBus.Samples;

/// <summary>
/// Sample application service used to exercise logging interception, protocol lookup
/// and dynamic plugin (module) reloading.
/// </summary>
/// <remarks>
/// The service is marked <see cref="AllowAnonymousAttribute"/>, so none of its methods
/// takes a caller-supplied file path. The Cassandra repository/provider dependencies that
/// earlier revisions injected are no longer part of this class; the two Cassandra methods
/// are kept only as no-op entry points.
/// </remarks>
[AllowAnonymous]
public class TestAppService : CollectBusAppService
{
    /// <summary>Name of the folder, under the application base directory, that holds plugin assemblies.</summary>
    private const string PluginDirectoryName = "Plugins";

    /// <summary>File name of the test protocol plugin reloaded by <see cref="ReloadPluginsAsync"/>.</summary>
    private const string TestProtocolPluginFileName = "JiShe.CollectBus.Protocol.Test.dll";

    // NOTE(review): the non-generic ILogger is not registered by the default
    // Microsoft.Extensions.Logging setup (only ILogger<T> and ILoggerFactory are).
    // Confirm a registration exists, or switch the constructor to ILogger<TestAppService>.
    private readonly ILogger _logger;
    private readonly IProtocolService _protocolService;
    private readonly IServiceProvider _serviceProvider;
    private readonly IDynamicModuleManager _dynamicModuleManager;

    /// <summary>
    /// Creates the service with its injected dependencies.
    /// </summary>
    /// <param name="logger">Logger used by <see cref="LogInterceptorTest"/>.</param>
    /// <param name="protocolService">Service used to look up a protocol by device code.</param>
    /// <param name="serviceProvider">Service provider; stored but not used by the visible methods.</param>
    /// <param name="dynamicModuleManager">Manager used to re-initialize a plugin module.</param>
    public TestAppService(
        ILogger logger,
        IProtocolService protocolService,
        IServiceProvider serviceProvider,
        IDynamicModuleManager dynamicModuleManager)
    {
        _logger = logger;
        _protocolService = protocolService;
        _serviceProvider = serviceProvider;
        _dynamicModuleManager = dynamicModuleManager;
    }

    /// <summary>
    /// No-op entry point. The body used to be a (commented-out) benchmark that inserted
    /// 10000 <c>MessageIssued</c> rows one at a time through a Cassandra repository and
    /// logged the elapsed time.
    /// </summary>
    /// <returns>An already-completed task.</returns>
    public Task AddMessageOfCassandra()
    {
        // Nothing to await: returning a completed task avoids an async state machine
        // (and the CS1998 warning) for an empty body.
        return Task.CompletedTask;
    }

    /// <summary>
    /// No-op entry point. The body used to be a (commented-out) benchmark that bulk-inserted
    /// 100000 <c>MessageIssued</c> rows through a prepared Cassandra statement, in batches
    /// of 1000, and logged the elapsed time.
    /// </summary>
    /// <returns>An already-completed task.</returns>
    public Task AddMessageOfBulkInsertCassandra()
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Logs <paramref name="str"/> at warning level and returns it wrapped in a completed task.
    /// The method carries <c>[LogIntercept]</c> and is virtual.
    /// </summary>
    /// <param name="str">Text to log and echo back.</param>
    /// <returns>A completed task whose result is <paramref name="str"/>.</returns>
    [LogIntercept]
    public virtual Task LogInterceptorTest(string str)
    {
        _logger.LogWarning(str);
        return Task.FromResult(str);
    }

    /// <summary>
    /// Looks up the protocol for a device by delegating to
    /// <c>IProtocolService.FirstOrDefaultByDeviceAsync</c>.
    /// </summary>
    /// <param name="deviceCode">Device code passed through to the protocol service.</param>
    /// <param name="isSpecial">Flag passed through to the protocol service; defaults to <c>false</c>.</param>
    /// <returns>The task produced by the protocol service call.</returns>
    public virtual Task GetProtocol(string deviceCode, bool isSpecial = false)
    {
        // With a non-generic Task return type an async method cannot `return protocol;`
        // (CS1997), so the service's task is handed back directly.
        // NOTE(review): presumably the declared return type was meant to be Task<T> of the
        // protocol type — confirm and restore the generic argument if so.
        return _protocolService.FirstOrDefaultByDeviceAsync(deviceCode, isSpecial);
    }

    /// <summary>
    /// Reloads the test protocol plugin: loads its assembly from the <c>Plugins</c> folder
    /// under the application base directory, finds the first type assignable to
    /// <see cref="IAbpModule"/>, and asks the dynamic module manager to re-initialize it.
    /// </summary>
    /// <returns>A task that completes when the module has been re-initialized.</returns>
    /// <exception cref="InvalidOperationException">
    /// The plugin assembly contains no type assignable to <see cref="IAbpModule"/>.
    /// </exception>
    public async Task ReloadPluginsAsync()
    {
        // Resolve the plugin relative to the running host instead of a fixed path on a
        // developer machine. AppContext.BaseDirectory is absolute, which Assembly.LoadFile requires.
        var pluginPath = Path.Combine(AppContext.BaseDirectory, PluginDirectoryName, TestProtocolPluginFileName);
        var pluginAssembly = Assembly.LoadFile(pluginPath);

        var moduleType = pluginAssembly
            .GetTypes()
            .FirstOrDefault(type => typeof(IAbpModule).IsAssignableFrom(type));

        if (moduleType is null)
        {
            throw new InvalidOperationException(
                $"No type implementing {nameof(IAbpModule)} was found in plugin assembly '{pluginPath}'.");
        }

        await _dynamicModuleManager.ReinitializeModuleAsync(moduleType);
    }
}