From 8f3dea6c2ce988e32b68040831248cf109d318ee Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Mon, 26 May 2025 11:20:32 +0800
Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E9=A1=B9=E7=9B=AE=E7=BB=93?=
=?UTF-8?q?=E6=9E=84=E4=BE=9D=E8=B5=96=EF=BC=8C=E7=A7=BB=E9=99=A4Mongo?=
=?UTF-8?q?=E3=80=81Hangfire=E3=80=81Cassandra?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../JiShe.CollectBus.Protocol.T1882018.csproj | 2 +
.../T37612012ProtocolPlugin.cs | 3 -
.../Abstracts/ProtocolPlugin.cs | 6 +-
.../Subscribers/ISubscriberAppService.cs | 1 -
.../CollectBusApplicationModule.cs | 51 +---
.../DataChannels/DataChannelManageService.cs | 240 +++++++++---------
.../JiShe.CollectBus.Application.csproj | 14 +-
.../Mappers/CollectBusMapping.cs | 17 --
.../Samples/TestAppService.cs | 143 ++++++-----
.../BasicScheduledMeterReadingService.cs | 4 +-
.../SubscriberAnalysisAppService.cs | 5 +-
.../Subscribers/SubscriberAppService.cs | 172 -------------
.../Workers/CreateToBeIssueTaskWorker.cs | 43 ----
.../DataDetectionFifteenMinuteWorker.cs | 39 ---
.../Workers/EpiCollectWorker.cs | 39 ---
.../Workers/SubscriberFifteenMinuteWorker.cs | 48 ----
.../Workers/SubscriberFiveMinuteWorker.cs | 42 ---
.../Workers/SubscriberOneMinuteWorker.cs | 44 ----
.../CollectBusDbMigratorModule.cs | 4 +-
.../DbMigratorHostedService.cs | 11 +-
.../JiShe.CollectBus.DbMigrator.csproj | 1 -
.../appsettings.json | 9 +
.../JiShe.CollectBus.Common.csproj | 6 +-
.../SystemBackGroundWorkService.cs | 82 ++++++
.../CollectBusHostModule.Configure.cs | 35 ++-
.../CollectBusHostModule.cs | 18 +-
.../HealthChecks/CassandraHealthCheck.cs | 47 ++--
.../HealthChecks/IoTDBHealthCheck.cs | 1 -
.../JiShe.CollectBus.Host.csproj | 10 +-
web/JiShe.CollectBus.Host/appsettings.json | 5 +-
.../CollectBusMigrationHostModule.cs | 2 -
.../JiShe.CollectBus.Migration.Host.csproj | 6 -
32 files changed, 373 insertions(+), 777 deletions(-)
delete mode 100644 services/JiShe.CollectBus.Application/Mappers/CollectBusMapping.cs
delete mode 100644 services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
delete mode 100644 services/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs
delete mode 100644 services/JiShe.CollectBus.Application/Workers/DataDetectionFifteenMinuteWorker.cs
delete mode 100644 services/JiShe.CollectBus.Application/Workers/EpiCollectWorker.cs
delete mode 100644 services/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs
delete mode 100644 services/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs
delete mode 100644 services/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs
create mode 100644 shared/JiShe.CollectBus.Common/WorkService/SystemBackGroundWorkService.cs
diff --git a/protocols/JiShe.CollectBus.Protocol.T1882018/JiShe.CollectBus.Protocol.T1882018.csproj b/protocols/JiShe.CollectBus.Protocol.T1882018/JiShe.CollectBus.Protocol.T1882018.csproj
index 4866985..0f8d650 100644
--- a/protocols/JiShe.CollectBus.Protocol.T1882018/JiShe.CollectBus.Protocol.T1882018.csproj
+++ b/protocols/JiShe.CollectBus.Protocol.T1882018/JiShe.CollectBus.Protocol.T1882018.csproj
@@ -9,6 +9,8 @@
+
+
diff --git a/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs
index ed82fa0..0c74082 100644
--- a/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs
+++ b/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs
@@ -27,7 +27,6 @@ namespace JiShe.CollectBus.Protocol.T37612012
private readonly IProducerService _producerService;
- private readonly IRepository _deviceRepository;
private readonly ITcpService _tcpService;
public readonly Dictionary T3761AFNHandlers;
@@ -41,7 +40,6 @@ namespace JiShe.CollectBus.Protocol.T37612012
_logger = logger;
//_logger = serviceProvider.GetRequiredService>();
_producerService = serviceProvider.GetRequiredService();
- _deviceRepository = serviceProvider.GetRequiredService>();
_tcpService = tcpService;
T3761AFNHandlers = Telemetry3761PacketBuilder.T3761AFNHandlers;
}
@@ -133,7 +131,6 @@ namespace JiShe.CollectBus.Protocol.T37612012
else
{
_logger.LogError($"不支持的上报kafka主题:{topicName}");
- await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, messageReceivedAnalysis);
}
}
diff --git a/protocols/JiShe.CollectBus.Protocol/Abstracts/ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol/Abstracts/ProtocolPlugin.cs
index c6779e5..f6a689e 100644
--- a/protocols/JiShe.CollectBus.Protocol/Abstracts/ProtocolPlugin.cs
+++ b/protocols/JiShe.CollectBus.Protocol/Abstracts/ProtocolPlugin.cs
@@ -20,13 +20,11 @@ namespace JiShe.CollectBus.Protocol.Abstracts
public const string errorData = "EE";
private readonly ILogger _logger;
- private readonly IRepository _protocolInfoRepository;
private readonly IFreeRedisProvider _redisProvider;
public ProtocolPlugin(IServiceProvider serviceProvider, ILogger logger)
{
_logger = logger;
- _protocolInfoRepository = serviceProvider.GetRequiredService>();
_redisProvider = serviceProvider.GetRequiredService();
}
@@ -40,10 +38,8 @@ namespace JiShe.CollectBus.Protocol.Abstracts
if (Info == null)
{
throw new ArgumentNullException(nameof(Info));
- }
+ }
- await _protocolInfoRepository.DeleteDirectAsync(a => a.Name == Info.Name);
- await _protocolInfoRepository.InsertAsync(Info);
await _redisProvider.Instance.HDelAsync($"{RedisConst.ProtocolKey}", Info.Name);
await _redisProvider.Instance.HSetAsync($"{RedisConst.ProtocolKey}", Info.Name, Info);
}
diff --git a/services/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs b/services/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs
index 63594c8..8ce2662 100644
--- a/services/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs
+++ b/services/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs
@@ -12,7 +12,6 @@ namespace JiShe.CollectBus.Subscribers
{
Task LoginIssuedEvent(List issuedEventMessage);
Task HeartbeatIssuedEvent(List issuedEventMessage);
- Task ReceivedEvent(MessageProtocolAnalysis receivedMessage);
Task ReceivedHeartbeatEvent(List receivedHeartbeatMessage);
Task ReceivedLoginEvent(List receivedLoginMessage);
}
diff --git a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs
index 942befc..8fa7033 100644
--- a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs
+++ b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs
@@ -1,13 +1,10 @@
-using Cassandra.Mapping;
-using JiShe.CollectBus.Cassandra;
-using JiShe.CollectBus.DataChannels;
+using JiShe.CollectBus.DataChannels;
using JiShe.CollectBus.FreeRedis;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IoTDB;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka;
-using JiShe.CollectBus.Mappers;
using JiShe.CollectBus.Protocol;
using JiShe.CollectBus.ScheduledMeterReading;
using Microsoft.Extensions.DependencyInjection;
@@ -19,12 +16,8 @@ using System.Threading.Channels;
using System.Threading.Tasks;
using Volo.Abp;
using Volo.Abp.Application;
-using Volo.Abp.AuditLogging;
using Volo.Abp.Autofac;
using Volo.Abp.AutoMapper;
-using Volo.Abp.BackgroundJobs;
-using Volo.Abp.BackgroundWorkers;
-using Volo.Abp.BackgroundWorkers.Hangfire;
using Volo.Abp.Modularity;
namespace JiShe.CollectBus;
@@ -34,16 +27,12 @@ namespace JiShe.CollectBus;
typeof(CollectBusApplicationContractsModule),
typeof(AbpDddApplicationModule),
typeof(AbpAutoMapperModule),
- typeof(AbpAutofacModule),
- typeof(AbpBackgroundWorkersHangfireModule),
typeof(CollectBusFreeRedisModule),
typeof(CollectBusFreeSqlModule),
typeof(CollectBusKafkaModule),
typeof(CollectBusIoTDbModule),
+ typeof(AbpAutofacModule),
typeof(CollectBusDomainSharedModule),
- typeof(AbpAuditLoggingDomainModule),
- typeof(AbpBackgroundJobsDomainModule),
- typeof(CollectBusCassandraModule),
typeof(CollectBusProtocolModule)
)]
public class CollectBusApplicationModule : AbpModule
@@ -55,8 +44,8 @@ public class CollectBusApplicationModule : AbpModule
context.Services.AddAutoMapperObjectMapper();
Configure(options => { options.AddMaps(true); });
- context.Services.AddSingleton(new MappingConfiguration()
- .Define(new CollectBusMapping()));
+ //context.Services.AddSingleton(new MappingConfiguration()
+ // .Define(new CollectBusMapping()));
// 注册拦截器
context.Services.OnRegistered(ctx =>
@@ -70,33 +59,17 @@ public class CollectBusApplicationModule : AbpModule
public override async Task OnApplicationInitializationAsync(
ApplicationInitializationContext context)
{
- var assembly = Assembly.GetExecutingAssembly();
- var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface)
- .ToList();
- foreach (var type in types) await context.AddBackgroundWorkerAsync(type);
-
- //Task.Run(() =>
+ //var assembly = Assembly.GetExecutingAssembly();
+ //var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface)
+ // .ToList();
+ //foreach (var type in types)
//{
- // //默认初始化表计信息
- // var dbContext = context.ServiceProvider.GetRequiredService();
- // dbContext.InitAmmeterCacheData();
- // //await dbContext.InitWatermeterCacheData();
- //}).ConfigureAwait(false);
-
+ // await context.AddBackgroundWorkerAsync(type);
+ //}
+
//下发任务通道构建
DataChannelManage.TaskDataChannel = Channel.CreateUnbounded>>();
-
-
- // 日志存储通道构建
- DataChannelManage.LogSaveChannel = Channel.CreateUnbounded