diff --git a/protocols/JiShe.CollectBus.Protocol.T1882018/JiShe.CollectBus.Protocol.T1882018.csproj b/protocols/JiShe.CollectBus.Protocol.T1882018/JiShe.CollectBus.Protocol.T1882018.csproj index 4866985..0f8d650 100644 --- a/protocols/JiShe.CollectBus.Protocol.T1882018/JiShe.CollectBus.Protocol.T1882018.csproj +++ b/protocols/JiShe.CollectBus.Protocol.T1882018/JiShe.CollectBus.Protocol.T1882018.csproj @@ -9,6 +9,8 @@ + + diff --git a/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs index ed82fa0..0c74082 100644 --- a/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs @@ -27,7 +27,6 @@ namespace JiShe.CollectBus.Protocol.T37612012 private readonly IProducerService _producerService; - private readonly IRepository _deviceRepository; private readonly ITcpService _tcpService; public readonly Dictionary T3761AFNHandlers; @@ -41,7 +40,6 @@ namespace JiShe.CollectBus.Protocol.T37612012 _logger = logger; //_logger = serviceProvider.GetRequiredService>(); _producerService = serviceProvider.GetRequiredService(); - _deviceRepository = serviceProvider.GetRequiredService>(); _tcpService = tcpService; T3761AFNHandlers = Telemetry3761PacketBuilder.T3761AFNHandlers; } @@ -133,7 +131,6 @@ namespace JiShe.CollectBus.Protocol.T37612012 else { _logger.LogError($"不支持的上报kafka主题:{topicName}"); - await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, messageReceivedAnalysis); } } diff --git a/protocols/JiShe.CollectBus.Protocol/Abstracts/ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol/Abstracts/ProtocolPlugin.cs index c6779e5..f6a689e 100644 --- a/protocols/JiShe.CollectBus.Protocol/Abstracts/ProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol/Abstracts/ProtocolPlugin.cs @@ -20,13 +20,11 @@ namespace JiShe.CollectBus.Protocol.Abstracts public const string errorData = "EE"; private readonly ILogger _logger; - private readonly IRepository _protocolInfoRepository; private readonly IFreeRedisProvider _redisProvider; public ProtocolPlugin(IServiceProvider serviceProvider, ILogger logger) { _logger = logger; - _protocolInfoRepository = serviceProvider.GetRequiredService>(); _redisProvider = serviceProvider.GetRequiredService(); } @@ -40,10 +38,8 @@ namespace JiShe.CollectBus.Protocol.Abstracts if (Info == null) { throw new ArgumentNullException(nameof(Info)); - } + } - await _protocolInfoRepository.DeleteDirectAsync(a => a.Name == Info.Name); - await _protocolInfoRepository.InsertAsync(Info); await _redisProvider.Instance.HDelAsync($"{RedisConst.ProtocolKey}", Info.Name); await _redisProvider.Instance.HSetAsync($"{RedisConst.ProtocolKey}", Info.Name, Info); } diff --git a/services/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs b/services/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs index 63594c8..8ce2662 100644 --- a/services/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs @@ -12,7 +12,6 @@ namespace JiShe.CollectBus.Subscribers { Task LoginIssuedEvent(List issuedEventMessage); Task HeartbeatIssuedEvent(List issuedEventMessage); - Task ReceivedEvent(MessageProtocolAnalysis receivedMessage); Task ReceivedHeartbeatEvent(List receivedHeartbeatMessage); Task ReceivedLoginEvent(List receivedLoginMessage); } diff --git a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index 942befc..8fa7033 100644 --- a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -1,13 +1,10 @@ -using Cassandra.Mapping; -using JiShe.CollectBus.Cassandra; -using JiShe.CollectBus.DataChannels; +using JiShe.CollectBus.DataChannels; using JiShe.CollectBus.FreeRedis; using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.Interceptors; using JiShe.CollectBus.IoTDB; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.Kafka; -using JiShe.CollectBus.Mappers; using JiShe.CollectBus.Protocol; using JiShe.CollectBus.ScheduledMeterReading; using Microsoft.Extensions.DependencyInjection; @@ -19,12 +16,8 @@ using System.Threading.Channels; using System.Threading.Tasks; using Volo.Abp; using Volo.Abp.Application; -using Volo.Abp.AuditLogging; using Volo.Abp.Autofac; using Volo.Abp.AutoMapper; -using Volo.Abp.BackgroundJobs; -using Volo.Abp.BackgroundWorkers; -using Volo.Abp.BackgroundWorkers.Hangfire; using Volo.Abp.Modularity; namespace JiShe.CollectBus; @@ -34,16 +27,12 @@ namespace JiShe.CollectBus; typeof(CollectBusApplicationContractsModule), typeof(AbpDddApplicationModule), typeof(AbpAutoMapperModule), - typeof(AbpAutofacModule), - typeof(AbpBackgroundWorkersHangfireModule), typeof(CollectBusFreeRedisModule), typeof(CollectBusFreeSqlModule), typeof(CollectBusKafkaModule), typeof(CollectBusIoTDbModule), + typeof(AbpAutofacModule), typeof(CollectBusDomainSharedModule), - typeof(AbpAuditLoggingDomainModule), - typeof(AbpBackgroundJobsDomainModule), - typeof(CollectBusCassandraModule), typeof(CollectBusProtocolModule) )] public class CollectBusApplicationModule : AbpModule @@ -55,8 +44,8 @@ public class CollectBusApplicationModule : AbpModule context.Services.AddAutoMapperObjectMapper(); Configure(options => { options.AddMaps(true); }); - context.Services.AddSingleton(new MappingConfiguration() - .Define(new CollectBusMapping())); + //context.Services.AddSingleton(new MappingConfiguration() + // .Define(new CollectBusMapping())); // 注册拦截器 context.Services.OnRegistered(ctx => @@ -70,33 +59,17 @@ public class CollectBusApplicationModule : AbpModule public override async Task OnApplicationInitializationAsync( ApplicationInitializationContext context) { - var assembly = Assembly.GetExecutingAssembly(); - var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface) - .ToList(); - foreach (var type in types) await context.AddBackgroundWorkerAsync(type); - - //Task.Run(() => + //var assembly = Assembly.GetExecutingAssembly(); + //var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface) + // .ToList(); + //foreach (var type in types) //{ - // //默认初始化表计信息 - // var dbContext = context.ServiceProvider.GetRequiredService(); - // dbContext.InitAmmeterCacheData(); - // //await dbContext.InitWatermeterCacheData(); - //}).ConfigureAwait(false); - + // await context.AddBackgroundWorkerAsync(type); + //} + //下发任务通道构建 DataChannelManage.TaskDataChannel = Channel.CreateUnbounded>>(); - - - // 日志存储通道构建 - DataChannelManage.LogSaveChannel = Channel.CreateUnbounded(); - - // 日志刷新通道构建 - DataChannelManage.LogRefreshChannel = Channel.CreateUnbounded(); - - // 启动通道任务 - var _dataChannelManage = context.ServiceProvider.GetRequiredService(); - _ = _dataChannelManage.LogSaveAsync(DataChannelManage.LogSaveChannel.Reader); - + //默认初始化表计信息 var dbContext = context.ServiceProvider.GetRequiredService(); await dbContext.InitAmmeterCacheData("V4-Gather-8890"); diff --git a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs index 58b5718..604a294 100644 --- a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs +++ b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs @@ -1,6 +1,4 @@ -using Amazon.Runtime.Internal.Transform; -using DnsClient.Protocol; -using JiShe.CollectBus.Common; +using JiShe.CollectBus.Common; using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; @@ -10,7 +8,6 @@ using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Dto; using JiShe.CollectBus.Protocol.Models; -using JiShe.CollectBus.Repository.LogRecord; using Mapster; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -35,23 +32,20 @@ namespace JiShe.CollectBus.DataChannels private readonly IIoTDbProvider _dbProvider; private readonly IProducerService _producerService; private readonly KafkaOptionConfig _kafkaOptions; - private readonly ServerApplicationOptions _applicationOptions; - private readonly ILogRecordRepository _logRecordRepository; + private readonly ServerApplicationOptions _applicationOptions; public DataChannelManageService( ILogger logger, IIoTDbProvider dbProvider, IProducerService producerService, IOptions kafkaOptions, - IOptions applicationOptions, - ILogRecordRepository logRecordRepository) + IOptions applicationOptions) { _logger = logger; _dbProvider = dbProvider; _producerService = producerService; _kafkaOptions = kafkaOptions.Value; - _applicationOptions = applicationOptions.Value; - _logRecordRepository= logRecordRepository; + _applicationOptions = applicationOptions.Value; } /// @@ -207,126 +201,126 @@ namespace JiShe.CollectBus.DataChannels - /// - /// 日志保存 - /// - /// - /// - public async Task LogSaveAsync(ChannelReader channelReader) - { - const int BatchSize = 1000; - const int EmptyWaitMilliseconds = 1000; - var timeout = TimeSpan.FromSeconds(2); - var timer = Stopwatch.StartNew(); - long timeoutMilliseconds = 0; - try - { - while (true) - { - var batch = new List(); - var canRead = channelReader.Count; - if (canRead <= 0) - { - if (timeoutMilliseconds > 0) - { - _logger.LogError($"{nameof(LogSaveAsync)} 通道处理数据耗时{timeoutMilliseconds}毫秒"); - } - timeoutMilliseconds = 0; - //无消息时短等待1秒 - await Task.Delay(EmptyWaitMilliseconds); - continue; - } + ///// + ///// 日志保存 + ///// + ///// + ///// + //public async Task LogSaveAsync(ChannelReader channelReader) + //{ + // const int BatchSize = 1000; + // const int EmptyWaitMilliseconds = 1000; + // var timeout = TimeSpan.FromSeconds(2); + // var timer = Stopwatch.StartNew(); + // long timeoutMilliseconds = 0; + // try + // { + // while (true) + // { + // var batch = new List(); + // var canRead = channelReader.Count; + // if (canRead <= 0) + // { + // if (timeoutMilliseconds > 0) + // { + // _logger.LogError($"{nameof(LogSaveAsync)} 通道处理数据耗时{timeoutMilliseconds}毫秒"); + // } + // timeoutMilliseconds = 0; + // //无消息时短等待1秒 + // await Task.Delay(EmptyWaitMilliseconds); + // continue; + // } - timer.Restart(); - var startTime = DateTime.Now; + // timer.Restart(); + // var startTime = DateTime.Now; - try - { - // 异步批量读取数据 - while (batch != null && batch.Count < BatchSize && (DateTime.Now - startTime) < timeout) - { - try - { - if (channelReader.TryRead(out var dataItem)) - { - batch.Add(dataItem); - } - } - catch (Exception) - { - throw; - } - } - } - catch (Exception) - { - throw; - } + // try + // { + // // 异步批量读取数据 + // while (batch != null && batch.Count < BatchSize && (DateTime.Now - startTime) < timeout) + // { + // try + // { + // if (channelReader.TryRead(out var dataItem)) + // { + // batch.Add(dataItem); + // } + // } + // catch (Exception) + // { + // throw; + // } + // } + // } + // catch (Exception) + // { + // throw; + // } - if (batch == null || batch.Count == 0) - { - await Task.Delay(EmptyWaitMilliseconds); - continue; - } - try - { + // if (batch == null || batch.Count == 0) + // { + // await Task.Delay(EmptyWaitMilliseconds); + // continue; + // } + // try + // { - // 按小时分组 - var hourGroups = new Dictionary>(); - DateTime? dateTime = null; - List batchList = new List(); - int index = 1; - foreach (var item in batch) - { - var records = item.Adapt(); + // // 按小时分组 + // var hourGroups = new Dictionary>(); + // DateTime? dateTime = null; + // List batchList = new List(); + // int index = 1; + // foreach (var item in batch) + // { + // var records = item.Adapt(); - if (!records.ReceivedTime.HasValue) - records.ReceivedTime = DateTime.Now; - var curDateTime = new DateTime(records.ReceivedTime.Value.Year, records.ReceivedTime.Value.Month, records.ReceivedTime.Value.Hour, records.ReceivedTime.Value.Day, records.ReceivedTime.Value.Hour, 0, 0); - if (!dateTime.HasValue || curDateTime != dateTime) - { - dateTime = curDateTime; - if (batchList.Count > 0) - { - var immutableList = ImmutableList.CreateRange(batchList); - hourGroups.Add(dateTime.Value, immutableList.ToList()); - batchList.Clear(); - } - } - batchList.Add(records); - // 最后一批 - if(index== batch.Count) - { - var immutableList = ImmutableList.CreateRange(batchList); - hourGroups.Add(dateTime.Value, immutableList.ToList()); - batchList.Clear(); - } - index++; - } - foreach (var (time, records) in hourGroups) - { - // 批量写入数据库 - await _logRecordRepository.InsertManyAsync(records, time); - } - } - catch (Exception ex) - { - _logger.LogError(ex, "数据通道处理日志数据时发生异常"); - } - batch.Clear(); - timer.Stop(); + // if (!records.ReceivedTime.HasValue) + // records.ReceivedTime = DateTime.Now; + // var curDateTime = new DateTime(records.ReceivedTime.Value.Year, records.ReceivedTime.Value.Month, records.ReceivedTime.Value.Hour, records.ReceivedTime.Value.Day, records.ReceivedTime.Value.Hour, 0, 0); + // if (!dateTime.HasValue || curDateTime != dateTime) + // { + // dateTime = curDateTime; + // if (batchList.Count > 0) + // { + // var immutableList = ImmutableList.CreateRange(batchList); + // hourGroups.Add(dateTime.Value, immutableList.ToList()); + // batchList.Clear(); + // } + // } + // batchList.Add(records); + // // 最后一批 + // if(index== batch.Count) + // { + // var immutableList = ImmutableList.CreateRange(batchList); + // hourGroups.Add(dateTime.Value, immutableList.ToList()); + // batchList.Clear(); + // } + // index++; + // } + // foreach (var (time, records) in hourGroups) + // { + // // 批量写入数据库 + // await _logRecordRepository.InsertManyAsync(records, time); + // } + // } + // catch (Exception ex) + // { + // _logger.LogError(ex, "数据通道处理日志数据时发生异常"); + // } + // batch.Clear(); + // timer.Stop(); - timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds; + // timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds; - startTime = DateTime.Now; - } - } - catch (Exception ex) - { - _logger.LogCritical(ex, "日志处理发生致命错误"); - throw; - } - } + // startTime = DateTime.Now; + // } + // } + // catch (Exception ex) + // { + // _logger.LogCritical(ex, "日志处理发生致命错误"); + // throw; + // } + //} } } diff --git a/services/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj b/services/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj index 01878c2..5dae431 100644 --- a/services/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj +++ b/services/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj @@ -17,20 +17,22 @@ - - - + + + - - - + + + + + diff --git a/services/JiShe.CollectBus.Application/Mappers/CollectBusMapping.cs b/services/JiShe.CollectBus.Application/Mappers/CollectBusMapping.cs deleted file mode 100644 index a4d4ef4..0000000 --- a/services/JiShe.CollectBus.Application/Mappers/CollectBusMapping.cs +++ /dev/null @@ -1,17 +0,0 @@ -using Cassandra.Mapping; -using JiShe.CollectBus.IotSystems.Devices; -using JiShe.CollectBus.IotSystems.MessageIssueds; - -namespace JiShe.CollectBus.Mappers -{ - public class CollectBusMapping: Mappings - { - public CollectBusMapping() - { - For() - .Column(e => e.Type, cm => cm.WithName("type").WithDbType()); - For() - .Column(e => e.Status, cm => cm.WithName("status").WithDbType()); - } - } -} diff --git a/services/JiShe.CollectBus.Application/Samples/TestAppService.cs b/services/JiShe.CollectBus.Application/Samples/TestAppService.cs index 1975d5a..8c7b5fe 100644 --- a/services/JiShe.CollectBus.Application/Samples/TestAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/TestAppService.cs @@ -1,6 +1,4 @@ -using Cassandra; -using JiShe.CollectBus.Cassandra; -using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.DynamicModule; using JiShe.CollectBus.Interceptors; using JiShe.CollectBus.IotSystems.MessageIssueds; @@ -22,8 +20,8 @@ namespace JiShe.CollectBus.Samples; public class TestAppService : CollectBusAppService { private readonly ILogger _logger; - private readonly ICassandraRepository _messageReceivedCassandraRepository; - private readonly ICassandraProvider _cassandraProvider; + //private readonly ICassandraRepository _messageReceivedCassandraRepository; + //private readonly ICassandraProvider _cassandraProvider; private readonly IProtocolService _protocolService; private readonly IServiceProvider _serviceProvider; private readonly IDynamicModuleManager _dynamicModuleManager; @@ -31,93 +29,94 @@ public class TestAppService : CollectBusAppService public TestAppService( ILogger logger, - ICassandraRepository messageReceivedCassandraRepository, - ICassandraProvider cassandraProvider, IProtocolService protocolService,IServiceProvider serviceProvider, IDynamicModuleManager dynamicModuleManager) + //ICassandraRepository messageReceivedCassandraRepository, + //ICassandraProvider cassandraProvider, + IProtocolService protocolService,IServiceProvider serviceProvider, IDynamicModuleManager dynamicModuleManager) { _logger = logger; - _messageReceivedCassandraRepository = messageReceivedCassandraRepository; - _cassandraProvider = cassandraProvider; + //_messageReceivedCassandraRepository = messageReceivedCassandraRepository; + //_cassandraProvider = cassandraProvider; _protocolService = protocolService; _serviceProvider = serviceProvider; _dynamicModuleManager = dynamicModuleManager; } public async Task AddMessageOfCassandra() { - var stopwatch = Stopwatch.StartNew(); - for (int i = 1; i <= 10000; i++) - { - var str = Guid.NewGuid().ToString(); - await _messageReceivedCassandraRepository.InsertAsync(new MessageIssued - { - ClientId = str, - DeviceNo = i.ToString(), - MessageId = str, - Type = IssuedEventType.Data, - Id = str, - Message = str.GetBytes() - }); - } - stopwatch.Stop(); - _logger.LogWarning($"插入 {10000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒"); + //var stopwatch = Stopwatch.StartNew(); + //for (int i = 1; i <= 10000; i++) + //{ + // var str = Guid.NewGuid().ToString(); + // await _messageReceivedCassandraRepository.InsertAsync(new MessageIssued + // { + // ClientId = str, + // DeviceNo = i.ToString(), + // MessageId = str, + // Type = IssuedEventType.Data, + // Id = str, + // Message = str.GetBytes() + // }); + //} + //stopwatch.Stop(); + //_logger.LogWarning($"插入 {10000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒"); } public async Task AddMessageOfBulkInsertCassandra() { - var records = new List(); - var prepared = await _cassandraProvider.Session.PrepareAsync( - $"INSERT INTO {_cassandraProvider.CassandraConfig.Keyspace}.{nameof(MessageIssued)} (id, clientid, message, deviceno,type,messageid) VALUES (?, ?, ?, ?, ?, ?)"); + //var records = new List(); + //var prepared = await _cassandraProvider.Session.PrepareAsync( + // $"INSERT INTO {_cassandraProvider.CassandraConfig.Keyspace}.{nameof(MessageIssued)} (id, clientid, message, deviceno,type,messageid) VALUES (?, ?, ?, ?, ?, ?)"); - for (int i = 1; i <= 100000; i++) - { - var str = Guid.NewGuid().ToString(); - records.Add(new MessageIssued - { - ClientId = str, - DeviceNo = i.ToString(), - MessageId = str, - Type = IssuedEventType.Data, - Id = str, - Message = str.GetBytes() - }); - } - var stopwatch = Stopwatch.StartNew(); - await BulkInsertAsync(_cassandraProvider.Session, prepared, records); - stopwatch.Stop(); - _logger.LogWarning($"插入 {100000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒"); + //for (int i = 1; i <= 100000; i++) + //{ + // var str = Guid.NewGuid().ToString(); + // records.Add(new MessageIssued + // { + // ClientId = str, + // DeviceNo = i.ToString(), + // MessageId = str, + // Type = IssuedEventType.Data, + // Id = str, + // Message = str.GetBytes() + // }); + //} + //var stopwatch = Stopwatch.StartNew(); + //await BulkInsertAsync(_cassandraProvider.Session, prepared, records); + //stopwatch.Stop(); + //_logger.LogWarning($"插入 {100000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒"); } - private static async Task BulkInsertAsync(ISession session, PreparedStatement prepared, List records) - { - var tasks = new List(); - var batch = new BatchStatement(); + //private static async Task BulkInsertAsync(ISession session, PreparedStatement prepared, List records) + //{ + // var tasks = new List(); + // var batch = new BatchStatement(); - for (int i = 0; i < records.Count; i++) - { - var record = records[i]; - var boundStatement = prepared.Bind( - record.Id, - record.ClientId, - record.Message, - record.DeviceNo, - (int)record.Type, - record.MessageId); + // for (int i = 0; i < records.Count; i++) + // { + // var record = records[i]; + // var boundStatement = prepared.Bind( + // record.Id, + // record.ClientId, + // record.Message, + // record.DeviceNo, + // (int)record.Type, + // record.MessageId); - // 设置一致性级别为ONE以提高性能 - boundStatement.SetConsistencyLevel(ConsistencyLevel.One); + // // 设置一致性级别为ONE以提高性能 + // boundStatement.SetConsistencyLevel(ConsistencyLevel.One); - batch.Add(boundStatement); + // batch.Add(boundStatement); - // 当达到批处理大小时执行 - if (batch.Statements.Count() >= 1000 || i == records.Count - 1) - { - tasks.Add(session.ExecuteAsync(batch)); - batch = new BatchStatement(); - } - } + // // 当达到批处理大小时执行 + // if (batch.Statements.Count() >= 1000 || i == records.Count - 1) + // { + // tasks.Add(session.ExecuteAsync(batch)); + // batch = new BatchStatement(); + // } + // } - // 等待所有批处理完成 - await Task.WhenAll(tasks); - } + // // 等待所有批处理完成 + // await Task.WhenAll(tasks); + //} [LogIntercept] public virtual Task LogInterceptorTest(string str) diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 85a71d8..e7849d8 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -332,9 +332,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading try { // 创建取消令牌源 - //var cts = new CancellationTokenSource(); - - await _dbProvider.GetSessionPool(true).InitTableSessionModelAsync(); + //var cts = new CancellationTokenSource(); _ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader); diff --git a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAnalysisAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAnalysisAppService.cs index 7b3517b..25b5242 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAnalysisAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAnalysisAppService.cs @@ -7,7 +7,6 @@ using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Interfaces; using JiShe.CollectBus.Protocol3761; -using JiShe.CollectBus.Repository.MeterReadingRecord; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using System; @@ -24,7 +23,6 @@ namespace JiShe.CollectBus.Subscribers private readonly ILogger _logger; private readonly ITcpService _tcpService; private readonly IServiceProvider _serviceProvider; - private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository; private readonly IIoTDbProvider _dbProvider; private readonly IProtocolService _protocolService; @@ -32,12 +30,11 @@ namespace JiShe.CollectBus.Subscribers ITcpService tcpService, IServiceProvider serviceProvider, IIoTDbProvider dbProvider, - IMeterReadingRecordRepository meterReadingRecordsRepository, IProtocolService protocolService) + IProtocolService protocolService) { _logger = logger; _tcpService = tcpService; _serviceProvider = serviceProvider; - _meterReadingRecordsRepository = meterReadingRecordsRepository; _dbProvider = dbProvider; _protocolService = protocolService; } diff --git a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs deleted file mode 100644 index 670fd7c..0000000 --- a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ /dev/null @@ -1,172 +0,0 @@ -using JiShe.CollectBus.Common.Consts; -using JiShe.CollectBus.Common.Enums; -using JiShe.CollectBus.Common.Models; -using JiShe.CollectBus.Interceptors; -using JiShe.CollectBus.IoTDB.Interface; -using JiShe.CollectBus.IotSystems.MessageReceiveds; -using JiShe.CollectBus.IotSystems.MeterReadingRecords; -using JiShe.CollectBus.Kafka.Attributes; -using JiShe.CollectBus.Kafka.Internal; -using JiShe.CollectBus.Protocol.Interfaces; -using JiShe.CollectBus.Protocol3761; -using JiShe.CollectBus.Repository.MeterReadingRecord; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using TouchSocket.Sockets; -using Volo.Abp.Domain.Repositories; - -namespace JiShe.CollectBus.Subscribers -{ - public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, IKafkaSubscribe - { - private readonly ILogger _logger; - private readonly ITcpService _tcpService; - private readonly IServiceProvider _serviceProvider; - private readonly IRepository _messageReceivedLoginEventRepository; - private readonly IRepository _messageReceivedHeartbeatEventRepository; - private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository; - private readonly IIoTDbProvider _dbProvider; - private readonly IProtocolService _protocolService; - - /// - /// Initializes a new instance of the class. - /// - /// The logger. - /// The TCP service. - /// The service provider. - /// The message received login event repository. - /// The message received heartbeat event repository. - /// The device repository. - public SubscriberAppService(ILogger logger, - ITcpService tcpService, - IServiceProvider serviceProvider, - IRepository messageReceivedLoginEventRepository, - IRepository messageReceivedHeartbeatEventRepository, - IIoTDbProvider dbProvider, - IMeterReadingRecordRepository meterReadingRecordsRepository, IProtocolService protocolService) - { - _logger = logger; - _tcpService = tcpService; - _serviceProvider = serviceProvider; - _messageReceivedLoginEventRepository = messageReceivedLoginEventRepository; - _messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository; - _meterReadingRecordsRepository = meterReadingRecordsRepository; - _dbProvider = dbProvider; - _protocolService = protocolService; - } - - [LogIntercept] - [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName, EnableBatch = true)] - public async Task LoginIssuedEvent(List issuedEventMessages) - { - bool isAck = true; - foreach (var issuedEventMessage in issuedEventMessages) - { - //var loginEntity = await _messageReceivedLoginEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId); - //if (loginEntity == null) - //{ - // isAck=false; - // break; - //} - - //loginEntity.AckTime = Clock.Now; - //loginEntity.IsAck = true; - //await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity); - } - // TODO:暂时ACK,等后续处理是否放到私信队列中 - return isAck? SubscribeAck.Success(): SubscribeAck.Fail(); - } - - [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)] - public async Task HeartbeatIssuedEvent(List issuedEventMessages) - { - bool isAck = true; - //foreach (var issuedEventMessage in issuedEventMessages) - //{ - // var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId); - // if (heartbeatEntity == null) - // { - // isAck = false; - // break; - // } - // heartbeatEntity.AckTime = Clock.Now; - // heartbeatEntity.IsAck = true; - // await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity); - // } - // TODO:暂时ACK,等后续处理是否放到私信队列中 - return isAck ? SubscribeAck.Success() : SubscribeAck.Fail(); - } - - [KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)] - public async Task ReceivedEvent(MessageProtocolAnalysis receivedMessage) - { - var currentTime = Clock.Now; - - var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - if (protocolPlugin == null) - { - _logger.LogError("协议不存在!"); - } - else - { - //todo 会根据不同的协议进行解析,然后做业务处理 - //TB3761? tB3761 = protocolPlugin.Analysis3761(receivedMessage.MessageHexString); - //if (tB3761 == null) - //{ - // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - // return SubscribeAck.Success(); - //} - //if (tB3761.DT == null || tB3761.AFN_FC == null) - //{ - // Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - // return SubscribeAck.Success(); - //} - - //报文入库 - var entity = new MeterReadingRecords() - { - ReceivedMessageHexString = receivedMessage.MessageHexString, - AFN = (AFN)receivedMessage.Data?.AFN_FC.AFN!, - Fn = receivedMessage.Data.DT.Fn, - Pn = 0, - FocusAddress = "", - MeterAddress = "", - }; - - //如果没数据,则插入,有数据则更新 - var updateEntity = await _meterReadingRecordsRepository.FirOrDefaultAsync(entity, currentTime); - if (updateEntity == null) - { - await _meterReadingRecordsRepository.InsertAsync(entity, currentTime); - } - - - //_dbProvider.InsertAsync(); - //todo 查找是否有下发任务 - - //await _messageReceivedEventRepository.InsertAsync(receivedMessage); - - - } - return SubscribeAck.Success(); - } - - [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName, EnableBatch = true)] - public async Task ReceivedHeartbeatEvent(List receivedHeartbeatMessages) - { - await _messageReceivedHeartbeatEventRepository.InsertManyAsync(receivedHeartbeatMessages); - return SubscribeAck.Success(); - } - - [KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName,EnableBatch =true)] - public async Task ReceivedLoginEvent(List receivedLoginMessages) - { - await _messageReceivedLoginEventRepository.InsertManyAsync(receivedLoginMessages); - return SubscribeAck.Success(); - } - - } -} diff --git a/services/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs b/services/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs deleted file mode 100644 index d1de00a..0000000 --- a/services/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs +++ /dev/null @@ -1,43 +0,0 @@ -//using System; -//using System.Threading; -//using System.Threading.Tasks; -//using Hangfire; -//using JiShe.CollectBus.Common.Consts; -//using JiShe.CollectBus.ScheduledMeterReading; -//using Microsoft.Extensions.Logging; -//using Volo.Abp.BackgroundWorkers.Hangfire; -//using Volo.Abp.DependencyInjection; -//using Volo.Abp.Uow; - -//namespace JiShe.CollectBus.Workers -//{ -// /// -// /// 构建待处理的下发指令任务处理 -// /// -// public class CreateToBeIssueTaskWorker : HangfireBackgroundWorkerBase, ITransientDependency, ICollectWorker -// { -// private readonly ILogger _logger; -// private readonly IScheduledMeterReadingService _scheduledMeterReadingService; - -// /// -// /// Initializes a new instance of the class. -// /// -// /// The logger. -// /// 定时任务 -// public CreateToBeIssueTaskWorker(ILogger logger, IScheduledMeterReadingService scheduledMeterReadingService) -// { -// _logger = logger; -// RecurringJobId = nameof(CreateToBeIssueTaskWorker); -// CronExpression = "0 0/1 * * * *"; -// TimeZone = TimeZoneInfo.Local; -// this._scheduledMeterReadingService = scheduledMeterReadingService; -// } - - -// public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken()) -// { -// _logger.LogError($"{DateTime.Now}"); -// // await _scheduledMeterReadingService.CreateToBeIssueTasks(); -// } -// } -//} diff --git a/services/JiShe.CollectBus.Application/Workers/DataDetectionFifteenMinuteWorker.cs b/services/JiShe.CollectBus.Application/Workers/DataDetectionFifteenMinuteWorker.cs deleted file mode 100644 index 392c801..0000000 --- a/services/JiShe.CollectBus.Application/Workers/DataDetectionFifteenMinuteWorker.cs +++ /dev/null @@ -1,39 +0,0 @@ -//using JiShe.CollectBus.ScheduledMeterReading; -//using Microsoft.Extensions.Logging; -//using System; -//using System.Collections.Generic; -//using System.Linq; -//using System.Text; -//using System.Threading; -//using System.Threading.Tasks; -//using Volo.Abp.BackgroundWorkers.Hangfire; -//using Volo.Abp.DependencyInjection; - -//namespace JiShe.CollectBus.Workers -//{ -// /// -// /// 定时数据检测1小时一次 -// /// -// public class DataDetectionFifteenMinuteWorker : HangfireBackgroundWorkerBase, ITransientDependency, ICollectWorker -// { - -// private readonly ILogger _logger; -// private readonly IScheduledMeterReadingService _scheduledMeterReadingService; - -// public DataDetectionFifteenMinuteWorker(ILogger logger, IScheduledMeterReadingService scheduledMeterReadingService) -// { -// _logger = logger; -// RecurringJobId = nameof(CreateToBeIssueTaskWorker); -// CronExpression = "0 0 0/1 * * ?"; -// TimeZone = TimeZoneInfo.Local; -// this._scheduledMeterReadingService = scheduledMeterReadingService; -// } - - -// public override Task DoWorkAsync(CancellationToken cancellationToken = default) -// { -// //throw new NotImplementedException(); -// return Task.CompletedTask; -// } -// } -//} diff --git a/services/JiShe.CollectBus.Application/Workers/EpiCollectWorker.cs b/services/JiShe.CollectBus.Application/Workers/EpiCollectWorker.cs deleted file mode 100644 index 037e2e2..0000000 --- a/services/JiShe.CollectBus.Application/Workers/EpiCollectWorker.cs +++ /dev/null @@ -1,39 +0,0 @@ -//using System; -//using System.Threading; -//using System.Threading.Tasks; -//using Hangfire; -//using JiShe.CollectBus.Common.Attributes; -//using Microsoft.Extensions.Logging; -//using Volo.Abp.BackgroundWorkers.Hangfire; -//using Volo.Abp.DependencyInjection; -//using Volo.Abp.Uow; - -//namespace JiShe.CollectBus.Workers -//{ -// public class EpiCollectWorker : HangfireBackgroundWorkerBase, ITransientDependency,ICollectWorker -// { -// private readonly ILogger _logger; - -// /// -// /// Initializes a new instance of the class. -// /// -// /// The logger. -// public EpiCollectWorker(ILogger logger) -// { -// _logger = logger; -// RecurringJobId = nameof(EpiCollectWorker); -// CronExpression = Cron.Daily(); - -// } - - -// public override Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken()) -// { -// using (var uow = LazyServiceProvider.LazyGetRequiredService().Begin()) -// { -// Logger.LogInformation("Executed MyLogWorker..!"); -// return Task.CompletedTask; -// } -// } -// } -//} diff --git a/services/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs b/services/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs deleted file mode 100644 index a7ca7c9..0000000 --- a/services/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs +++ /dev/null @@ -1,48 +0,0 @@ -//using System; -//using System.Threading; -//using System.Threading.Tasks; -//using Hangfire; -//using JiShe.CollectBus.ScheduledMeterReading; -//using Microsoft.Extensions.Logging; -//using Volo.Abp.BackgroundWorkers.Hangfire; -//using Volo.Abp.DependencyInjection; -//using Volo.Abp.Uow; - -//namespace JiShe.CollectBus.Workers -//{ -// /// -// /// 15分钟采集数据 -// /// -// public class SubscriberFifteenMinuteWorker : HangfireBackgroundWorkerBase, ITransientDependency, ICollectWorker -// { -// private readonly ILogger _logger; -// private readonly IScheduledMeterReadingService _scheduledMeterReadingService; - -// /// -// /// Initializes a new instance of the class. -// /// -// /// The logger. -// /// 定时任务 -// public SubscriberFifteenMinuteWorker(ILogger logger, IScheduledMeterReadingService scheduledMeterReadingService) -// { -// _logger = logger; -// RecurringJobId = nameof(SubscriberFifteenMinuteWorker); -// CronExpression = "0 0/15 * * * *"; -// TimeZone = TimeZoneInfo.Local; -// this._scheduledMeterReadingService = scheduledMeterReadingService; -// } - - -// public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken()) -// { -// //await _scheduledMeterReadingService.AmmeterScheduledMeterFifteenMinuteReading(); -// //await _scheduledMeterReadingService.WatermeterScheduledMeterFifteenMinuteReading(); - -// //using (var uow = LazyServiceProvider.LazyGetRequiredService().Begin()) -// //{ -// // Logger.LogInformation("Executed MyLogWorker..!"); -// // return Task.CompletedTask; -// //} -// } -// } -//} diff --git a/services/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs b/services/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs deleted file mode 100644 index 119421e..0000000 --- a/services/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs +++ /dev/null @@ -1,42 +0,0 @@ -//using System; -//using System.Threading; -//using System.Threading.Tasks; -//using Hangfire; -//using JiShe.CollectBus.ScheduledMeterReading; -//using Microsoft.Extensions.Logging; -//using Volo.Abp.BackgroundWorkers.Hangfire; -//using Volo.Abp.DependencyInjection; -//using Volo.Abp.Uow; - -//namespace JiShe.CollectBus.Workers -//{ -// /// -// /// 5分钟采集数据 -// /// -// public class SubscriberFiveMinuteWorker : HangfireBackgroundWorkerBase, ITransientDependency,ICollectWorker -// { -// private readonly ILogger _logger; -// private readonly IScheduledMeterReadingService _scheduledMeterReadingService; - -// /// -// /// Initializes a new instance of the class. -// /// -// /// The logger. -// /// 定时任务 -// public SubscriberFiveMinuteWorker(ILogger logger, IScheduledMeterReadingService scheduledMeterReadingService) -// { -// _logger = logger; -// RecurringJobId = nameof(SubscriberFiveMinuteWorker); -// CronExpression = "0 0/5 * * * *"; -// TimeZone = TimeZoneInfo.Local; -// this._scheduledMeterReadingService = scheduledMeterReadingService; -// } - - -// public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken()) -// { -// //await _scheduledMeterReadingService.AmmeterScheduledMeterFiveMinuteReading(); -// //await _scheduledMeterReadingService.WatermeterScheduledMeterFiveMinuteReading(); -// } -// } -//} diff --git a/services/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs b/services/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs deleted file mode 100644 index 419a681..0000000 --- a/services/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs +++ /dev/null @@ -1,44 +0,0 @@ -//using System; -//using System.Threading; -//using System.Threading.Tasks; -//using Hangfire; -//using JiShe.CollectBus.ScheduledMeterReading; -//using Microsoft.Extensions.Logging; -//using Volo.Abp.BackgroundWorkers.Hangfire; -//using Volo.Abp.DependencyInjection; -//using Volo.Abp.Uow; - -//namespace JiShe.CollectBus.Workers -//{ -// /// -// /// 1分钟采集数据 -// /// -// public class SubscriberOneMinuteWorker : HangfireBackgroundWorkerBase, ITransientDependency,ICollectWorker -// { -// private readonly ILogger _logger; -// private readonly IScheduledMeterReadingService _scheduledMeterReadingService; - -// /// -// /// Initializes a new instance of the class. -// /// -// /// The logger. -// /// 定时任务 -// public SubscriberOneMinuteWorker(ILogger logger, IScheduledMeterReadingService scheduledMeterReadingService) -// { -// _logger = logger; -// RecurringJobId = nameof(SubscriberOneMinuteWorker); -// CronExpression = "0 0/1 * * * *"; -// TimeZone = TimeZoneInfo.Local; -// this._scheduledMeterReadingService = scheduledMeterReadingService; -// } - - -// public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken()) -// { -// //await _scheduledMeterReadingService.AmmeterScheduledMeterOneMinuteReading(); - -// //await _scheduledMeterReadingService.WatermeterScheduledMeterOneMinuteReading(); - -// } -// } -//} diff --git a/services/JiShe.CollectBus.DbMigrator/CollectBusDbMigratorModule.cs b/services/JiShe.CollectBus.DbMigrator/CollectBusDbMigratorModule.cs index 3767b52..f3f60e2 100644 --- a/services/JiShe.CollectBus.DbMigrator/CollectBusDbMigratorModule.cs +++ b/services/JiShe.CollectBus.DbMigrator/CollectBusDbMigratorModule.cs @@ -1,4 +1,4 @@ -using JiShe.CollectBus.MongoDB; +using JiShe.CollectBus.IoTDB; using Volo.Abp.Autofac; using Volo.Abp.Modularity; @@ -6,7 +6,7 @@ namespace JiShe.CollectBus.DbMigrator; [DependsOn( typeof(AbpAutofacModule), - typeof(CollectBusMongoDbModule), + typeof(CollectBusIoTDbModule), typeof(CollectBusApplicationContractsModule) )] public class CollectBusDbMigratorModule : AbpModule diff --git a/services/JiShe.CollectBus.DbMigrator/DbMigratorHostedService.cs b/services/JiShe.CollectBus.DbMigrator/DbMigratorHostedService.cs index 46f1edb..f8c8406 100644 --- a/services/JiShe.CollectBus.DbMigrator/DbMigratorHostedService.cs +++ b/services/JiShe.CollectBus.DbMigrator/DbMigratorHostedService.cs @@ -7,6 +7,7 @@ using JiShe.CollectBus.Data; using Serilog; using Volo.Abp; using Volo.Abp.Data; +using JiShe.CollectBus.IoTDB.Interface; namespace JiShe.CollectBus.DbMigrator; @@ -33,10 +34,16 @@ public class DbMigratorHostedService : IHostedService { await application.InitializeAsync(); + //await application + // .ServiceProvider + // .GetRequiredService() + // .MigrateAsync(); + + //初始化IoTDB表模型 await application .ServiceProvider - .GetRequiredService() - .MigrateAsync(); + .GetRequiredService().GetSessionPool(true) + .InitTableSessionModelAsync(); await application.ShutdownAsync(); diff --git a/services/JiShe.CollectBus.DbMigrator/JiShe.CollectBus.DbMigrator.csproj b/services/JiShe.CollectBus.DbMigrator/JiShe.CollectBus.DbMigrator.csproj index f875a56..00b7197 100644 --- a/services/JiShe.CollectBus.DbMigrator/JiShe.CollectBus.DbMigrator.csproj +++ b/services/JiShe.CollectBus.DbMigrator/JiShe.CollectBus.DbMigrator.csproj @@ -18,7 +18,6 @@ - diff --git a/services/JiShe.CollectBus.DbMigrator/appsettings.json b/services/JiShe.CollectBus.DbMigrator/appsettings.json index 90ea5ae..987ba94 100644 --- a/services/JiShe.CollectBus.DbMigrator/appsettings.json +++ b/services/JiShe.CollectBus.DbMigrator/appsettings.json @@ -1,5 +1,14 @@ { "ConnectionStrings": { "Default": "mongodb://admin:collectbus_mongodb_jishe@118.190.144.92:37017/JiSheCollectBus?authSource=admin" + }, + "IoTDBOptions": { + "UserName": "root", + "Password": "root", + "ClusterList": [ "192.168.5.9:6667" ], + "PoolSize": 32, + "DataBaseName": "energy", + "OpenDebugMode": true, + "UseTableSessionPoolByDefault": false } } diff --git a/shared/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj b/shared/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj index 6cf0246..4b94598 100644 --- a/shared/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj +++ b/shared/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj @@ -17,13 +17,17 @@ - + + + + + diff --git a/shared/JiShe.CollectBus.Common/WorkService/SystemBackGroundWorkService.cs b/shared/JiShe.CollectBus.Common/WorkService/SystemBackGroundWorkService.cs new file mode 100644 index 0000000..4b2ada0 --- /dev/null +++ b/shared/JiShe.CollectBus.Common/WorkService/SystemBackGroundWorkService.cs @@ -0,0 +1,82 @@ +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Common.WorkService +{ + /// + /// 系统后台定时服务 + /// + public abstract class SystemBackGroundWorkService : BackgroundService + { + /// + /// 日志记录 + /// + public ILogger Logger { get; set; } + + /// + /// 创建一个取消标记源 + /// + private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); + + /// + /// 任务执行间隔时间 + /// + private TimeSpan interval; + + protected SystemBackGroundWorkService(ILogger logger) + { + Logger = logger; + interval = GetInterval(); + } + + /// + /// 执行时间间隔 + /// + /// + protected abstract TimeSpan GetInterval(); + + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + await Task.CompletedTask;// 等待其他任务执行完成,避免阻塞应用程序启动 + Logger.LogInformation($"任务每隔{interval.TotalSeconds}秒执行一次"); + await InitAsync(cancellationTokenSource.Token); + } + + protected virtual async Task InitAsync(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + try + { + await DoWorkAsync(cancellationToken); + } + catch (Exception ex) + { + Logger.LogError(ex, "后台任务执行发生异常"); + } + + await Task.Delay(interval, cancellationToken); + } + } + + public override Task StopAsync(CancellationToken cancellationToken) + { + Logger.LogInformation("后台服务停止……"); + cancellationTokenSource.Cancel(); + return base.StopAsync(cancellationToken); + } + + /// + /// 抛出方法入口以便于其服务实现 + /// + /// + /// + protected abstract Task DoWorkAsync(CancellationToken cancellationToken); + } +} diff --git a/web/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs b/web/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs index b00b7b6..bae661a 100644 --- a/web/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs +++ b/web/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs @@ -19,7 +19,6 @@ using TouchSocket.Sockets; using JiShe.CollectBus.Plugins; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Diagnostics.HealthChecks; -using JiShe.CollectBus.Cassandra; namespace JiShe.CollectBus.Host @@ -39,18 +38,18 @@ namespace JiShe.CollectBus.Host Configure(options => { options.IsJobExecutionEnabled = false; }); - context.Services.AddHangfire(config => - { - config.UseRedisStorage( - context.Services.GetConfiguration().GetValue("Redis:Configuration"), redisStorageOptions) - .WithJobExpirationTimeout(TimeSpan.FromDays(7)); - var delaysInSeconds = new[] { 10, 60, 60 * 3 }; // 重试时间间隔 - const int Attempts = 3; // 重试次数 - config.UseFilter(new AutomaticRetryAttribute() { Attempts = Attempts, DelaysInSeconds = delaysInSeconds }); - //config.UseFilter(new AutoDeleteAfterSuccessAttribute(TimeSpan.FromDays(7))); - config.UseFilter(new JobRetryLastFilter(Attempts)); - }); - context.Services.AddHangfireServer(); + //context.Services.AddHangfire(config => + //{ + // config.UseRedisStorage( + // context.Services.GetConfiguration().GetValue("Redis:Configuration"), redisStorageOptions) + // .WithJobExpirationTimeout(TimeSpan.FromDays(7)); + // var delaysInSeconds = new[] { 10, 60, 60 * 3 }; // 重试时间间隔 + // const int Attempts = 3; // 重试次数 + // config.UseFilter(new AutomaticRetryAttribute() { Attempts = Attempts, DelaysInSeconds = delaysInSeconds }); + // //config.UseFilter(new AutoDeleteAfterSuccessAttribute(TimeSpan.FromDays(7))); + // config.UseFilter(new JobRetryLastFilter(Attempts)); + //}); + //context.Services.AddHangfireServer(); } /// @@ -194,9 +193,9 @@ namespace JiShe.CollectBus.Host options => { options.IgnoredUrls.Add("/AuditLogs/page"); - options.IgnoredUrls.Add("/hangfire/stats"); + //options.IgnoredUrls.Add("/hangfire/stats"); options.IgnoredUrls.Add("/hangfire/recurring/trigger"); - options.IgnoredUrls.Add("/cap"); + //options.IgnoredUrls.Add("/cap"); options.IgnoredUrls.Add("/"); }); } @@ -252,11 +251,9 @@ namespace JiShe.CollectBus.Host /// private void ConfigureHealthChecks(ServiceConfigurationContext context, IConfiguration configuration) { - if (!configuration.GetValue("HealthChecks:IsEnable")) return; - var cassandraConfig = new CassandraConfig(); - configuration.GetSection("Cassandra").Bind(cassandraConfig); + if (!configuration.GetValue("HealthChecks:IsEnable")) return; context.Services.AddHealthChecks() - .AddMongoDb(configuration.GetConnectionString("Default"), "MongoDB", HealthStatus.Unhealthy) + //.AddMongoDb(configuration.GetConnectionString("Default"), "MongoDB", HealthStatus.Unhealthy) .AddRedis(configuration.GetValue("Redis:Configuration") ?? string.Empty, "Redis", HealthStatus.Unhealthy) //.AddKafka(new Confluent.Kafka.ProducerConfig diff --git a/web/JiShe.CollectBus.Host/CollectBusHostModule.cs b/web/JiShe.CollectBus.Host/CollectBusHostModule.cs index 1666341..905d6f4 100644 --- a/web/JiShe.CollectBus.Host/CollectBusHostModule.cs +++ b/web/JiShe.CollectBus.Host/CollectBusHostModule.cs @@ -5,14 +5,12 @@ using JiShe.CollectBus.Host.Extensions; using JiShe.CollectBus.Host.HealthChecks; using JiShe.CollectBus.Host.Swaggers; using JiShe.CollectBus.IoTDB.Options; -using JiShe.CollectBus.MongoDB; using Microsoft.AspNetCore.Diagnostics.HealthChecks; using Swashbuckle.AspNetCore.SwaggerUI; using Volo.Abp; using Volo.Abp.AspNetCore.Authentication.JwtBearer; using Volo.Abp.AspNetCore.Serilog; using Volo.Abp.Autofac; -using Volo.Abp.BackgroundWorkers.Hangfire; using Volo.Abp.Caching.StackExchangeRedis; using Volo.Abp.Modularity; using Volo.Abp.Swashbuckle; @@ -30,9 +28,7 @@ namespace JiShe.CollectBus.Host typeof(AbpSwashbuckleModule), typeof(AbpTimingModule), typeof(CollectBusApplicationModule), - typeof(CollectBusMongoDbModule), - typeof(AbpCachingStackExchangeRedisModule), - typeof(AbpBackgroundWorkersHangfireModule) + typeof(AbpCachingStackExchangeRedisModule) )] public partial class CollectBusHostModule : AbpModule { @@ -47,8 +43,8 @@ namespace JiShe.CollectBus.Host ConfigureSwaggerServices(context, configuration); ConfigureNetwork(context, configuration); ConfigureJwtAuthentication(context, configuration); - ConfigureHangfire(context); - ConfigureAuditLog(context); + //ConfigureHangfire(context); + //ConfigureAuditLog(context); ConfigureCustom(context, configuration); ConfigureHealthChecks(context, configuration); Configure(options => { options.Kind = DateTimeKind.Local; }); @@ -92,10 +88,10 @@ namespace JiShe.CollectBus.Host app.UseAuditing(); app.UseAbpSerilogEnrichers(); app.UseUnitOfWork(); - app.UseHangfireDashboard("/hangfire", new DashboardOptions - { - IgnoreAntiforgeryToken = true - }); + //app.UseHangfireDashboard("/hangfire", new DashboardOptions + //{ + // IgnoreAntiforgeryToken = true + //}); app.UseConfiguredEndpoints(endpoints => { if (!configuration.GetValue("HealthChecks:IsEnable")) return; diff --git a/web/JiShe.CollectBus.Host/HealthChecks/CassandraHealthCheck.cs b/web/JiShe.CollectBus.Host/HealthChecks/CassandraHealthCheck.cs index 1157239..edc5c37 100644 --- a/web/JiShe.CollectBus.Host/HealthChecks/CassandraHealthCheck.cs +++ b/web/JiShe.CollectBus.Host/HealthChecks/CassandraHealthCheck.cs @@ -1,5 +1,5 @@ -using Cassandra; -using JiShe.CollectBus.Cassandra; +//using Cassandra; +//using JiShe.CollectBus.Cassandra; using Microsoft.Extensions.Diagnostics.HealthChecks; namespace JiShe.CollectBus.Host.HealthChecks @@ -31,27 +31,28 @@ namespace JiShe.CollectBus.Host.HealthChecks /// public async Task CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default) { - var cassandraConfig = new CassandraConfig(); - _configuration.GetSection("Cassandra").Bind(cassandraConfig); - try - { - var clusterBuilder = Cluster.Builder(); - foreach (var node in cassandraConfig.Nodes) - { - clusterBuilder.AddContactPoint(node.Host) - .WithPort(node.Port); - } - clusterBuilder.WithCredentials(cassandraConfig.Username, cassandraConfig.Password); - var cluster = clusterBuilder.Build(); - using var session = await cluster.ConnectAsync(); - var result = await Task.FromResult(session.Execute("SELECT release_version FROM system.local")); - var version = result.First().GetValue("release_version"); - return HealthCheckResult.Healthy($"Cassandra is healthy. Version: {version}"); - } - catch (Exception ex) - { - return new HealthCheckResult(context.Registration.FailureStatus, $"Cassandra is unhealthy: {ex.Message}", ex); - } + return HealthCheckResult.Healthy("Cassandra is unhealthy."); + //var cassandraConfig = new CassandraConfig(); + //_configuration.GetSection("Cassandra").Bind(cassandraConfig); + //try + //{ + // var clusterBuilder = Cluster.Builder(); + // foreach (var node in cassandraConfig.Nodes) + // { + // clusterBuilder.AddContactPoint(node.Host) + // .WithPort(node.Port); + // } + // clusterBuilder.WithCredentials(cassandraConfig.Username, cassandraConfig.Password); + // var cluster = clusterBuilder.Build(); + // using var session = await cluster.ConnectAsync(); + // var result = await Task.FromResult(session.Execute("SELECT release_version FROM system.local")); + // var version = result.First().GetValue("release_version"); + // return HealthCheckResult.Healthy($"Cassandra is healthy. Version: {version}"); + //} + //catch (Exception ex) + //{ + // return new HealthCheckResult(context.Registration.FailureStatus, $"Cassandra is unhealthy: {ex.Message}", ex); + //} } } } \ No newline at end of file diff --git a/web/JiShe.CollectBus.Host/HealthChecks/IoTDBHealthCheck.cs b/web/JiShe.CollectBus.Host/HealthChecks/IoTDBHealthCheck.cs index a1cebf2..960445b 100644 --- a/web/JiShe.CollectBus.Host/HealthChecks/IoTDBHealthCheck.cs +++ b/web/JiShe.CollectBus.Host/HealthChecks/IoTDBHealthCheck.cs @@ -1,5 +1,4 @@ using System.Net.Sockets; -using JiShe.CollectBus.Cassandra; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Options; using JiShe.CollectBus.IoTDB.Provider; diff --git a/web/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj b/web/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj index 96786fe..2877bab 100644 --- a/web/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj +++ b/web/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj @@ -25,7 +25,6 @@ - @@ -47,21 +46,16 @@ - - - + - - + diff --git a/web/JiShe.CollectBus.Host/appsettings.json b/web/JiShe.CollectBus.Host/appsettings.json index 6bfcc45..3036d09 100644 --- a/web/JiShe.CollectBus.Host/appsettings.json +++ b/web/JiShe.CollectBus.Host/appsettings.json @@ -140,7 +140,7 @@ } ], "ServerApplicationOptions": { - "ServerTagName": "JiSheCollectBus99", + "ServerTagName": "JiSheCollectBus77", "SystemType": "Energy", "FirstCollectionTime": "2025-04-28 15:07:00", "AutomaticVerificationTime": "16:07:00", @@ -153,5 +153,8 @@ "PlugInFolder": "", "TCP": { "ClientPort": 10500 + }, + "BackgroundJobs": { + "IsJobExecutionEnabled": false // 关闭任务执行 } } \ No newline at end of file diff --git a/web/JiShe.CollectBus.Migration.Host/CollectBusMigrationHostModule.cs b/web/JiShe.CollectBus.Migration.Host/CollectBusMigrationHostModule.cs index b9a5f05..8fb15df 100644 --- a/web/JiShe.CollectBus.Migration.Host/CollectBusMigrationHostModule.cs +++ b/web/JiShe.CollectBus.Migration.Host/CollectBusMigrationHostModule.cs @@ -2,7 +2,6 @@ using JiShe.CollectBus.Common; using JiShe.CollectBus.Migration.Host.HealthChecks; using JiShe.CollectBus.Migration.Host.Swaggers; -using JiShe.CollectBus.MongoDB; using Microsoft.AspNetCore.Diagnostics.HealthChecks; using Swashbuckle.AspNetCore.SwaggerUI; using Volo.Abp; @@ -27,7 +26,6 @@ namespace JiShe.CollectBus.Migration.Host typeof(AbpAspNetCoreSerilogModule), typeof(AbpSwashbuckleModule), typeof(AbpTimingModule), - typeof(CollectBusMongoDbModule), typeof(CollectBusMigrationApplicationModule), typeof(AbpCachingStackExchangeRedisModule) )] diff --git a/web/JiShe.CollectBus.Migration.Host/JiShe.CollectBus.Migration.Host.csproj b/web/JiShe.CollectBus.Migration.Host/JiShe.CollectBus.Migration.Host.csproj index 5a66ca8..5e6ba2d 100644 --- a/web/JiShe.CollectBus.Migration.Host/JiShe.CollectBus.Migration.Host.csproj +++ b/web/JiShe.CollectBus.Migration.Host/JiShe.CollectBus.Migration.Host.csproj @@ -21,7 +21,6 @@ - @@ -43,19 +42,14 @@ - - -