using System; using System.Collections.Generic; using System.Threading.Tasks; using Apache.IoTDB.DataStructure; using Apache.IoTDB; using Confluent.Kafka; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.IotSystems.PrepayModel; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Options; using Microsoft.Extensions.Logging; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Protocol.Contracts.Interfaces; using Microsoft.Extensions.DependencyInjection; using JiShe.CollectBus.Cassandra; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.IotSystems.MessageIssueds; using Volo.Abp.Application.Services; using JiShe.CollectBus.IotSystems.MessageReceiveds; using Volo.Abp.Domain.Repositories; using System.Diagnostics; using System.Linq; using System.Reflection; using Cassandra; using JiShe.CollectBus.Interceptors; using JiShe.CollectBus.IotSystems.Protocols; using TouchSocket.Core; using Volo.Abp.Modularity; using JiShe.CollectBus.DynamicModule; namespace JiShe.CollectBus.Samples; [AllowAnonymous] public class TestAppService : CollectBusAppService { private readonly ILogger _logger; private readonly ICassandraRepository _messageReceivedCassandraRepository; private readonly ICassandraProvider _cassandraProvider; private readonly IProtocolService _protocolService; private readonly IServiceProvider _serviceProvider; private readonly IDynamicModuleManager _dynamicModuleManager; public TestAppService( ILogger logger, ICassandraRepository messageReceivedCassandraRepository, ICassandraProvider cassandraProvider, IProtocolService protocolService,IServiceProvider serviceProvider, IDynamicModuleManager dynamicModuleManager) { _logger = logger; _messageReceivedCassandraRepository = messageReceivedCassandraRepository; _cassandraProvider = cassandraProvider; _protocolService = protocolService; _serviceProvider = serviceProvider; _dynamicModuleManager = dynamicModuleManager; } public async Task AddMessageOfCassandra() { var stopwatch = Stopwatch.StartNew(); for (int i = 1; i <= 10000; i++) { var str = Guid.NewGuid().ToString(); await _messageReceivedCassandraRepository.InsertAsync(new MessageIssued { ClientId = str, DeviceNo = i.ToString(), MessageId = str, Type = IssuedEventType.Data, Id = str, Message = str.GetBytes() }); } stopwatch.Stop(); _logger.LogWarning($"插入 {10000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒"); } public async Task AddMessageOfBulkInsertCassandra() { var records = new List(); var prepared = await _cassandraProvider.Session.PrepareAsync( $"INSERT INTO {_cassandraProvider.CassandraConfig.Keyspace}.{nameof(MessageIssued)} (id, clientid, message, deviceno,type,messageid) VALUES (?, ?, ?, ?, ?, ?)"); for (int i = 1; i <= 100000; i++) { var str = Guid.NewGuid().ToString(); records.Add(new MessageIssued { ClientId = str, DeviceNo = i.ToString(), MessageId = str, Type = IssuedEventType.Data, Id = str, Message = str.GetBytes() }); } var stopwatch = Stopwatch.StartNew(); await BulkInsertAsync(_cassandraProvider.Session, prepared, records); stopwatch.Stop(); _logger.LogWarning($"插入 {100000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒"); } private static async Task BulkInsertAsync(ISession session, PreparedStatement prepared, List records) { var tasks = new List(); var batch = new BatchStatement(); for (int i = 0; i < records.Count; i++) { var record = records[i]; var boundStatement = prepared.Bind( record.Id, record.ClientId, record.Message, record.DeviceNo, (int)record.Type, record.MessageId); // 设置一致性级别为ONE以提高性能 boundStatement.SetConsistencyLevel(ConsistencyLevel.One); batch.Add(boundStatement); // 当达到批处理大小时执行 if (batch.Statements.Count() >= 1000 || i == records.Count - 1) { tasks.Add(session.ExecuteAsync(batch)); batch = new BatchStatement(); } } // 等待所有批处理完成 await Task.WhenAll(tasks); } [LogIntercept] public virtual Task LogInterceptorTest(string str) { _logger.LogWarning(str); return Task.FromResult(str) ; } public virtual async Task GetProtocol(string deviceCode, bool isSpecial = false) { var protocol = await _protocolService.FirstOrDefaultByDeviceAsync(deviceCode, isSpecial); return protocol; } // 重新加载插件方法 public async Task ReloadPluginsAsync() { var aa = Assembly.LoadFile( @"D:\Codes\CollectBusV5\JiShe.CollectBus\web\JiShe.CollectBus.Host\bin\Debug\net8.0\Plugins\JiShe.CollectBus.Protocol.Test.dll"); var module = aa.GetTypes().First(a=> typeof(IAbpModule).IsAssignableFrom(a)); await _dynamicModuleManager.ReinitializeModuleAsync(module); } }