This commit is contained in:
zenghongyao 2025-04-17 19:41:43 +08:00
commit 444b01095f
13 changed files with 148 additions and 44 deletions

View File

@ -64,7 +64,7 @@ public class CollectBusApplicationModule : AbpModule
//默认初始化表计信息
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
await dbContext.InitAmmeterCacheData();
//await dbContext.InitAmmeterCacheData();
//await dbContext.InitWatermeterCacheData();
//初始化主题信息

View File

@ -19,33 +19,109 @@ using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using JiShe.CollectBus.Cassandra;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using Volo.Abp.Application.Services;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using Volo.Abp.Domain.Repositories;
using System.Diagnostics;
using System.Linq;
using Cassandra;
namespace JiShe.CollectBus.Samples;
[AllowAnonymous]
public class TestAppService : CollectBusAppService, IApplicationService
public class TestAppService : CollectBusAppService
{
private readonly ILogger<TestAppService> _logger;
private readonly ICassandraRepository<MessageIssued,string> _messageIssuedCassandraRepository;
private readonly ICassandraRepository<MessageIssued, string> _messageReceivedCassandraRepository;
private readonly ICassandraProvider _cassandraProvider;
public TestAppService(
ILogger<TestAppService> logger,
ICassandraRepository<MessageIssued, string> messageIssuedCassandraRepository
)
ICassandraRepository<MessageIssued, string> messageReceivedCassandraRepository, ICassandraProvider cassandraProvider)
{
_logger = logger;
_messageIssuedCassandraRepository = messageIssuedCassandraRepository;
_messageReceivedCassandraRepository = messageReceivedCassandraRepository;
_cassandraProvider = cassandraProvider;
}
public async Task AddMessage()
public async Task AddMessageOfCassandra()
{
await _messageIssuedCassandraRepository.InsertAsync(new MessageIssued
var stopwatch = Stopwatch.StartNew();
for (int i = 1; i <= 10000; i++)
{
ClientId = Guid.NewGuid().ToString(),
Message = Array.Empty<byte>(),
DeviceNo = "123321312",
MessageId = Guid.NewGuid().ToString(),
Type = IssuedEventType.Data
});
var str = Guid.NewGuid().ToString();
await _messageReceivedCassandraRepository.InsertAsync(new MessageIssued
{
ClientId = str,
DeviceNo = i.ToString(),
MessageId = str,
Type = IssuedEventType.Data,
Id = str,
Message = str.GetBytes()
});
}
stopwatch.Stop();
_logger.LogWarning($"插入 {10000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒");
}
public async Task AddMessageOfBulkInsertCassandra()
{
var records = new List<MessageIssued>();
var prepared = await _cassandraProvider.Session.PrepareAsync(
$"INSERT INTO {_cassandraProvider.CassandraConfig.Keyspace}.{nameof(MessageIssued)} (id, clientid, message, deviceno,type,messageid) VALUES (?, ?, ?, ?, ?, ?)");
for (int i = 1; i <= 100000; i++)
{
var str = Guid.NewGuid().ToString();
records.Add(new MessageIssued
{
ClientId = str,
DeviceNo = i.ToString(),
MessageId = str,
Type = IssuedEventType.Data,
Id = str,
Message = str.GetBytes()
});
}
var stopwatch = Stopwatch.StartNew();
await BulkInsertAsync(_cassandraProvider.Session, prepared, records);
stopwatch.Stop();
_logger.LogWarning($"插入 {100000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒");
}
private static async Task BulkInsertAsync(ISession session, PreparedStatement prepared, List<MessageIssued> records)
{
var tasks = new List<Task>();
var batch = new BatchStatement();
for (int i = 0; i < records.Count; i++)
{
var record = records[i];
var boundStatement = prepared.Bind(
record.Id,
record.ClientId,
record.Message,
record.DeviceNo,
(int)record.Type,
record.MessageId);
// 设置一致性级别为ONE以提高性能
boundStatement.SetConsistencyLevel(ConsistencyLevel.One);
batch.Add(boundStatement);
// 当达到批处理大小时执行
if (batch.Statements.Count() >= 1000 || i == records.Count - 1)
{
tasks.Add(session.ExecuteAsync(batch));
batch = new BatchStatement();
}
}
// 等待所有批处理完成
await Task.WhenAll(tasks);
}
}

View File

@ -6,6 +6,7 @@ using Cassandra;
using Cassandra.Mapping;
using Cassandra.Data.Linq;
using System.ComponentModel.DataAnnotations;
using System.Diagnostics;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
@ -35,6 +36,12 @@ namespace JiShe.CollectBus.Cassandra
_logger = logger;
}
public Task InitClusterAndSessionAsync()
{
InitClusterAndSession();
return Task.CompletedTask;
}
public void InitClusterAndSession()
{
GetCluster((keyspace) =>

View File

@ -1,6 +1,12 @@
using Cassandra;
using Cassandra.Data.Linq;
using Cassandra.Mapping;
using JiShe.CollectBus.Cassandra.Extensions;
using JiShe.CollectBus.Common.Attributes;
using JiShe.CollectBus.IoTDBProvider;
using Microsoft.AspNetCore.Http;
using System.Reflection;
using Thrift.Protocol.Entities;
using Volo.Abp.Domain.Entities;
using Volo.Abp.Domain.Repositories;
@ -10,9 +16,10 @@ namespace JiShe.CollectBus.Cassandra
: ICassandraRepository<TEntity, TKey>
where TEntity : class
{
private readonly ICassandraProvider _cassandraProvider;
public CassandraRepository(ICassandraProvider cassandraProvider, MappingConfiguration mappingConfig)
{
_cassandraProvider = cassandraProvider;
Mapper = new Mapper(cassandraProvider.Session, mappingConfig);
cassandraProvider.Session.CreateTable<TEntity>(cassandraProvider.CassandraConfig.Keyspace);
}

View File

@ -13,17 +13,16 @@ namespace JiShe.CollectBus.Cassandra
)]
public class CollectBusCassandraModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
public override Task ConfigureServicesAsync(ServiceConfigurationContext context)
{
Configure<CassandraConfig>(context.Services.GetConfiguration().GetSection("Cassandra"));
// context.AddCassandra();
context.AddCassandra();
return Task.CompletedTask;
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{
// context.UseCassandra();
await context.UseCassandra();
}
}
}

View File

@ -13,11 +13,11 @@ namespace Microsoft.Extensions.DependencyInjection
{
public static class ApplicationInitializationContextExtensions
{
public static void UseCassandra(this ApplicationInitializationContext context)
public static async Task UseCassandra(this ApplicationInitializationContext context)
{
var service = context.ServiceProvider;
var cassandraProvider = service.GetRequiredService<ICassandraProvider>();
cassandraProvider.InitClusterAndSession();
await cassandraProvider.InitClusterAndSessionAsync();
}
}
@ -25,11 +25,9 @@ namespace Microsoft.Extensions.DependencyInjection
{
public static void AddCassandra(this ServiceConfigurationContext context)
{
context.Services.AddSingleton(typeof(ICassandraRepository<,>), typeof(CassandraRepository<,>));
var mappingConfig = new MappingConfiguration()
.Define(new CollectBusMapping());
context.Services.AddSingleton(mappingConfig);
context.Services.AddTransient(typeof(ICassandraRepository<,>), typeof(CassandraRepository<,>));
context.Services.AddSingleton(new MappingConfiguration()
.Define(new CollectBusMapping()));
}
}
}

View File

@ -4,6 +4,8 @@ using Cassandra;
using System.ComponentModel.DataAnnotations;
using JiShe.CollectBus.Common.Attributes;
using Cassandra.Mapping;
using Cassandra.Data.Linq;
using Thrift.Protocol.Entities;
namespace JiShe.CollectBus.Cassandra.Extensions
{
@ -14,7 +16,8 @@ namespace JiShe.CollectBus.Cassandra.Extensions
var type = typeof(TEntity);
var tableAttribute = type.GetCustomAttribute<CassandraTableAttribute>();
var tableName = tableAttribute?.Name ?? type.Name.ToLower();
var tableKeyspace = tableAttribute?.Keyspace ?? defaultKeyspace;
//var tableKeyspace = tableAttribute?.Keyspace ?? defaultKeyspace;
var tableKeyspace = session.Keyspace;
var properties = type.GetProperties();
var primaryKey = properties.FirstOrDefault(p => p.GetCustomAttribute<KeyAttribute>() != null);
@ -79,5 +82,7 @@ namespace JiShe.CollectBus.Cassandra.Extensions
throw new NotSupportedException($"不支持的类型: {type.Name}");
}
}
}

View File

@ -20,5 +20,7 @@ namespace JiShe.CollectBus.Cassandra
Cluster GetCluster(Action<string?>? callback = null);
void InitClusterAndSession();
Task InitClusterAndSessionAsync();
}
}

View File

@ -9,17 +9,9 @@ using Volo.Abp;
namespace JiShe.CollectBus.Common.Attributes
{
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public class CassandraTableAttribute : Attribute
public class CassandraTableAttribute(string? name = null) : Attribute
{
public CassandraTableAttribute(string? name = null,string? keyspace =null)
{
Name = name;
Keyspace = keyspace;
}
public virtual string? Name { get; }
public virtual string? Keyspace { get; }
public virtual string? Name { get; } = name;
}
[AttributeUsage(AttributeTargets.Property | AttributeTargets.Field)]

View File

@ -6,17 +6,24 @@ using System.Text;
using System.Threading.Tasks;
using JiShe.CollectBus.Common.Attributes;
using JiShe.CollectBus.Common.Enums;
using Volo.Abp.Domain.Entities;
namespace JiShe.CollectBus.IotSystems.MessageIssueds
{
[CassandraTable]
public class MessageIssued
public class MessageIssued:IEntity<string>
{
[Key]
public string ClientId { get; set; }
public byte[] Message { get; set; }
public string DeviceNo { get; set; }
public IssuedEventType Type { get; set; }
public string MessageId { get; set; }
[Key]
public string Id { get; set; }
public object?[] GetKeys()
{
return new object[] { Id };
}
}
}

View File

@ -34,7 +34,7 @@
"CorsOrigins": "http://localhost:4200,http://localhost:3100"
},
"ConnectionStrings": {
"Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
"Default": "mongodb://mongo_PmEeF3:lixiao1980@192.168.1.9:27017/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
"Kafka": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092",
"PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
@ -156,6 +156,12 @@
"Port": 9043,
"DataCenter": "dc1",
"Rack": "RAC2"
},
{
"Host": "192.168.1.9",
"Port": 9044,
"DataCenter": "dc1",
"Rack": "RAC2"
}
],
"Username": "admin",

View File

@ -9,6 +9,7 @@ using MongoDB.Driver;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using Volo.Abp.Data;
using Volo.Abp.MongoDB;
using Volo.Abp.MultiTenancy;
@ -29,6 +30,10 @@ public class CollectBusMongoDbContext : AbpMongoDbContext, ICollectBusMongoDbCon
public IMongoCollection<Device> Devices => Collection<Device>();
public IMongoCollection<ProtocolInfo> ProtocolInfos => Collection<ProtocolInfo>();
public IMongoCollection<MessageIssued> MessageIssueds => Collection<MessageIssued>();
protected override void CreateModel(IMongoModelBuilder modelBuilder)
{
//modelBuilder.Entity<MeterReadingRecords>(builder =>

View File

@ -28,7 +28,7 @@ public class CollectBusMongoDbModule : AbpModule
{
context.Services.AddMongoDbContext<CollectBusMongoDbContext>(options =>
{
options.AddDefaultRepositories();
options.AddDefaultRepositories(includeAllEntities: true);
// 注册分表策略
context.Services.AddTransient(