// 2025-04-21 09:54:34 +08:00
// 133 lines
// 4.5 KiB
// C#

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Apache.IoTDB.DataStructure;
using Apache.IoTDB;
using Confluent.Kafka;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.IotSystems.PrepayModel;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Logging;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.IotSystems.AFNEntity;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using JiShe.CollectBus.Cassandra;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using Volo.Abp.Application.Services;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using Volo.Abp.Domain.Repositories;
using System.Diagnostics;
using System.Linq;
using Cassandra;
using JiShe.CollectBus.Interceptors;
namespace JiShe.CollectBus.Samples;
/// <summary>
/// Sample application service used to exercise Cassandra insert paths
/// (one-by-one repository inserts and driver-level batch inserts) and the
/// <c>[LogIntercept]</c> attribute.
/// </summary>
/// <remarks>
/// NOTE(review): the class is marked <c>[AllowAnonymous]</c> while its methods write
/// large volumes of generated rows; confirm it is not reachable in production.
/// </remarks>
[AllowAnonymous]
public class TestAppService : CollectBusAppService
{
    /// <summary>Number of rows written by <see cref="AddMessageOfCassandra"/>.</summary>
    private const int SingleInsertCount = 10000;

    /// <summary>Number of rows written by <see cref="AddMessageOfBulkInsertCassandra"/>.</summary>
    private const int BulkInsertCount = 100000;

    /// <summary>Default number of statements grouped into one batch.</summary>
    private const int DefaultBatchSize = 1000;

    private readonly ILogger<TestAppService> _logger;
    private readonly ICassandraRepository<MessageIssued, string> _messageIssuedRepository;
    private readonly ICassandraProvider _cassandraProvider;

    /// <summary>
    /// Creates the service with its logger, the <see cref="MessageIssued"/> repository
    /// and the Cassandra provider that exposes the driver session and configuration.
    /// </summary>
    public TestAppService(
        ILogger<TestAppService> logger,
        ICassandraRepository<MessageIssued, string> messageReceivedCassandraRepository,
        ICassandraProvider cassandraProvider)
    {
        _logger = logger;
        _messageIssuedRepository = messageReceivedCassandraRepository;
        _cassandraProvider = cassandraProvider;
    }

    /// <summary>
    /// Inserts <see cref="SingleInsertCount"/> generated records one at a time through
    /// the repository, awaiting each insert, and logs the total elapsed time.
    /// </summary>
    public async Task AddMessageOfCassandra()
    {
        var stopwatch = Stopwatch.StartNew();
        for (int i = 1; i <= SingleInsertCount; i++)
        {
            await _messageIssuedRepository.InsertAsync(CreateSampleMessage(i));
        }

        stopwatch.Stop();
        // Structured template; renders the same text as the former interpolated string.
        _logger.LogWarning(
            "插入 {Count} 条记录完成,耗时: {ElapsedMilliseconds} 毫秒",
            SingleInsertCount,
            stopwatch.ElapsedMilliseconds);
    }

    /// <summary>
    /// Generates <see cref="BulkInsertCount"/> records, inserts them through batched
    /// prepared statements and logs the elapsed time of the insert phase only
    /// (statement preparation and record generation are not timed).
    /// </summary>
    public async Task AddMessageOfBulkInsertCassandra()
    {
        var prepared = await _cassandraProvider.Session.PrepareAsync(
            $"INSERT INTO {_cassandraProvider.CassandraConfig.Keyspace}.{nameof(MessageIssued)} (id, clientid, message, deviceno,type,messageid) VALUES (?, ?, ?, ?, ?, ?)");

        var records = new List<MessageIssued>(BulkInsertCount);
        for (int i = 1; i <= BulkInsertCount; i++)
        {
            records.Add(CreateSampleMessage(i));
        }

        var stopwatch = Stopwatch.StartNew();
        await BulkInsertAsync(_cassandraProvider.Session, prepared, records);
        stopwatch.Stop();

        _logger.LogWarning(
            "插入 {Count} 条记录完成,耗时: {ElapsedMilliseconds} 毫秒",
            BulkInsertCount,
            stopwatch.ElapsedMilliseconds);
    }

    /// <summary>
    /// Binds every record to <paramref name="prepared"/>, groups the bound statements
    /// into batches of at most <paramref name="batchSize"/> and executes all batches
    /// concurrently, completing when every batch has finished.
    /// </summary>
    /// <param name="session">Driver session used to execute the batches.</param>
    /// <param name="prepared">Prepared INSERT whose six markers are bound in the order
    /// id, clientid, message, deviceno, type, messageid.</param>
    /// <param name="records">Records to insert; an empty list results in no work.</param>
    /// <param name="batchSize">Maximum statements per batch; must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="batchSize"/> is zero or negative.
    /// </exception>
    private static async Task BulkInsertAsync(
        ISession session,
        PreparedStatement prepared,
        List<MessageIssued> records,
        int batchSize = DefaultBatchSize)
    {
        if (batchSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be positive.");
        }

        // NOTE(review): batches here use the driver's default batch type and may span
        // many partitions; confirm the server-side batch size thresholds allow
        // batches of this size.
        var pendingBatches = new List<Task>();
        var batch = CreateBatch();
        var statementsInBatch = 0; // tracked locally instead of re-counting the batch each iteration

        foreach (var record in records)
        {
            batch.Add(prepared.Bind(
                record.Id,
                record.ClientId,
                record.Message,
                record.DeviceNo,
                (int)record.Type,
                record.MessageId));
            statementsInBatch++;

            // Execute once the batch reaches the configured size.
            if (statementsInBatch >= batchSize)
            {
                pendingBatches.Add(session.ExecuteAsync(batch));
                batch = CreateBatch();
                statementsInBatch = 0;
            }
        }

        // Flush the final, partially filled batch.
        if (statementsInBatch > 0)
        {
            pendingBatches.Add(session.ExecuteAsync(batch));
        }

        // Wait for all batches to complete.
        await Task.WhenAll(pendingBatches);
    }

    /// <summary>
    /// Creates an empty batch with consistency level ONE. The level is set on the
    /// batch itself because a batch request is executed with the batch's consistency
    /// level rather than the levels of the statements it contains.
    /// </summary>
    private static BatchStatement CreateBatch()
    {
        var batch = new BatchStatement();
        batch.SetConsistencyLevel(ConsistencyLevel.One);
        return batch;
    }

    /// <summary>
    /// Builds a sample <see cref="MessageIssued"/> whose id, client id and message id
    /// share one freshly generated GUID string, whose payload is that string's bytes
    /// and whose device number is <paramref name="deviceIndex"/>.
    /// </summary>
    private static MessageIssued CreateSampleMessage(int deviceIndex)
    {
        var id = Guid.NewGuid().ToString();
        return new MessageIssued
        {
            ClientId = id,
            DeviceNo = deviceIndex.ToString(),
            MessageId = id,
            Type = IssuedEventType.Data,
            Id = id,
            Message = id.GetBytes()
        };
    }

    /// <summary>
    /// Logs <paramref name="str"/> at warning level and returns it unchanged; used to
    /// exercise the <c>[LogIntercept]</c> attribute.
    /// </summary>
    /// <param name="str">Text to log and echo back.</param>
    /// <returns>A completed task holding <paramref name="str"/>.</returns>
    [LogIntercept]
    public Task<string> LogInterceptorTest(string str)
    {
        // Pass the caller's text as an argument, never as the message template, so
        // braces in the input cannot be interpreted as format placeholders.
        _logger.LogWarning("{Message}", str);
        return Task.FromResult(str);
    }
}