diff --git a/JiShe.CollectBus.sln b/JiShe.CollectBus.sln index c26f3da..9c67aae 100644 --- a/JiShe.CollectBus.sln +++ b/JiShe.CollectBus.sln @@ -37,6 +37,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.IoTDBProvi EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.Test", "src\JiShe.CollectBus.Protocol.Test\JiShe.CollectBus.Protocol.Test.csproj", "{A377955E-7EA1-6F29-8CF7-774569E93925}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -107,6 +109,10 @@ Global {A377955E-7EA1-6F29-8CF7-774569E93925}.Debug|Any CPU.Build.0 = Debug|Any CPU {A377955E-7EA1-6F29-8CF7-774569E93925}.Release|Any CPU.ActiveCfg = Release|Any CPU {A377955E-7EA1-6F29-8CF7-774569E93925}.Release|Any CPU.Build.0 = Release|Any CPU + {443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU + {443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU + {443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -128,6 +134,7 @@ Global {F0288175-F0EC-48BD-945F-CF1512850943} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} + {443B4549-0AC0-4493-8F3E-49C83225DD76} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD} diff --git a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index 008e736..f3ed978 100644 --- a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -12,8 +12,11 @@ using Microsoft.Extensions.DependencyInjection; using System.Collections.Generic; using System.Linq; using System.Reflection; +using System.Threading.Tasks; +using JiShe.CollectBus.Cassandra; using Volo.Abp; using Volo.Abp.Application; +using Volo.Abp.Autofac; using Volo.Abp.AutoMapper; using Volo.Abp.BackgroundWorkers; using Volo.Abp.BackgroundWorkers.Hangfire; @@ -27,11 +30,13 @@ namespace JiShe.CollectBus; typeof(CollectBusApplicationContractsModule), typeof(AbpDddApplicationModule), typeof(AbpAutoMapperModule), + typeof(AbpAutofacModule), typeof(AbpBackgroundWorkersHangfireModule), typeof(CollectBusFreeRedisModule), typeof(CollectBusFreeSqlModule), typeof(CollectBusKafkaModule), - typeof(CollectBusIoTDBModule) + typeof(CollectBusIoTDBModule), + typeof(CollectBusCassandraModule) )] public class CollectBusApplicationModule : AbpModule { @@ -46,20 +51,20 @@ public class CollectBusApplicationModule : AbpModule }); } - public override void OnApplicationInitialization( + public override async Task OnApplicationInitializationAsync( ApplicationInitializationContext context) { var assembly = Assembly.GetExecutingAssembly(); var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface).ToList(); foreach (var type in types) { - context.AddBackgroundWorkerAsync(type); + await context.AddBackgroundWorkerAsync(type); } //默认初始化表计信息 var dbContext = context.ServiceProvider.GetRequiredService(); - dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); - //dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); + await dbContext.InitAmmeterCacheData(); + //await dbContext.InitWatermeterCacheData(); //初始化主题信息 var kafkaAdminClient = context.ServiceProvider.GetRequiredService(); @@ -70,7 +75,7 @@ public class CollectBusApplicationModule : AbpModule foreach (var item in topics) { - kafkaAdminClient.CreateTopicAsync(item, configuration.GetValue(CommonConst.NumPartitions), configuration.GetValue(CommonConst.KafkaReplicationFactor)); + await kafkaAdminClient.CreateTopicAsync(item, configuration.GetValue(CommonConst.NumPartitions), configuration.GetValue(CommonConst.KafkaReplicationFactor)); } } diff --git a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj index 1726f39..a2cc613 100644 --- a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj +++ b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj @@ -24,8 +24,11 @@ + + + diff --git a/src/JiShe.CollectBus.Application/Samples/TestAppService.cs b/src/JiShe.CollectBus.Application/Samples/TestAppService.cs new file mode 100644 index 0000000..9c53c3a --- /dev/null +++ b/src/JiShe.CollectBus.Application/Samples/TestAppService.cs @@ -0,0 +1,51 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Apache.IoTDB.DataStructure; +using Apache.IoTDB; +using Confluent.Kafka; +using JiShe.CollectBus.Ammeters; +using JiShe.CollectBus.FreeSql; +using JiShe.CollectBus.IoTDBProvider; +using JiShe.CollectBus.IotSystems.PrepayModel; +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Extensions.Options; +using JiShe.CollectBus.IoTDBProvider.Context; +using Microsoft.Extensions.Logging; +using JiShe.CollectBus.Common.Helpers; +using JiShe.CollectBus.IotSystems.AFNEntity; +using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using Microsoft.Extensions.DependencyInjection; +using JiShe.CollectBus.Cassandra; +using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.IotSystems.MessageIssueds; +using Volo.Abp.Application.Services; + +namespace JiShe.CollectBus.Samples; + +[AllowAnonymous] +public class TestAppService : CollectBusAppService, IApplicationService +{ + private readonly ILogger _logger; + private readonly ICassandraRepository _messageIssuedCassandraRepository; + public TestAppService( + ILogger logger, + ICassandraRepository messageIssuedCassandraRepository + ) + { + _logger = logger; + _messageIssuedCassandraRepository = messageIssuedCassandraRepository; + } + public async Task AddMessage() + { + await _messageIssuedCassandraRepository.InsertAsync(new MessageIssued + { + ClientId = Guid.NewGuid().ToString(), + Message = Array.Empty(), + DeviceNo = "123321312", + MessageId = Guid.NewGuid().ToString(), + Type = IssuedEventType.Data + }); + } +} diff --git a/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs b/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs index 2857422..b03193a 100644 --- a/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs +++ b/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs @@ -27,7 +27,7 @@ namespace JiShe.CollectBus.Workers { _logger = logger; RecurringJobId = nameof(CreateToBeIssueTaskWorker); - CronExpression = $"*/{1} * * * *"; + CronExpression = "* 10 * * * *"; this._scheduledMeterReadingService = scheduledMeterReadingService; } diff --git a/src/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs b/src/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs index f1bf5a1..441b22a 100644 --- a/src/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs +++ b/src/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs @@ -26,7 +26,7 @@ namespace JiShe.CollectBus.Workers { _logger = logger; RecurringJobId = nameof(SubscriberFifteenMinuteWorker); - CronExpression = $"*/{15} * * * *"; + CronExpression = "* 15 * * * *"; this._scheduledMeterReadingService = scheduledMeterReadingService; } diff --git a/src/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs b/src/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs index 2e491d6..0a61c63 100644 --- a/src/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs +++ b/src/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs @@ -26,7 +26,7 @@ namespace JiShe.CollectBus.Workers { _logger = logger; RecurringJobId = nameof(SubscriberFiveMinuteWorker); - CronExpression = $"*/{5} * * * *"; + CronExpression = "* 5 * * * *"; this._scheduledMeterReadingService = scheduledMeterReadingService; } diff --git a/src/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs b/src/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs index 82b979b..8b7cbfd 100644 --- a/src/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs +++ b/src/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs @@ -26,7 +26,7 @@ namespace JiShe.CollectBus.Workers { _logger = logger; RecurringJobId = nameof(SubscriberOneMinuteWorker); - CronExpression = $"*/{1} * * * *"; + CronExpression = "* 1 * * * *"; this._scheduledMeterReadingService = scheduledMeterReadingService; } diff --git a/src/JiShe.CollectBus.Cassandra/CassandraConfig.cs b/src/JiShe.CollectBus.Cassandra/CassandraConfig.cs new file mode 100644 index 0000000..c14171e --- /dev/null +++ b/src/JiShe.CollectBus.Cassandra/CassandraConfig.cs @@ -0,0 +1,64 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Cassandra +{ + public class CassandraConfig + { + public Node[] Nodes { get; set; } + public string Username { get; set; } + public string Password { get; set; } + public string Keyspace { get; set; } + public string ConsistencyLevel { get; set; } + public Pooling PoolingOptions { get; set; } + public Socket SocketOptions { get; set; } + public Query QueryOptions { get; set; } + + public ReplicationStrategy ReplicationStrategy { get; set; } + } + + public class Pooling + { + public int CoreConnectionsPerHost { get; set; } + public int MaxConnectionsPerHost { get; set; } + public int MaxRequestsPerConnection { get; set; } + } + + public class Socket + { + public int ConnectTimeoutMillis { get; set; } + public int ReadTimeoutMillis { get; set; } + } + + public class Query + { + public string ConsistencyLevel { get; set; } + public string SerialConsistencyLevel { get; set; } + public bool DefaultIdempotence { get; set; } + } + + public class ReplicationStrategy + { + public string Class { get; set; } + public DataCenter[] DataCenters { get; set; } + } + + public class DataCenter + { + public string Name { get; set; } + public int ReplicationFactor { get; set; } + public string Strategy { get; set; } + } + + public class Node + { + public string Host { get; set; } + public int Port { get; set; } + public string DataCenter { get; set; } + public string Rack { get; set; } + } + +} diff --git a/src/JiShe.CollectBus.Cassandra/CassandraProvider.cs b/src/JiShe.CollectBus.Cassandra/CassandraProvider.cs new file mode 100644 index 0000000..a5cd9c8 --- /dev/null +++ b/src/JiShe.CollectBus.Cassandra/CassandraProvider.cs @@ -0,0 +1,147 @@ +using System; +using System.Linq; +using System.Reflection; +using System.Text; +using Cassandra; +using Cassandra.Mapping; +using Cassandra.Data.Linq; +using System.ComponentModel.DataAnnotations; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Volo.Abp.DependencyInjection; +using JiShe.CollectBus.Common.Attributes; + +namespace JiShe.CollectBus.Cassandra +{ + public class CassandraProvider : IDisposable, ICassandraProvider, ISingletonDependency + { + private readonly ILogger _logger; + + public Cluster Instance { get; set; } + + public ISession Session { get; set; } + + public CassandraConfig CassandraConfig { get; set; } + /// + /// + /// + /// + /// + public CassandraProvider( + IOptions options, + ILogger logger) + { + CassandraConfig = options.Value; + _logger = logger; + } + + public void InitClusterAndSession() + { + GetCluster((keyspace) => + { + GetSession(keyspace); + }); + } + + public Cluster GetCluster(Action? callback=null) + { + var clusterBuilder = Cluster.Builder(); + + // 添加多个节点 + foreach (var node in CassandraConfig.Nodes) + { + clusterBuilder.AddContactPoint(node.Host) + .WithPort(node.Port); + } + + clusterBuilder.WithCredentials(CassandraConfig.Username, CassandraConfig.Password); + + // 优化连接池配置 + var poolingOptions = new PoolingOptions() + .SetCoreConnectionsPerHost(HostDistance.Local, CassandraConfig.PoolingOptions.CoreConnectionsPerHost) + .SetMaxConnectionsPerHost(HostDistance.Local, CassandraConfig.PoolingOptions.MaxConnectionsPerHost) + .SetMaxRequestsPerConnection(CassandraConfig.PoolingOptions.MaxRequestsPerConnection) + .SetHeartBeatInterval(30000); // 30秒心跳 + + clusterBuilder.WithPoolingOptions(poolingOptions); + + // 优化Socket配置 + var socketOptions = new SocketOptions() + .SetConnectTimeoutMillis(CassandraConfig.SocketOptions.ConnectTimeoutMillis) + .SetReadTimeoutMillis(CassandraConfig.SocketOptions.ReadTimeoutMillis) + .SetTcpNoDelay(true) // 启用Nagle算法 + .SetKeepAlive(true) // 启用TCP保活 + .SetReceiveBufferSize(32768) // 32KB接收缓冲区 + .SetSendBufferSize(32768); // 32KB发送缓冲区 + + clusterBuilder.WithSocketOptions(socketOptions); + + // 优化查询选项 + var queryOptions = new QueryOptions() + .SetConsistencyLevel((ConsistencyLevel)Enum.Parse(typeof(ConsistencyLevel), CassandraConfig.QueryOptions.ConsistencyLevel)) + .SetSerialConsistencyLevel((ConsistencyLevel)Enum.Parse(typeof(ConsistencyLevel), CassandraConfig.QueryOptions.SerialConsistencyLevel)) + .SetDefaultIdempotence(CassandraConfig.QueryOptions.DefaultIdempotence) + .SetPageSize(5000); // 增加页面大小 + + clusterBuilder.WithQueryOptions(queryOptions); + + // 启用压缩 + clusterBuilder.WithCompression(CompressionType.LZ4); + + // 配置重连策略 + clusterBuilder.WithReconnectionPolicy(new ExponentialReconnectionPolicy(1000, 10 * 60 * 1000)); + Instance = clusterBuilder.Build(); + callback?.Invoke(null); + return Instance; + } + + public ISession GetSession(string? keyspace = null) + { + if (string.IsNullOrEmpty(keyspace)) + { + keyspace = CassandraConfig.Keyspace; + } + Session = Instance.Connect(); + var replication = GetReplicationStrategy(); + Session.CreateKeyspaceIfNotExists(keyspace, replication); + Session.ChangeKeyspace(keyspace); + return Session; + } + + private Dictionary GetReplicationStrategy() + { + var strategy = CassandraConfig.ReplicationStrategy.Class; + var dataCenters = CassandraConfig.ReplicationStrategy.DataCenters; + + switch (strategy) + { + case "NetworkTopologyStrategy": + var networkDic = new Dictionary { { "class", "NetworkTopologyStrategy" } }; + foreach (var dataCenter in dataCenters) + { + networkDic.Add(dataCenter.Name, dataCenter.ReplicationFactor.ToString()); + } + return networkDic; + case "SimpleStrategy": + var dic = new Dictionary { { "class", "SimpleStrategy" } }; + if (dataCenters.Length >= 1) + { + dic.Add("replication_factor", dataCenters[0].ReplicationFactor.ToString()); + } + else + { + _logger.LogError("SimpleStrategy 不支持多个数据中心!"); + } + return dic; + default: + throw new ArgumentNullException($"Strategy", "Strategy配置错误!"); + } + } + + public void Dispose() + { + Instance.Dispose(); + Session.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/JiShe.CollectBus.Cassandra/CassandraQueryOptimizer.cs b/src/JiShe.CollectBus.Cassandra/CassandraQueryOptimizer.cs new file mode 100644 index 0000000..0ea1b56 --- /dev/null +++ b/src/JiShe.CollectBus.Cassandra/CassandraQueryOptimizer.cs @@ -0,0 +1,156 @@ +using System.Collections.Concurrent; +using Cassandra; +using Cassandra.Mapping; +using Microsoft.Extensions.Caching.Memory; +using Microsoft.Extensions.Logging; + +namespace JiShe.CollectBus.Cassandra +{ + public class CassandraQueryOptimizer + { + private readonly ISession _session; + private readonly ILogger _logger; + private readonly IMemoryCache _cache; + private readonly ConcurrentDictionary _preparedStatements; + private readonly int _batchSize; + private readonly TimeSpan _cacheExpiration; + + public CassandraQueryOptimizer( + ISession session, + ILogger logger, + IMemoryCache cache, + int batchSize = 100, + TimeSpan? cacheExpiration = null) + { + _session = session; + _logger = logger; + _cache = cache; + _preparedStatements = new ConcurrentDictionary(); + _batchSize = batchSize; + _cacheExpiration = cacheExpiration ?? TimeSpan.FromMinutes(5); + } + + public async Task GetOrPrepareStatementAsync(string cql) + { + return _preparedStatements.GetOrAdd(cql, key => + { + try + { + var statement = _session.Prepare(key); + _logger.LogDebug($"Prepared statement for CQL: {key}"); + return statement; + } + catch (Exception ex) + { + _logger.LogError(ex, $"Failed to prepare statement for CQL: {key}"); + throw; + } + }); + } + + public async Task ExecuteBatchAsync(IEnumerable statements) + { + var batch = new BatchStatement(); + var count = 0; + + foreach (var statement in statements) + { + batch.Add(statement); + count++; + + if (count >= _batchSize) + { + await ExecuteBatchAsync(batch); + batch = new BatchStatement(); + count = 0; + } + } + + if (count > 0) + { + await ExecuteBatchAsync(batch); + } + } + + private async Task ExecuteBatchAsync(BatchStatement batch) + { + try + { + await _session.ExecuteAsync(batch); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to execute batch statement"); + throw; + } + } + + public async Task GetOrSetFromCacheAsync(string cacheKey, Func> getData) + { + if (_cache.TryGetValue(cacheKey, out T cachedValue)) + { + _logger.LogDebug($"Cache hit for key: {cacheKey}"); + return cachedValue; + } + + var data = await getData(); + _cache.Set(cacheKey, data, _cacheExpiration); + _logger.LogDebug($"Cache miss for key: {cacheKey}, data cached"); + return data; + } + + public async Task> ExecutePagedQueryAsync( + string cql, + object[] parameters, + int pageSize = 100, + string pagingState = null) where T : class + { + var statement = await GetOrPrepareStatementAsync(cql); + var boundStatement = statement.Bind(parameters); + + if (!string.IsNullOrEmpty(pagingState)) + { + boundStatement.SetPagingState(Convert.FromBase64String(pagingState)); + } + + boundStatement.SetPageSize(pageSize); + + try + { + var result = await _session.ExecuteAsync(boundStatement); + //TODO: RETURN OBJECT + throw new NotImplementedException(); + //result.GetRows() + //return result.Select(row => row); + } + catch (Exception ex) + { + _logger.LogError(ex, $"Failed to execute paged query: {cql}"); + throw; + } + } + + public async Task BulkInsertAsync(IEnumerable items, string tableName) + { + var mapper = new Mapper(_session); + var batch = new List(); + var cql = $"INSERT INTO {tableName} ({{0}}) VALUES ({{1}})"; + + foreach (var chunk in items.Chunk(_batchSize)) + { + var statements = chunk.Select(item => + { + var props = typeof(T).GetProperties(); + var columns = string.Join(", ", props.Select(p => p.Name)); + var values = string.Join(", ", props.Select(p => "?")); + var statement = _session.Prepare(string.Format(cql, columns, values)); + return statement.Bind(props.Select(p => p.GetValue(item)).ToArray()); + }); + + batch.AddRange(statements); + } + + await ExecuteBatchAsync(batch); + } + } +} \ No newline at end of file diff --git a/src/JiShe.CollectBus.Cassandra/CassandraRepository.cs b/src/JiShe.CollectBus.Cassandra/CassandraRepository.cs new file mode 100644 index 0000000..8381450 --- /dev/null +++ b/src/JiShe.CollectBus.Cassandra/CassandraRepository.cs @@ -0,0 +1,69 @@ +using Cassandra; +using Cassandra.Mapping; +using JiShe.CollectBus.Cassandra.Extensions; +using Volo.Abp.Domain.Entities; +using Volo.Abp.Domain.Repositories; + +namespace JiShe.CollectBus.Cassandra +{ + public class CassandraRepository + : ICassandraRepository + where TEntity : class + { + + public CassandraRepository(ICassandraProvider cassandraProvider, MappingConfiguration mappingConfig) + { + Mapper = new Mapper(cassandraProvider.Session, mappingConfig); + cassandraProvider.Session.CreateTable(cassandraProvider.CassandraConfig.Keyspace); + } + + public readonly IMapper Mapper; + + public virtual async Task GetAsync(TKey id) + { + return await Mapper.SingleOrDefaultAsync("WHERE id = ?", id); + } + + public virtual async Task> GetListAsync() + { + return (await Mapper.FetchAsync()).ToList(); + } + + public virtual async Task InsertAsync(TEntity entity) + { + await Mapper.InsertAsync(entity); + return entity; + } + + public virtual async Task UpdateAsync(TEntity entity) + { + await Mapper.UpdateAsync(entity); + return entity; + } + + public virtual async Task DeleteAsync(TEntity entity) + { + await Mapper.DeleteAsync(entity); + } + + public virtual async Task DeleteAsync(TKey id) + { + await Mapper.DeleteAsync("WHERE id = ?", id); + } + + public virtual async Task> GetPagedListAsync( + int skipCount, + int maxResultCount, + string sorting) + { + var cql = $"SELECT * FROM {typeof(TEntity).Name.ToLower()}"; + if (!string.IsNullOrWhiteSpace(sorting)) + { + cql += $" ORDER BY {sorting}"; + } + cql += $" LIMIT {maxResultCount} OFFSET {skipCount}"; + + return (await Mapper.FetchAsync(cql)).ToList(); + } + } +} \ No newline at end of file diff --git a/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs b/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs new file mode 100644 index 0000000..2502420 --- /dev/null +++ b/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs @@ -0,0 +1,29 @@ +using Cassandra; +using Cassandra.Mapping; +using JiShe.CollectBus.Cassandra.Mappers; +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp; +using Volo.Abp.Autofac; +using Volo.Abp.Modularity; + +namespace JiShe.CollectBus.Cassandra +{ + [DependsOn( + typeof(AbpAutofacModule) + )] + public class CollectBusCassandraModule : AbpModule + { + public override void ConfigureServices(ServiceConfigurationContext context) + { + Configure(context.Services.GetConfiguration().GetSection("Cassandra")); + + context.AddCassandra(); + + } + + public override void OnApplicationInitialization(ApplicationInitializationContext context) + { + context.UseCassandra(); + } + } +} diff --git a/src/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs b/src/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..9c6a044 --- /dev/null +++ b/src/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs @@ -0,0 +1,35 @@ +using Autofac.Core; +using Cassandra; +using Cassandra.Mapping; +using JiShe.CollectBus.Cassandra; +using JiShe.CollectBus.Cassandra.Mappers; +using Microsoft.Extensions.Options; +using System.Reflection; +using Volo.Abp; +using Volo.Abp.Modularity; + +// ReSharper disable once CheckNamespace +namespace Microsoft.Extensions.DependencyInjection +{ + public static class ApplicationInitializationContextExtensions + { + public static void UseCassandra(this ApplicationInitializationContext context) + { + var service = context.ServiceProvider; + var cassandraProvider = service.GetRequiredService(); + cassandraProvider.InitClusterAndSession(); + } + } + + public static class ServiceCollectionExtensions + { + public static void AddCassandra(this ServiceConfigurationContext context) + { + context.Services.AddSingleton(typeof(ICassandraRepository<,>), typeof(CassandraRepository<,>)); + + var mappingConfig = new MappingConfiguration() + .Define(new CollectBusMapping()); + context.Services.AddSingleton(mappingConfig); + } + } +} diff --git a/src/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs b/src/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs new file mode 100644 index 0000000..f515416 --- /dev/null +++ b/src/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs @@ -0,0 +1,83 @@ +using System.Reflection; +using System.Text; +using Cassandra; +using System.ComponentModel.DataAnnotations; +using JiShe.CollectBus.Common.Attributes; +using Cassandra.Mapping; + +namespace JiShe.CollectBus.Cassandra.Extensions +{ + public static class SessionExtension + { + public static void CreateTable(this ISession session,string? defaultKeyspace=null) where TEntity : class + { + var type = typeof(TEntity); + var tableAttribute = type.GetCustomAttribute(); + var tableName = tableAttribute?.Name ?? type.Name.ToLower(); + var tableKeyspace = tableAttribute?.Keyspace ?? defaultKeyspace; + + var properties = type.GetProperties(); + var primaryKey = properties.FirstOrDefault(p => p.GetCustomAttribute() != null); + + if (primaryKey == null) + { + throw new InvalidOperationException($"No primary key defined for type {type.Name}"); + } + + var cql = new StringBuilder(); + cql.Append($"CREATE TABLE IF NOT EXISTS {tableKeyspace}.{tableName} ("); + + foreach (var prop in properties) + { + var ignoreAttribute = prop.GetCustomAttribute(); + if (ignoreAttribute != null) continue; + var columnName = prop.Name.ToLower(); + var cqlType = GetCassandraType(prop.PropertyType); + + cql.Append($"{columnName} {cqlType}, "); + } + cql.Length -= 2; // Remove last comma and space + cql.Append($", PRIMARY KEY ({primaryKey.Name.ToLower()}))"); + + session.Execute(cql.ToString()); + } + + private static string GetCassandraType(Type type) + { + // 基础类型处理 + switch (Type.GetTypeCode(type)) + { + case TypeCode.String: return "text"; + case TypeCode.Int32: return "int"; + case TypeCode.Int64: return "bigint"; + case TypeCode.Boolean: return "boolean"; + case TypeCode.DateTime: return "timestamp"; + case TypeCode.Byte: return "tinyint"; + } + + if (type == typeof(Guid)) return "uuid"; + if (type == typeof(DateTimeOffset)) return "timestamp"; + if (type == typeof(Byte[])) return "blob"; + + // 处理集合类型 + if (type.IsGenericType) + { + var genericType = type.GetGenericTypeDefinition(); + var elementType = type.GetGenericArguments()[0]; + + if (genericType == typeof(List<>)) + return $"list<{GetCassandraType(elementType)}>"; + if (genericType == typeof(HashSet<>)) + return $"set<{GetCassandraType(elementType)}>"; + if (genericType == typeof(Dictionary<,>)) + { + var keyType = type.GetGenericArguments()[0]; + var valueType = type.GetGenericArguments()[1]; + return $"map<{GetCassandraType(keyType)}, {GetCassandraType(valueType)}>"; + } + } + + throw new NotSupportedException($"不支持的类型: {type.Name}"); + } + } +} diff --git a/src/JiShe.CollectBus.Cassandra/ICassandraProvider.cs b/src/JiShe.CollectBus.Cassandra/ICassandraProvider.cs new file mode 100644 index 0000000..45c6ca6 --- /dev/null +++ b/src/JiShe.CollectBus.Cassandra/ICassandraProvider.cs @@ -0,0 +1,24 @@ +using Cassandra; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Cassandra +{ + public interface ICassandraProvider + { + Cluster Instance { get;} + + ISession Session { get;} + + CassandraConfig CassandraConfig { get; } + + ISession GetSession(string? keyspace = null); + + Cluster GetCluster(Action? callback = null); + + void InitClusterAndSession(); + } +} diff --git a/src/JiShe.CollectBus.Cassandra/ICassandraRepository.cs b/src/JiShe.CollectBus.Cassandra/ICassandraRepository.cs new file mode 100644 index 0000000..6dd474f --- /dev/null +++ b/src/JiShe.CollectBus.Cassandra/ICassandraRepository.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Volo.Abp.Domain.Entities; + +namespace JiShe.CollectBus.Cassandra +{ + public interface ICassandraRepository where TEntity : class + { + Task GetAsync(TKey id); + Task> GetListAsync(); + Task InsertAsync(TEntity entity); + Task UpdateAsync(TEntity entity); + Task DeleteAsync(TEntity entity); + Task DeleteAsync(TKey id); + Task> GetPagedListAsync(int skipCount, int maxResultCount, string sorting); + } +} diff --git a/src/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj b/src/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj new file mode 100644 index 0000000..db7ccf3 --- /dev/null +++ b/src/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj @@ -0,0 +1,22 @@ + + + + net8.0 + enable + enable + + + + + + + + + + + + + + + + diff --git a/src/JiShe.CollectBus.Cassandra/Mappers/CollectBusMapping.cs b/src/JiShe.CollectBus.Cassandra/Mappers/CollectBusMapping.cs new file mode 100644 index 0000000..d07f8ea --- /dev/null +++ b/src/JiShe.CollectBus.Cassandra/Mappers/CollectBusMapping.cs @@ -0,0 +1,20 @@ +using Cassandra.Mapping; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using JiShe.CollectBus.IotSystems.MessageIssueds; +using static Cassandra.QueryTrace; + +namespace JiShe.CollectBus.Cassandra.Mappers +{ + public class CollectBusMapping: Mappings + { + public CollectBusMapping() + { + For() + .Column(e => e.Type, cm => cm.WithName("type").WithDbType()); + } + } +} diff --git a/src/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs b/src/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs new file mode 100644 index 0000000..bdd16d5 --- /dev/null +++ b/src/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs @@ -0,0 +1,32 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Volo.Abp.EventBus; +using Volo.Abp; + +namespace JiShe.CollectBus.Common.Attributes +{ + [AttributeUsage(AttributeTargets.Class, Inherited = false)] + public class CassandraTableAttribute : Attribute + { + public CassandraTableAttribute(string? name = null,string? keyspace =null) + { + Name = name; + Keyspace = keyspace; + } + + public virtual string? Name { get; } + + public virtual string? Keyspace { get; } + } + + [AttributeUsage(AttributeTargets.Property | AttributeTargets.Field)] + public class CassandraIgnoreAttribute : Attribute + { + public CassandraIgnoreAttribute() + { + } + } +} diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs b/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs index 42d740c..5977d5e 100644 --- a/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs +++ b/src/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs @@ -1,14 +1,18 @@ using System; using System.Collections.Generic; +using System.ComponentModel.DataAnnotations; using System.Linq; using System.Text; using System.Threading.Tasks; +using JiShe.CollectBus.Common.Attributes; using JiShe.CollectBus.Common.Enums; namespace JiShe.CollectBus.IotSystems.MessageIssueds { + [CassandraTable] public class MessageIssued { + [Key] public string ClientId { get; set; } public byte[] Message { get; set; } public string DeviceNo { get; set; } diff --git a/src/JiShe.CollectBus.Host/Program.cs b/src/JiShe.CollectBus.Host/Program.cs index 40a13cc..d75a227 100644 --- a/src/JiShe.CollectBus.Host/Program.cs +++ b/src/JiShe.CollectBus.Host/Program.cs @@ -1,12 +1,35 @@ using JiShe.CollectBus.Host; using Microsoft.AspNetCore.Hosting; using Serilog; +using Volo.Abp.Modularity.PlugIns; public class Program { - public static void Main(string[] args) + /// + /// + /// + /// + /// + public static async Task Main(string[] args) { - CreateHostBuilder(args).Build().Run(); + var builder = WebApplication.CreateBuilder(args); + builder.Host.UseContentRoot(Directory.GetCurrentDirectory()) + .UseSerilog((context, loggerConfiguration) => + { + loggerConfiguration.ReadFrom.Configuration(context.Configuration); + }) + .UseAutofac(); + await builder.AddApplicationAsync(options => + { + // ز̶ģʽȲ + options.PlugInSources.AddFolder(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Plugins")); + }); + var app = builder.Build(); + await app.InitializeApplicationAsync(); + await app.RunAsync(); + + + //await CreateHostBuilder(args).Build().RunAsync(); } private static IHostBuilder CreateHostBuilder(string[] args) => @@ -16,13 +39,14 @@ public class Program { loggerConfiguration.ReadFrom.Configuration(context.Configuration); }) - .UseAutofac() .ConfigureWebHostDefaults(webBuilder => { webBuilder.UseStartup(); - }); - - + }) + .UseAutofac(); + + + private static IHostBuilder CreateConsoleHostBuilder(string[] args) => Host.CreateDefaultBuilder(args) @@ -34,8 +58,8 @@ public class Program }); - private static void ConfigureServices(IServiceCollection services, HostBuilderContext hostContext) + private static async Task ConfigureServices(IServiceCollection services, HostBuilderContext hostContext) { - services.AddApplication(); + await services.AddApplicationAsync(); } } \ No newline at end of file diff --git a/src/JiShe.CollectBus.Host/Startup.cs b/src/JiShe.CollectBus.Host/Startup.cs index dd6453e..32740ee 100644 --- a/src/JiShe.CollectBus.Host/Startup.cs +++ b/src/JiShe.CollectBus.Host/Startup.cs @@ -39,10 +39,15 @@ namespace JiShe.CollectBus.Host /// The lifetime. public void Configure(IApplicationBuilder app, IHostApplicationLifetime lifetime) { - app.InitializeApplication(); - //await app.InitializeApplicationAsync(); + app.Use(async (context, next) => + { + // 在请求处理之前调用 InitializeApplicationAsync + await app.InitializeApplicationAsync(); + // 继续请求管道中的下一个中间件 + await next(); + }); } } } diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index a5b2e15..a837636 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -130,5 +130,48 @@ }, "ServerTagName": "JiSheCollectBus", "KafkaReplicationFactor": 3, - "NumPartitions": 30 + "NumPartitions": 30, + "Cassandra": { + "ReplicationStrategy": { + "Class": "NetworkTopologyStrategy", //策略为NetworkTopologyStrategy时才会有多个数据中心,SimpleStrategy用在只有一个数据中心的情况下 + "DataCenters": [ + { + "Name": "dc1", + "ReplicationFactor": 3 + } + ] + }, + "Nodes": [ + { + "Host": "192.168.1.9", + "Port": 9042, + "DataCenter": "dc1", + "Rack": "RAC1" + }, + { + "Host": "192.168.1.9", + "Port": 9043, + "DataCenter": "dc1", + "Rack": "RAC2" + } + ], + "Username": "admin", + "Password": "lixiao1980", + "Keyspace": "jishecollectbus", + "ConsistencyLevel": "Quorum", + "PoolingOptions": { + "CoreConnectionsPerHost": 4, + "MaxConnectionsPerHost": 8, + "MaxRequestsPerConnection": 2000 + }, + "SocketOptions": { + "ConnectTimeoutMillis": 10000, + "ReadTimeoutMillis": 20000 + }, + "QueryOptions": { + "ConsistencyLevel": "Quorum", + "SerialConsistencyLevel": "Serial", + "DefaultIdempotence": true + } + } } \ No newline at end of file diff --git a/src/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs b/src/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs index 64661d1..5cda0c8 100644 --- a/src/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs +++ b/src/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs @@ -12,10 +12,10 @@ namespace JiShe.CollectBus.Protocol context.Services.AddKeyedSingleton(nameof(StandardProtocolPlugin)); } - public override void OnApplicationInitialization(ApplicationInitializationContext context) + public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context) { var standardProtocol = context.ServiceProvider.GetRequiredKeyedService(nameof(StandardProtocolPlugin)); - standardProtocol.AddAsync(); + await standardProtocol.AddAsync(); } } }