From 83b7de52d518b171ef107157557d868bb482415b Mon Sep 17 00:00:00 2001
From: cli <377476583@qq.com>
Date: Mon, 14 Apr 2025 15:31:10 +0800
Subject: [PATCH 1/3] =?UTF-8?q?=E6=B5=8B=E8=AF=95=E8=BF=81=E5=85=A5?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
JiShe.CollectBus.sln | 7 ++
.../BaseCassandraRepository.cs | 87 +++++++++++++++++++
.../CassandraConfiguration.cs | 45 ++++++++++
.../CassandraService.cs | 40 +++++++++
.../CollectBusCassandraModule.cs | 13 +++
.../ICassandraService.cs | 9 ++
.../JiShe.CollectBus.Cassandra.csproj | 19 ++++
7 files changed, 220 insertions(+)
create mode 100644 src/JiShe.CollectBus.Cassandra/BaseCassandraRepository.cs
create mode 100644 src/JiShe.CollectBus.Cassandra/CassandraConfiguration.cs
create mode 100644 src/JiShe.CollectBus.Cassandra/CassandraService.cs
create mode 100644 src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs
create mode 100644 src/JiShe.CollectBus.Cassandra/ICassandraService.cs
create mode 100644 src/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj
diff --git a/JiShe.CollectBus.sln b/JiShe.CollectBus.sln
index c26f3da..9c67aae 100644
--- a/JiShe.CollectBus.sln
+++ b/JiShe.CollectBus.sln
@@ -37,6 +37,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.IoTDBProvi
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.Test", "src\JiShe.CollectBus.Protocol.Test\JiShe.CollectBus.Protocol.Test.csproj", "{A377955E-7EA1-6F29-8CF7-774569E93925}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -107,6 +109,10 @@ Global
{A377955E-7EA1-6F29-8CF7-774569E93925}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A377955E-7EA1-6F29-8CF7-774569E93925}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A377955E-7EA1-6F29-8CF7-774569E93925}.Release|Any CPU.Build.0 = Release|Any CPU
+ {443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -128,6 +134,7 @@ Global
{F0288175-F0EC-48BD-945F-CF1512850943} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
+ {443B4549-0AC0-4493-8F3E-49C83225DD76} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}
diff --git a/src/JiShe.CollectBus.Cassandra/BaseCassandraRepository.cs b/src/JiShe.CollectBus.Cassandra/BaseCassandraRepository.cs
new file mode 100644
index 0000000..67252e4
--- /dev/null
+++ b/src/JiShe.CollectBus.Cassandra/BaseCassandraRepository.cs
@@ -0,0 +1,87 @@
+using Cassandra.Mapping;
+using Cassandra;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Cassandra
+{
+ ///
+ /// Cassandra数据库的基础仓储类
+ /// 提供了通用的CRUD操作方法,所有具体的仓储类都应该继承此类
+ ///
+ /// 实体类型
+ public abstract class BaseCassandraRepository where T : class
+ {
+ ///
+ /// Cassandra数据库会话
+ /// 用于执行CQL查询和操作
+ ///
+ protected readonly ISession Session;
+
+ ///
+ /// Cassandra映射器
+ /// 用于对象关系映射(ORM),简化实体与数据库表之间的转换
+ ///
+ protected readonly IMapper Mapper;
+
+ ///
+ /// 构造函数
+ /// 初始化数据库会话和映射器
+ ///
+ /// Cassandra连接工厂
+ protected BaseCassandraRepository(ICassandraService cassandraService)
+ {
+ Session = cassandraService.GetSession();
+ Mapper = new Mapper(Session);
+ }
+
+ ///
+ /// 根据ID获取单个实体
+ ///
+ /// 实体ID
+ /// 返回找到的实体,如果未找到则返回null
+ public async Task GetByIdAsync(string id)
+ {
+ return await Mapper.SingleOrDefaultAsync($"WHERE id = ?", id);
+ }
+
+ ///
+ /// 获取所有实体
+ ///
+ /// 返回实体集合
+ public async Task> GetAllAsync()
+ {
+ return await Mapper.FetchAsync();
+ }
+
+ ///
+ /// 插入新实体
+ ///
+ /// 要插入的实体
+ public async Task InsertAsync(T entity)
+ {
+ await Mapper.InsertAsync(entity);
+ }
+
+ ///
+ /// 更新现有实体
+ ///
+ /// 要更新的实体
+ public async Task UpdateAsync(T entity)
+ {
+ await Mapper.UpdateAsync(entity);
+ }
+
+ ///
+ /// 根据ID删除实体
+ ///
+ /// 要删除的实体ID
+ public async Task DeleteAsync(string id)
+ {
+ await Mapper.DeleteAsync($"WHERE id = ?", id);
+ }
+ }
+}
diff --git a/src/JiShe.CollectBus.Cassandra/CassandraConfiguration.cs b/src/JiShe.CollectBus.Cassandra/CassandraConfiguration.cs
new file mode 100644
index 0000000..8fea3c1
--- /dev/null
+++ b/src/JiShe.CollectBus.Cassandra/CassandraConfiguration.cs
@@ -0,0 +1,45 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Cassandra
+{
+ ///
+ /// Cassandra数据库配置类
+ /// 用于存储和管理Cassandra数据库的连接配置信息
+ ///
+ public class CassandraConfiguration
+ {
+ ///
+ /// Cassandra集群的节点地址列表
+ /// 可以配置多个节点地址,用于实现高可用和负载均衡
+ ///
+ public string[] ContactPoints { get; set; }
+
+ ///
+ /// Cassandra的键空间名称
+ /// 键空间是Cassandra中数据组织的最高级别容器
+ ///
+ public string Keyspace { get; set; }
+
+ ///
+ /// Cassandra数据库的用户名
+ /// 用于身份验证
+ ///
+ public string Username { get; set; }
+
+ ///
+ /// Cassandra数据库的密码
+ /// 用于身份验证
+ ///
+ public string Password { get; set; }
+
+ ///
+ /// Cassandra数据库的端口号
+ /// 默认值为9042,这是Cassandra的默认CQL端口
+ ///
+ public int Port { get; set; } = 9042;
+ }
+}
diff --git a/src/JiShe.CollectBus.Cassandra/CassandraService.cs b/src/JiShe.CollectBus.Cassandra/CassandraService.cs
new file mode 100644
index 0000000..8836235
--- /dev/null
+++ b/src/JiShe.CollectBus.Cassandra/CassandraService.cs
@@ -0,0 +1,40 @@
+using System.Diagnostics.Metrics;
+using Cassandra;
+using Microsoft.Extensions.Options;
+using Volo.Abp.DependencyInjection;
+
+namespace JiShe.CollectBus.Cassandra
+{
+ public class CassandraService : ICassandraService, ISingletonDependency
+ {
+ private readonly CassandraConfiguration _configuration;
+ public ISession Instance { get; set; } = default;
+
+ ///
+ /// CassandraService
+ ///
+ ///
+ public CassandraService(IOptions configuration)
+ {
+ _configuration = configuration.Value;
+ GetSession();
+ }
+
+ ///
+ /// 获取Cassandra数据库会话
+ ///
+ ///
+ public ISession GetSession()
+ {
+ var cluster = Cluster.Builder()
+ .AddContactPoints(_configuration.ContactPoints)
+ .WithPort(_configuration.Port)
+ .WithCredentials(_configuration.Username, _configuration.Password)
+ .Build();
+
+ Instance = cluster.Connect(_configuration.Keyspace);
+
+ return Instance;
+ }
+ }
+}
diff --git a/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs b/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs
new file mode 100644
index 0000000..e7c2c69
--- /dev/null
+++ b/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs
@@ -0,0 +1,13 @@
+using Microsoft.Extensions.DependencyInjection;
+using Volo.Abp.Modularity;
+
+namespace JiShe.CollectBus.Cassandra
+{
+ public class CollectBusCassandraModule : AbpModule
+ {
+ public override void ConfigureServices(ServiceConfigurationContext context)
+ {
+ context.Services.Configure(context.Services.GetConfiguration().GetSection("Cassandra"));
+ }
+ }
+}
diff --git a/src/JiShe.CollectBus.Cassandra/ICassandraService.cs b/src/JiShe.CollectBus.Cassandra/ICassandraService.cs
new file mode 100644
index 0000000..70eb5d6
--- /dev/null
+++ b/src/JiShe.CollectBus.Cassandra/ICassandraService.cs
@@ -0,0 +1,9 @@
+using Cassandra;
+
+namespace JiShe.CollectBus.Cassandra
+{
+ public interface ICassandraService
+ {
+ ISession GetSession();
+ }
+}
diff --git a/src/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj b/src/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj
new file mode 100644
index 0000000..5a8b22d
--- /dev/null
+++ b/src/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj
@@ -0,0 +1,19 @@
+
+
+
+ net8.0
+ enable
+ enable
+
+
+
+
+
+
+
+
+
+
+
+
+
From aa55e476c2a0066331642f86ee4cbfeef9c59fc2 Mon Sep 17 00:00:00 2001
From: cli <377476583@qq.com>
Date: Tue, 15 Apr 2025 17:57:47 +0800
Subject: [PATCH 2/3] =?UTF-8?q?=E5=B0=81=E8=A3=85Cassandra=E6=95=B0?=
=?UTF-8?q?=E6=8D=AE=E5=BA=93=E6=93=8D=E4=BD=9C?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../CollectBusApplicationModule.cs | 17 +-
.../JiShe.CollectBus.Application.csproj | 3 +
.../Samples/TestAppService.cs | 51 ++++++
.../Workers/CreateToBeIssueTaskWorker.cs | 2 +-
.../Workers/SubscriberFifteenMinuteWorker.cs | 2 +-
.../Workers/SubscriberFiveMinuteWorker.cs | 2 +-
.../Workers/SubscriberOneMinuteWorker.cs | 2 +-
.../BaseCassandraRepository.cs | 87 ----------
.../CassandraConfig.cs | 64 +++++++
.../CassandraConfiguration.cs | 45 -----
.../CassandraProvider.cs | 147 +++++++++++++++++
.../CassandraQueryOptimizer.cs | 156 ++++++++++++++++++
.../CassandraRepository.cs | 69 ++++++++
.../CassandraService.cs | 40 -----
.../CollectBusCassandraModule.cs | 20 ++-
.../Extensions/ServiceCollectionExtensions.cs | 35 ++++
.../Extensions/SessionExtension.cs | 83 ++++++++++
.../ICassandraProvider.cs | 24 +++
.../ICassandraRepository.cs | 20 +++
.../ICassandraService.cs | 9 -
.../JiShe.CollectBus.Cassandra.csproj | 5 +-
.../Mappers/CollectBusMapping.cs | 20 +++
.../Attributes/CassandraTableAttribute.cs | 32 ++++
.../MessageIssueds/MessageIssued.cs | 4 +
src/JiShe.CollectBus.Host/Program.cs | 40 ++++-
src/JiShe.CollectBus.Host/Startup.cs | 9 +-
src/JiShe.CollectBus.Host/appsettings.json | 45 ++++-
.../JiSheCollectBusProtocolModule.cs | 4 +-
28 files changed, 830 insertions(+), 207 deletions(-)
create mode 100644 src/JiShe.CollectBus.Application/Samples/TestAppService.cs
delete mode 100644 src/JiShe.CollectBus.Cassandra/BaseCassandraRepository.cs
create mode 100644 src/JiShe.CollectBus.Cassandra/CassandraConfig.cs
delete mode 100644 src/JiShe.CollectBus.Cassandra/CassandraConfiguration.cs
create mode 100644 src/JiShe.CollectBus.Cassandra/CassandraProvider.cs
create mode 100644 src/JiShe.CollectBus.Cassandra/CassandraQueryOptimizer.cs
create mode 100644 src/JiShe.CollectBus.Cassandra/CassandraRepository.cs
delete mode 100644 src/JiShe.CollectBus.Cassandra/CassandraService.cs
create mode 100644 src/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs
create mode 100644 src/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs
create mode 100644 src/JiShe.CollectBus.Cassandra/ICassandraProvider.cs
create mode 100644 src/JiShe.CollectBus.Cassandra/ICassandraRepository.cs
delete mode 100644 src/JiShe.CollectBus.Cassandra/ICassandraService.cs
create mode 100644 src/JiShe.CollectBus.Cassandra/Mappers/CollectBusMapping.cs
create mode 100644 src/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs
diff --git a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs
index 1a8b326..5f80f51 100644
--- a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs
+++ b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs
@@ -12,8 +12,11 @@ using Microsoft.Extensions.DependencyInjection;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
+using System.Threading.Tasks;
+using JiShe.CollectBus.Cassandra;
using Volo.Abp;
using Volo.Abp.Application;
+using Volo.Abp.Autofac;
using Volo.Abp.AutoMapper;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.BackgroundWorkers.Hangfire;
@@ -27,11 +30,13 @@ namespace JiShe.CollectBus;
typeof(CollectBusApplicationContractsModule),
typeof(AbpDddApplicationModule),
typeof(AbpAutoMapperModule),
+ typeof(AbpAutofacModule),
typeof(AbpBackgroundWorkersHangfireModule),
typeof(CollectBusFreeRedisModule),
typeof(CollectBusFreeSqlModule),
typeof(CollectBusKafkaModule),
- typeof(CollectBusIoTDBModule)
+ typeof(CollectBusIoTDBModule),
+ typeof(CollectBusCassandraModule)
)]
public class CollectBusApplicationModule : AbpModule
{
@@ -46,20 +51,20 @@ public class CollectBusApplicationModule : AbpModule
});
}
- public override void OnApplicationInitialization(
+ public override async Task OnApplicationInitializationAsync(
ApplicationInitializationContext context)
{
var assembly = Assembly.GetExecutingAssembly();
var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface).ToList();
foreach (var type in types)
{
- context.AddBackgroundWorkerAsync(type);
+ await context.AddBackgroundWorkerAsync(type);
}
//默认初始化表计信息
var dbContext = context.ServiceProvider.GetRequiredService();
- dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
- dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
+ await dbContext.InitAmmeterCacheData();
+ await dbContext.InitWatermeterCacheData();
//初始化主题信息
var kafkaAdminClient = context.ServiceProvider.GetRequiredService();
@@ -70,7 +75,7 @@ public class CollectBusApplicationModule : AbpModule
foreach (var item in topics)
{
- kafkaAdminClient.CreateTopicAsync(item, configuration.GetValue(CommonConst.NumPartitions), configuration.GetValue(CommonConst.KafkaReplicationFactor));
+ await kafkaAdminClient.CreateTopicAsync(item, configuration.GetValue(CommonConst.NumPartitions), configuration.GetValue(CommonConst.KafkaReplicationFactor));
}
}
diff --git a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj
index 1726f39..a2cc613 100644
--- a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj
+++ b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj
@@ -24,8 +24,11 @@
+
+
+
diff --git a/src/JiShe.CollectBus.Application/Samples/TestAppService.cs b/src/JiShe.CollectBus.Application/Samples/TestAppService.cs
new file mode 100644
index 0000000..9c53c3a
--- /dev/null
+++ b/src/JiShe.CollectBus.Application/Samples/TestAppService.cs
@@ -0,0 +1,51 @@
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Apache.IoTDB.DataStructure;
+using Apache.IoTDB;
+using Confluent.Kafka;
+using JiShe.CollectBus.Ammeters;
+using JiShe.CollectBus.FreeSql;
+using JiShe.CollectBus.IoTDBProvider;
+using JiShe.CollectBus.IotSystems.PrepayModel;
+using Microsoft.AspNetCore.Authorization;
+using Microsoft.AspNetCore.Mvc;
+using Microsoft.Extensions.Options;
+using JiShe.CollectBus.IoTDBProvider.Context;
+using Microsoft.Extensions.Logging;
+using JiShe.CollectBus.Common.Helpers;
+using JiShe.CollectBus.IotSystems.AFNEntity;
+using JiShe.CollectBus.Protocol.Contracts.Interfaces;
+using Microsoft.Extensions.DependencyInjection;
+using JiShe.CollectBus.Cassandra;
+using JiShe.CollectBus.Common.Enums;
+using JiShe.CollectBus.IotSystems.MessageIssueds;
+using Volo.Abp.Application.Services;
+
+namespace JiShe.CollectBus.Samples;
+
+[AllowAnonymous]
+public class TestAppService : CollectBusAppService, IApplicationService
+{
+ private readonly ILogger _logger;
+ private readonly ICassandraRepository _messageIssuedCassandraRepository;
+ public TestAppService(
+ ILogger logger,
+ ICassandraRepository messageIssuedCassandraRepository
+ )
+ {
+ _logger = logger;
+ _messageIssuedCassandraRepository = messageIssuedCassandraRepository;
+ }
+ public async Task AddMessage()
+ {
+ await _messageIssuedCassandraRepository.InsertAsync(new MessageIssued
+ {
+ ClientId = Guid.NewGuid().ToString(),
+ Message = Array.Empty(),
+ DeviceNo = "123321312",
+ MessageId = Guid.NewGuid().ToString(),
+ Type = IssuedEventType.Data
+ });
+ }
+}
diff --git a/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs b/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs
index 05fd90d..e994530 100644
--- a/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs
+++ b/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs
@@ -27,7 +27,7 @@ namespace JiShe.CollectBus.Workers
{
_logger = logger;
RecurringJobId = nameof(CreateToBeIssueTaskWorker);
- CronExpression = $"{10}/* * * * *";
+ CronExpression = "* 10 * * * *";
this._scheduledMeterReadingService = scheduledMeterReadingService;
}
diff --git a/src/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs b/src/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs
index f1bf5a1..441b22a 100644
--- a/src/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs
+++ b/src/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs
@@ -26,7 +26,7 @@ namespace JiShe.CollectBus.Workers
{
_logger = logger;
RecurringJobId = nameof(SubscriberFifteenMinuteWorker);
- CronExpression = $"*/{15} * * * *";
+ CronExpression = "* 15 * * * *";
this._scheduledMeterReadingService = scheduledMeterReadingService;
}
diff --git a/src/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs b/src/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs
index 2e491d6..0a61c63 100644
--- a/src/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs
+++ b/src/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs
@@ -26,7 +26,7 @@ namespace JiShe.CollectBus.Workers
{
_logger = logger;
RecurringJobId = nameof(SubscriberFiveMinuteWorker);
- CronExpression = $"*/{5} * * * *";
+ CronExpression = "* 5 * * * *";
this._scheduledMeterReadingService = scheduledMeterReadingService;
}
diff --git a/src/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs b/src/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs
index 82b979b..8b7cbfd 100644
--- a/src/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs
+++ b/src/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs
@@ -26,7 +26,7 @@ namespace JiShe.CollectBus.Workers
{
_logger = logger;
RecurringJobId = nameof(SubscriberOneMinuteWorker);
- CronExpression = $"*/{1} * * * *";
+ CronExpression = "* 1 * * * *";
this._scheduledMeterReadingService = scheduledMeterReadingService;
}
diff --git a/src/JiShe.CollectBus.Cassandra/BaseCassandraRepository.cs b/src/JiShe.CollectBus.Cassandra/BaseCassandraRepository.cs
deleted file mode 100644
index 67252e4..0000000
--- a/src/JiShe.CollectBus.Cassandra/BaseCassandraRepository.cs
+++ /dev/null
@@ -1,87 +0,0 @@
-using Cassandra.Mapping;
-using Cassandra;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace JiShe.CollectBus.Cassandra
-{
- ///
- /// Cassandra数据库的基础仓储类
- /// 提供了通用的CRUD操作方法,所有具体的仓储类都应该继承此类
- ///
- /// 实体类型
- public abstract class BaseCassandraRepository where T : class
- {
- ///
- /// Cassandra数据库会话
- /// 用于执行CQL查询和操作
- ///
- protected readonly ISession Session;
-
- ///
- /// Cassandra映射器
- /// 用于对象关系映射(ORM),简化实体与数据库表之间的转换
- ///
- protected readonly IMapper Mapper;
-
- ///
- /// 构造函数
- /// 初始化数据库会话和映射器
- ///
- /// Cassandra连接工厂
- protected BaseCassandraRepository(ICassandraService cassandraService)
- {
- Session = cassandraService.GetSession();
- Mapper = new Mapper(Session);
- }
-
- ///
- /// 根据ID获取单个实体
- ///
- /// 实体ID
- /// 返回找到的实体,如果未找到则返回null
- public async Task GetByIdAsync(string id)
- {
- return await Mapper.SingleOrDefaultAsync($"WHERE id = ?", id);
- }
-
- ///
- /// 获取所有实体
- ///
- /// 返回实体集合
- public async Task> GetAllAsync()
- {
- return await Mapper.FetchAsync();
- }
-
- ///
- /// 插入新实体
- ///
- /// 要插入的实体
- public async Task InsertAsync(T entity)
- {
- await Mapper.InsertAsync(entity);
- }
-
- ///
- /// 更新现有实体
- ///
- /// 要更新的实体
- public async Task UpdateAsync(T entity)
- {
- await Mapper.UpdateAsync(entity);
- }
-
- ///
- /// 根据ID删除实体
- ///
- /// 要删除的实体ID
- public async Task DeleteAsync(string id)
- {
- await Mapper.DeleteAsync($"WHERE id = ?", id);
- }
- }
-}
diff --git a/src/JiShe.CollectBus.Cassandra/CassandraConfig.cs b/src/JiShe.CollectBus.Cassandra/CassandraConfig.cs
new file mode 100644
index 0000000..c14171e
--- /dev/null
+++ b/src/JiShe.CollectBus.Cassandra/CassandraConfig.cs
@@ -0,0 +1,64 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Cassandra
+{
+ public class CassandraConfig
+ {
+ public Node[] Nodes { get; set; }
+ public string Username { get; set; }
+ public string Password { get; set; }
+ public string Keyspace { get; set; }
+ public string ConsistencyLevel { get; set; }
+ public Pooling PoolingOptions { get; set; }
+ public Socket SocketOptions { get; set; }
+ public Query QueryOptions { get; set; }
+
+ public ReplicationStrategy ReplicationStrategy { get; set; }
+ }
+
+ public class Pooling
+ {
+ public int CoreConnectionsPerHost { get; set; }
+ public int MaxConnectionsPerHost { get; set; }
+ public int MaxRequestsPerConnection { get; set; }
+ }
+
+ public class Socket
+ {
+ public int ConnectTimeoutMillis { get; set; }
+ public int ReadTimeoutMillis { get; set; }
+ }
+
+ public class Query
+ {
+ public string ConsistencyLevel { get; set; }
+ public string SerialConsistencyLevel { get; set; }
+ public bool DefaultIdempotence { get; set; }
+ }
+
+ public class ReplicationStrategy
+ {
+ public string Class { get; set; }
+ public DataCenter[] DataCenters { get; set; }
+ }
+
+ public class DataCenter
+ {
+ public string Name { get; set; }
+ public int ReplicationFactor { get; set; }
+ public string Strategy { get; set; }
+ }
+
+ public class Node
+ {
+ public string Host { get; set; }
+ public int Port { get; set; }
+ public string DataCenter { get; set; }
+ public string Rack { get; set; }
+ }
+
+}
diff --git a/src/JiShe.CollectBus.Cassandra/CassandraConfiguration.cs b/src/JiShe.CollectBus.Cassandra/CassandraConfiguration.cs
deleted file mode 100644
index 8fea3c1..0000000
--- a/src/JiShe.CollectBus.Cassandra/CassandraConfiguration.cs
+++ /dev/null
@@ -1,45 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace JiShe.CollectBus.Cassandra
-{
- ///
- /// Cassandra数据库配置类
- /// 用于存储和管理Cassandra数据库的连接配置信息
- ///
- public class CassandraConfiguration
- {
- ///
- /// Cassandra集群的节点地址列表
- /// 可以配置多个节点地址,用于实现高可用和负载均衡
- ///
- public string[] ContactPoints { get; set; }
-
- ///
- /// Cassandra的键空间名称
- /// 键空间是Cassandra中数据组织的最高级别容器
- ///
- public string Keyspace { get; set; }
-
- ///
- /// Cassandra数据库的用户名
- /// 用于身份验证
- ///
- public string Username { get; set; }
-
- ///
- /// Cassandra数据库的密码
- /// 用于身份验证
- ///
- public string Password { get; set; }
-
- ///
- /// Cassandra数据库的端口号
- /// 默认值为9042,这是Cassandra的默认CQL端口
- ///
- public int Port { get; set; } = 9042;
- }
-}
diff --git a/src/JiShe.CollectBus.Cassandra/CassandraProvider.cs b/src/JiShe.CollectBus.Cassandra/CassandraProvider.cs
new file mode 100644
index 0000000..a5cd9c8
--- /dev/null
+++ b/src/JiShe.CollectBus.Cassandra/CassandraProvider.cs
@@ -0,0 +1,147 @@
+using System;
+using System.Linq;
+using System.Reflection;
+using System.Text;
+using Cassandra;
+using Cassandra.Mapping;
+using Cassandra.Data.Linq;
+using System.ComponentModel.DataAnnotations;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using Volo.Abp.DependencyInjection;
+using JiShe.CollectBus.Common.Attributes;
+
+namespace JiShe.CollectBus.Cassandra
+{
+ public class CassandraProvider : IDisposable, ICassandraProvider, ISingletonDependency
+ {
+ private readonly ILogger _logger;
+
+ public Cluster Instance { get; set; }
+
+ public ISession Session { get; set; }
+
+ public CassandraConfig CassandraConfig { get; set; }
+ ///
+ ///
+ ///
+ ///
+ ///
+ public CassandraProvider(
+ IOptions options,
+ ILogger logger)
+ {
+ CassandraConfig = options.Value;
+ _logger = logger;
+ }
+
+ public void InitClusterAndSession()
+ {
+ GetCluster((keyspace) =>
+ {
+ GetSession(keyspace);
+ });
+ }
+
+ public Cluster GetCluster(Action? callback=null)
+ {
+ var clusterBuilder = Cluster.Builder();
+
+ // 添加多个节点
+ foreach (var node in CassandraConfig.Nodes)
+ {
+ clusterBuilder.AddContactPoint(node.Host)
+ .WithPort(node.Port);
+ }
+
+ clusterBuilder.WithCredentials(CassandraConfig.Username, CassandraConfig.Password);
+
+ // 优化连接池配置
+ var poolingOptions = new PoolingOptions()
+ .SetCoreConnectionsPerHost(HostDistance.Local, CassandraConfig.PoolingOptions.CoreConnectionsPerHost)
+ .SetMaxConnectionsPerHost(HostDistance.Local, CassandraConfig.PoolingOptions.MaxConnectionsPerHost)
+ .SetMaxRequestsPerConnection(CassandraConfig.PoolingOptions.MaxRequestsPerConnection)
+ .SetHeartBeatInterval(30000); // 30秒心跳
+
+ clusterBuilder.WithPoolingOptions(poolingOptions);
+
+ // 优化Socket配置
+ var socketOptions = new SocketOptions()
+ .SetConnectTimeoutMillis(CassandraConfig.SocketOptions.ConnectTimeoutMillis)
+ .SetReadTimeoutMillis(CassandraConfig.SocketOptions.ReadTimeoutMillis)
+ .SetTcpNoDelay(true) // 启用Nagle算法
+ .SetKeepAlive(true) // 启用TCP保活
+ .SetReceiveBufferSize(32768) // 32KB接收缓冲区
+ .SetSendBufferSize(32768); // 32KB发送缓冲区
+
+ clusterBuilder.WithSocketOptions(socketOptions);
+
+ // 优化查询选项
+ var queryOptions = new QueryOptions()
+ .SetConsistencyLevel((ConsistencyLevel)Enum.Parse(typeof(ConsistencyLevel), CassandraConfig.QueryOptions.ConsistencyLevel))
+ .SetSerialConsistencyLevel((ConsistencyLevel)Enum.Parse(typeof(ConsistencyLevel), CassandraConfig.QueryOptions.SerialConsistencyLevel))
+ .SetDefaultIdempotence(CassandraConfig.QueryOptions.DefaultIdempotence)
+ .SetPageSize(5000); // 增加页面大小
+
+ clusterBuilder.WithQueryOptions(queryOptions);
+
+ // 启用压缩
+ clusterBuilder.WithCompression(CompressionType.LZ4);
+
+ // 配置重连策略
+ clusterBuilder.WithReconnectionPolicy(new ExponentialReconnectionPolicy(1000, 10 * 60 * 1000));
+ Instance = clusterBuilder.Build();
+ callback?.Invoke(null);
+ return Instance;
+ }
+
+ public ISession GetSession(string? keyspace = null)
+ {
+ if (string.IsNullOrEmpty(keyspace))
+ {
+ keyspace = CassandraConfig.Keyspace;
+ }
+ Session = Instance.Connect();
+ var replication = GetReplicationStrategy();
+ Session.CreateKeyspaceIfNotExists(keyspace, replication);
+ Session.ChangeKeyspace(keyspace);
+ return Session;
+ }
+
+ private Dictionary GetReplicationStrategy()
+ {
+ var strategy = CassandraConfig.ReplicationStrategy.Class;
+ var dataCenters = CassandraConfig.ReplicationStrategy.DataCenters;
+
+ switch (strategy)
+ {
+ case "NetworkTopologyStrategy":
+ var networkDic = new Dictionary { { "class", "NetworkTopologyStrategy" } };
+ foreach (var dataCenter in dataCenters)
+ {
+ networkDic.Add(dataCenter.Name, dataCenter.ReplicationFactor.ToString());
+ }
+ return networkDic;
+ case "SimpleStrategy":
+ var dic = new Dictionary { { "class", "SimpleStrategy" } };
+ if (dataCenters.Length >= 1)
+ {
+ dic.Add("replication_factor", dataCenters[0].ReplicationFactor.ToString());
+ }
+ else
+ {
+ _logger.LogError("SimpleStrategy 不支持多个数据中心!");
+ }
+ return dic;
+ default:
+ throw new ArgumentNullException($"Strategy", "Strategy配置错误!");
+ }
+ }
+
+ public void Dispose()
+ {
+ Instance.Dispose();
+ Session.Dispose();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/JiShe.CollectBus.Cassandra/CassandraQueryOptimizer.cs b/src/JiShe.CollectBus.Cassandra/CassandraQueryOptimizer.cs
new file mode 100644
index 0000000..0ea1b56
--- /dev/null
+++ b/src/JiShe.CollectBus.Cassandra/CassandraQueryOptimizer.cs
@@ -0,0 +1,156 @@
+using System.Collections.Concurrent;
+using Cassandra;
+using Cassandra.Mapping;
+using Microsoft.Extensions.Caching.Memory;
+using Microsoft.Extensions.Logging;
+
+namespace JiShe.CollectBus.Cassandra
+{
+ public class CassandraQueryOptimizer
+ {
+ private readonly ISession _session;
+ private readonly ILogger _logger;
+ private readonly IMemoryCache _cache;
+ private readonly ConcurrentDictionary _preparedStatements;
+ private readonly int _batchSize;
+ private readonly TimeSpan _cacheExpiration;
+
+ public CassandraQueryOptimizer(
+ ISession session,
+ ILogger logger,
+ IMemoryCache cache,
+ int batchSize = 100,
+ TimeSpan? cacheExpiration = null)
+ {
+ _session = session;
+ _logger = logger;
+ _cache = cache;
+ _preparedStatements = new ConcurrentDictionary();
+ _batchSize = batchSize;
+ _cacheExpiration = cacheExpiration ?? TimeSpan.FromMinutes(5);
+ }
+
+ public async Task GetOrPrepareStatementAsync(string cql)
+ {
+ return _preparedStatements.GetOrAdd(cql, key =>
+ {
+ try
+ {
+ var statement = _session.Prepare(key);
+ _logger.LogDebug($"Prepared statement for CQL: {key}");
+ return statement;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Failed to prepare statement for CQL: {key}");
+ throw;
+ }
+ });
+ }
+
+ public async Task ExecuteBatchAsync(IEnumerable statements)
+ {
+ var batch = new BatchStatement();
+ var count = 0;
+
+ foreach (var statement in statements)
+ {
+ batch.Add(statement);
+ count++;
+
+ if (count >= _batchSize)
+ {
+ await ExecuteBatchAsync(batch);
+ batch = new BatchStatement();
+ count = 0;
+ }
+ }
+
+ if (count > 0)
+ {
+ await ExecuteBatchAsync(batch);
+ }
+ }
+
+ private async Task ExecuteBatchAsync(BatchStatement batch)
+ {
+ try
+ {
+ await _session.ExecuteAsync(batch);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Failed to execute batch statement");
+ throw;
+ }
+ }
+
+ public async Task GetOrSetFromCacheAsync(string cacheKey, Func> getData)
+ {
+ if (_cache.TryGetValue(cacheKey, out T cachedValue))
+ {
+ _logger.LogDebug($"Cache hit for key: {cacheKey}");
+ return cachedValue;
+ }
+
+ var data = await getData();
+ _cache.Set(cacheKey, data, _cacheExpiration);
+ _logger.LogDebug($"Cache miss for key: {cacheKey}, data cached");
+ return data;
+ }
+
+ public async Task> ExecutePagedQueryAsync(
+ string cql,
+ object[] parameters,
+ int pageSize = 100,
+ string pagingState = null) where T : class
+ {
+ var statement = await GetOrPrepareStatementAsync(cql);
+ var boundStatement = statement.Bind(parameters);
+
+ if (!string.IsNullOrEmpty(pagingState))
+ {
+ boundStatement.SetPagingState(Convert.FromBase64String(pagingState));
+ }
+
+ boundStatement.SetPageSize(pageSize);
+
+ try
+ {
+ var result = await _session.ExecuteAsync(boundStatement);
+ //TODO: RETURN OBJECT
+ throw new NotImplementedException();
+ //result.GetRows()
+ //return result.Select(row => row);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Failed to execute paged query: {cql}");
+ throw;
+ }
+ }
+
+ public async Task BulkInsertAsync(IEnumerable items, string tableName)
+ {
+ var mapper = new Mapper(_session);
+ var batch = new List();
+ var cql = $"INSERT INTO {tableName} ({{0}}) VALUES ({{1}})";
+
+ foreach (var chunk in items.Chunk(_batchSize))
+ {
+ var statements = chunk.Select(item =>
+ {
+ var props = typeof(T).GetProperties();
+ var columns = string.Join(", ", props.Select(p => p.Name));
+ var values = string.Join(", ", props.Select(p => "?"));
+ var statement = _session.Prepare(string.Format(cql, columns, values));
+ return statement.Bind(props.Select(p => p.GetValue(item)).ToArray());
+ });
+
+ batch.AddRange(statements);
+ }
+
+ await ExecuteBatchAsync(batch);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/JiShe.CollectBus.Cassandra/CassandraRepository.cs b/src/JiShe.CollectBus.Cassandra/CassandraRepository.cs
new file mode 100644
index 0000000..8381450
--- /dev/null
+++ b/src/JiShe.CollectBus.Cassandra/CassandraRepository.cs
@@ -0,0 +1,69 @@
+using Cassandra;
+using Cassandra.Mapping;
+using JiShe.CollectBus.Cassandra.Extensions;
+using Volo.Abp.Domain.Entities;
+using Volo.Abp.Domain.Repositories;
+
+namespace JiShe.CollectBus.Cassandra
+{
+ public class CassandraRepository
+ : ICassandraRepository
+ where TEntity : class
+ {
+
+ public CassandraRepository(ICassandraProvider cassandraProvider, MappingConfiguration mappingConfig)
+ {
+ Mapper = new Mapper(cassandraProvider.Session, mappingConfig);
+ cassandraProvider.Session.CreateTable(cassandraProvider.CassandraConfig.Keyspace);
+ }
+
+ public readonly IMapper Mapper;
+
+ public virtual async Task GetAsync(TKey id)
+ {
+ return await Mapper.SingleOrDefaultAsync("WHERE id = ?", id);
+ }
+
+ public virtual async Task> GetListAsync()
+ {
+ return (await Mapper.FetchAsync()).ToList();
+ }
+
+ public virtual async Task InsertAsync(TEntity entity)
+ {
+ await Mapper.InsertAsync(entity);
+ return entity;
+ }
+
+ public virtual async Task UpdateAsync(TEntity entity)
+ {
+ await Mapper.UpdateAsync(entity);
+ return entity;
+ }
+
+ public virtual async Task DeleteAsync(TEntity entity)
+ {
+ await Mapper.DeleteAsync(entity);
+ }
+
+ public virtual async Task DeleteAsync(TKey id)
+ {
+ await Mapper.DeleteAsync("WHERE id = ?", id);
+ }
+
+ public virtual async Task> GetPagedListAsync(
+ int skipCount,
+ int maxResultCount,
+ string sorting)
+ {
+ var cql = $"SELECT * FROM {typeof(TEntity).Name.ToLower()}";
+ if (!string.IsNullOrWhiteSpace(sorting))
+ {
+ cql += $" ORDER BY {sorting}";
+ }
+ cql += $" LIMIT {maxResultCount} OFFSET {skipCount}";
+
+ return (await Mapper.FetchAsync(cql)).ToList();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/JiShe.CollectBus.Cassandra/CassandraService.cs b/src/JiShe.CollectBus.Cassandra/CassandraService.cs
deleted file mode 100644
index 8836235..0000000
--- a/src/JiShe.CollectBus.Cassandra/CassandraService.cs
+++ /dev/null
@@ -1,40 +0,0 @@
-using System.Diagnostics.Metrics;
-using Cassandra;
-using Microsoft.Extensions.Options;
-using Volo.Abp.DependencyInjection;
-
-namespace JiShe.CollectBus.Cassandra
-{
- public class CassandraService : ICassandraService, ISingletonDependency
- {
- private readonly CassandraConfiguration _configuration;
- public ISession Instance { get; set; } = default;
-
- ///
- /// CassandraService
- ///
- ///
- public CassandraService(IOptions configuration)
- {
- _configuration = configuration.Value;
- GetSession();
- }
-
- ///
- /// 获取Cassandra数据库会话
- ///
- ///
- public ISession GetSession()
- {
- var cluster = Cluster.Builder()
- .AddContactPoints(_configuration.ContactPoints)
- .WithPort(_configuration.Port)
- .WithCredentials(_configuration.Username, _configuration.Password)
- .Build();
-
- Instance = cluster.Connect(_configuration.Keyspace);
-
- return Instance;
- }
- }
-}
diff --git a/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs b/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs
index e7c2c69..2502420 100644
--- a/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs
+++ b/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs
@@ -1,13 +1,29 @@
-using Microsoft.Extensions.DependencyInjection;
+using Cassandra;
+using Cassandra.Mapping;
+using JiShe.CollectBus.Cassandra.Mappers;
+using Microsoft.Extensions.DependencyInjection;
+using Volo.Abp;
+using Volo.Abp.Autofac;
using Volo.Abp.Modularity;
namespace JiShe.CollectBus.Cassandra
{
+ [DependsOn(
+ typeof(AbpAutofacModule)
+ )]
public class CollectBusCassandraModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
- context.Services.Configure(context.Services.GetConfiguration().GetSection("Cassandra"));
+ Configure(context.Services.GetConfiguration().GetSection("Cassandra"));
+
+ context.AddCassandra();
+
+ }
+
+ public override void OnApplicationInitialization(ApplicationInitializationContext context)
+ {
+ context.UseCassandra();
}
}
}
diff --git a/src/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs b/src/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs
new file mode 100644
index 0000000..9c6a044
--- /dev/null
+++ b/src/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs
@@ -0,0 +1,35 @@
+using Autofac.Core;
+using Cassandra;
+using Cassandra.Mapping;
+using JiShe.CollectBus.Cassandra;
+using JiShe.CollectBus.Cassandra.Mappers;
+using Microsoft.Extensions.Options;
+using System.Reflection;
+using Volo.Abp;
+using Volo.Abp.Modularity;
+
+// ReSharper disable once CheckNamespace
+namespace Microsoft.Extensions.DependencyInjection
+{
+ public static class ApplicationInitializationContextExtensions
+ {
+ public static void UseCassandra(this ApplicationInitializationContext context)
+ {
+ var service = context.ServiceProvider;
+ var cassandraProvider = service.GetRequiredService();
+ cassandraProvider.InitClusterAndSession();
+ }
+ }
+
+ public static class ServiceCollectionExtensions
+ {
+ public static void AddCassandra(this ServiceConfigurationContext context)
+ {
+ context.Services.AddSingleton(typeof(ICassandraRepository<,>), typeof(CassandraRepository<,>));
+
+ var mappingConfig = new MappingConfiguration()
+ .Define(new CollectBusMapping());
+ context.Services.AddSingleton(mappingConfig);
+ }
+ }
+}
diff --git a/src/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs b/src/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs
new file mode 100644
index 0000000..f515416
--- /dev/null
+++ b/src/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs
@@ -0,0 +1,83 @@
+using System.Reflection;
+using System.Text;
+using Cassandra;
+using System.ComponentModel.DataAnnotations;
+using JiShe.CollectBus.Common.Attributes;
+using Cassandra.Mapping;
+
+namespace JiShe.CollectBus.Cassandra.Extensions
+{
+ public static class SessionExtension
+ {
+ public static void CreateTable(this ISession session,string? defaultKeyspace=null) where TEntity : class
+ {
+ var type = typeof(TEntity);
+ var tableAttribute = type.GetCustomAttribute();
+ var tableName = tableAttribute?.Name ?? type.Name.ToLower();
+ var tableKeyspace = tableAttribute?.Keyspace ?? defaultKeyspace;
+
+ var properties = type.GetProperties();
+ var primaryKey = properties.FirstOrDefault(p => p.GetCustomAttribute() != null);
+
+ if (primaryKey == null)
+ {
+ throw new InvalidOperationException($"No primary key defined for type {type.Name}");
+ }
+
+ var cql = new StringBuilder();
+ cql.Append($"CREATE TABLE IF NOT EXISTS {tableKeyspace}.{tableName} (");
+
+ foreach (var prop in properties)
+ {
+ var ignoreAttribute = prop.GetCustomAttribute();
+ if (ignoreAttribute != null) continue;
+ var columnName = prop.Name.ToLower();
+ var cqlType = GetCassandraType(prop.PropertyType);
+
+ cql.Append($"{columnName} {cqlType}, ");
+ }
+ cql.Length -= 2; // Remove last comma and space
+ cql.Append($", PRIMARY KEY ({primaryKey.Name.ToLower()}))");
+
+ session.Execute(cql.ToString());
+ }
+
+ private static string GetCassandraType(Type type)
+ {
+ // 基础类型处理
+ switch (Type.GetTypeCode(type))
+ {
+ case TypeCode.String: return "text";
+ case TypeCode.Int32: return "int";
+ case TypeCode.Int64: return "bigint";
+ case TypeCode.Boolean: return "boolean";
+ case TypeCode.DateTime: return "timestamp";
+ case TypeCode.Byte: return "tinyint";
+ }
+
+ if (type == typeof(Guid)) return "uuid";
+ if (type == typeof(DateTimeOffset)) return "timestamp";
+ if (type == typeof(Byte[])) return "blob";
+
+ // 处理集合类型
+ if (type.IsGenericType)
+ {
+ var genericType = type.GetGenericTypeDefinition();
+ var elementType = type.GetGenericArguments()[0];
+
+ if (genericType == typeof(List<>))
+ return $"list<{GetCassandraType(elementType)}>";
+ if (genericType == typeof(HashSet<>))
+ return $"set<{GetCassandraType(elementType)}>";
+ if (genericType == typeof(Dictionary<,>))
+ {
+ var keyType = type.GetGenericArguments()[0];
+ var valueType = type.GetGenericArguments()[1];
+ return $"map<{GetCassandraType(keyType)}, {GetCassandraType(valueType)}>";
+ }
+ }
+
+ throw new NotSupportedException($"不支持的类型: {type.Name}");
+ }
+ }
+}
diff --git a/src/JiShe.CollectBus.Cassandra/ICassandraProvider.cs b/src/JiShe.CollectBus.Cassandra/ICassandraProvider.cs
new file mode 100644
index 0000000..45c6ca6
--- /dev/null
+++ b/src/JiShe.CollectBus.Cassandra/ICassandraProvider.cs
@@ -0,0 +1,24 @@
+using Cassandra;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Cassandra
+{
+ public interface ICassandraProvider
+ {
+ Cluster Instance { get;}
+
+ ISession Session { get;}
+
+ CassandraConfig CassandraConfig { get; }
+
+ ISession GetSession(string? keyspace = null);
+
+ Cluster GetCluster(Action? callback = null);
+
+ void InitClusterAndSession();
+ }
+}
diff --git a/src/JiShe.CollectBus.Cassandra/ICassandraRepository.cs b/src/JiShe.CollectBus.Cassandra/ICassandraRepository.cs
new file mode 100644
index 0000000..6dd474f
--- /dev/null
+++ b/src/JiShe.CollectBus.Cassandra/ICassandraRepository.cs
@@ -0,0 +1,20 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Volo.Abp.Domain.Entities;
+
+namespace JiShe.CollectBus.Cassandra
+{
+ public interface ICassandraRepository where TEntity : class
+ {
+ Task GetAsync(TKey id);
+ Task> GetListAsync();
+ Task InsertAsync(TEntity entity);
+ Task UpdateAsync(TEntity entity);
+ Task DeleteAsync(TEntity entity);
+ Task DeleteAsync(TKey id);
+ Task> GetPagedListAsync(int skipCount, int maxResultCount, string sorting);
+ }
+}
diff --git a/src/JiShe.CollectBus.Cassandra/ICassandraService.cs b/src/JiShe.CollectBus.Cassandra/ICassandraService.cs
deleted file mode 100644
index 70eb5d6..0000000
--- a/src/JiShe.CollectBus.Cassandra/ICassandraService.cs
+++ /dev/null
@@ -1,9 +0,0 @@
-using Cassandra;
-
-namespace JiShe.CollectBus.Cassandra
-{
- public interface ICassandraService
- {
- ISession GetSession();
- }
-}
diff --git a/src/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj b/src/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj
index 5a8b22d..db7ccf3 100644
--- a/src/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj
+++ b/src/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj
@@ -9,11 +9,14 @@
+
+
-
+
+
diff --git a/src/JiShe.CollectBus.Cassandra/Mappers/CollectBusMapping.cs b/src/JiShe.CollectBus.Cassandra/Mappers/CollectBusMapping.cs
new file mode 100644
index 0000000..d07f8ea
--- /dev/null
+++ b/src/JiShe.CollectBus.Cassandra/Mappers/CollectBusMapping.cs
@@ -0,0 +1,20 @@
+using Cassandra.Mapping;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using JiShe.CollectBus.IotSystems.MessageIssueds;
+using static Cassandra.QueryTrace;
+
+namespace JiShe.CollectBus.Cassandra.Mappers
+{
+ public class CollectBusMapping: Mappings
+ {
+ public CollectBusMapping()
+ {
+ For()
+ .Column(e => e.Type, cm => cm.WithName("type").WithDbType());
+ }
+ }
+}
diff --git a/src/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs b/src/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs
new file mode 100644
index 0000000..bdd16d5
--- /dev/null
+++ b/src/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs
@@ -0,0 +1,32 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Volo.Abp.EventBus;
+using Volo.Abp;
+
+namespace JiShe.CollectBus.Common.Attributes
+{
+ [AttributeUsage(AttributeTargets.Class, Inherited = false)]
+ public class CassandraTableAttribute : Attribute
+ {
+ public CassandraTableAttribute(string? name = null,string? keyspace =null)
+ {
+ Name = name;
+ Keyspace = keyspace;
+ }
+
+ public virtual string? Name { get; }
+
+ public virtual string? Keyspace { get; }
+ }
+
+ [AttributeUsage(AttributeTargets.Property | AttributeTargets.Field)]
+ public class CassandraIgnoreAttribute : Attribute
+ {
+ public CassandraIgnoreAttribute()
+ {
+ }
+ }
+}
diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs b/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs
index 42d740c..5977d5e 100644
--- a/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs
+++ b/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs
@@ -1,14 +1,18 @@
using System;
using System.Collections.Generic;
+using System.ComponentModel.DataAnnotations;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
+using JiShe.CollectBus.Common.Attributes;
using JiShe.CollectBus.Common.Enums;
namespace JiShe.CollectBus.IotSystems.MessageIssueds
{
+ [CassandraTable]
public class MessageIssued
{
+ [Key]
public string ClientId { get; set; }
public byte[] Message { get; set; }
public string DeviceNo { get; set; }
diff --git a/src/JiShe.CollectBus.Host/Program.cs b/src/JiShe.CollectBus.Host/Program.cs
index 40a13cc..d75a227 100644
--- a/src/JiShe.CollectBus.Host/Program.cs
+++ b/src/JiShe.CollectBus.Host/Program.cs
@@ -1,12 +1,35 @@
using JiShe.CollectBus.Host;
using Microsoft.AspNetCore.Hosting;
using Serilog;
+using Volo.Abp.Modularity.PlugIns;
public class Program
{
- public static void Main(string[] args)
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static async Task Main(string[] args)
{
- CreateHostBuilder(args).Build().Run();
+ var builder = WebApplication.CreateBuilder(args);
+ builder.Host.UseContentRoot(Directory.GetCurrentDirectory())
+ .UseSerilog((context, loggerConfiguration) =>
+ {
+ loggerConfiguration.ReadFrom.Configuration(context.Configuration);
+ })
+ .UseAutofac();
+ await builder.AddApplicationAsync(options =>
+ {
+ // ز̶ģʽȲ
+ options.PlugInSources.AddFolder(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Plugins"));
+ });
+ var app = builder.Build();
+ await app.InitializeApplicationAsync();
+ await app.RunAsync();
+
+
+ //await CreateHostBuilder(args).Build().RunAsync();
}
private static IHostBuilder CreateHostBuilder(string[] args) =>
@@ -16,13 +39,14 @@ public class Program
{
loggerConfiguration.ReadFrom.Configuration(context.Configuration);
})
- .UseAutofac()
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup();
- });
-
-
+ })
+ .UseAutofac();
+
+
+
private static IHostBuilder CreateConsoleHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
@@ -34,8 +58,8 @@ public class Program
});
- private static void ConfigureServices(IServiceCollection services, HostBuilderContext hostContext)
+ private static async Task ConfigureServices(IServiceCollection services, HostBuilderContext hostContext)
{
- services.AddApplication();
+ await services.AddApplicationAsync();
}
}
\ No newline at end of file
diff --git a/src/JiShe.CollectBus.Host/Startup.cs b/src/JiShe.CollectBus.Host/Startup.cs
index dd6453e..32740ee 100644
--- a/src/JiShe.CollectBus.Host/Startup.cs
+++ b/src/JiShe.CollectBus.Host/Startup.cs
@@ -39,10 +39,15 @@ namespace JiShe.CollectBus.Host
/// The lifetime.
public void Configure(IApplicationBuilder app, IHostApplicationLifetime lifetime)
{
- app.InitializeApplication();
- //await app.InitializeApplicationAsync();
+ app.Use(async (context, next) =>
+ {
+ // 在请求处理之前调用 InitializeApplicationAsync
+ await app.InitializeApplicationAsync();
+ // 继续请求管道中的下一个中间件
+ await next();
+ });
}
}
}
diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json
index a5b2e15..a837636 100644
--- a/src/JiShe.CollectBus.Host/appsettings.json
+++ b/src/JiShe.CollectBus.Host/appsettings.json
@@ -130,5 +130,48 @@
},
"ServerTagName": "JiSheCollectBus",
"KafkaReplicationFactor": 3,
- "NumPartitions": 30
+ "NumPartitions": 30,
+ "Cassandra": {
+ "ReplicationStrategy": {
+ "Class": "NetworkTopologyStrategy", //策略为NetworkTopologyStrategy时才会有多个数据中心,SimpleStrategy用在只有一个数据中心的情况下
+ "DataCenters": [
+ {
+ "Name": "dc1",
+ "ReplicationFactor": 3
+ }
+ ]
+ },
+ "Nodes": [
+ {
+ "Host": "192.168.1.9",
+ "Port": 9042,
+ "DataCenter": "dc1",
+ "Rack": "RAC1"
+ },
+ {
+ "Host": "192.168.1.9",
+ "Port": 9043,
+ "DataCenter": "dc1",
+ "Rack": "RAC2"
+ }
+ ],
+ "Username": "admin",
+ "Password": "lixiao1980",
+ "Keyspace": "jishecollectbus",
+ "ConsistencyLevel": "Quorum",
+ "PoolingOptions": {
+ "CoreConnectionsPerHost": 4,
+ "MaxConnectionsPerHost": 8,
+ "MaxRequestsPerConnection": 2000
+ },
+ "SocketOptions": {
+ "ConnectTimeoutMillis": 10000,
+ "ReadTimeoutMillis": 20000
+ },
+ "QueryOptions": {
+ "ConsistencyLevel": "Quorum",
+ "SerialConsistencyLevel": "Serial",
+ "DefaultIdempotence": true
+ }
+ }
}
\ No newline at end of file
diff --git a/src/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs b/src/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs
index 64661d1..5cda0c8 100644
--- a/src/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs
+++ b/src/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs
@@ -12,10 +12,10 @@ namespace JiShe.CollectBus.Protocol
context.Services.AddKeyedSingleton(nameof(StandardProtocolPlugin));
}
- public override void OnApplicationInitialization(ApplicationInitializationContext context)
+ public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{
var standardProtocol = context.ServiceProvider.GetRequiredKeyedService(nameof(StandardProtocolPlugin));
- standardProtocol.AddAsync();
+ await standardProtocol.AddAsync();
}
}
}
From 3b29e58951398c8c5681772745d587f34e937fb7 Mon Sep 17 00:00:00 2001
From: cli <377476583@qq.com>
Date: Tue, 15 Apr 2025 18:01:50 +0800
Subject: [PATCH 3/3] =?UTF-8?q?=E5=88=A0=E9=99=A4=E5=A4=9A=E4=BD=99?=
=?UTF-8?q?=E9=A1=B9=E7=9B=AE?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
JiShe.CollectBus.sln | 7 -------
1 file changed, 7 deletions(-)
diff --git a/JiShe.CollectBus.sln b/JiShe.CollectBus.sln
index 4b4a650..9c67aae 100644
--- a/JiShe.CollectBus.sln
+++ b/JiShe.CollectBus.sln
@@ -39,8 +39,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.T
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{82E4562A-3A7F-4372-8D42-8AE41BA56C04}"
-EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -115,10 +113,6 @@ Global
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU
- {82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -141,7 +135,6 @@ Global
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{443B4549-0AC0-4493-8F3E-49C83225DD76} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
- {82E4562A-3A7F-4372-8D42-8AE41BA56C04} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}