Compare commits

..

No commits in common. "946dffbaa740b985f25ff5d7cea793d8a33bce41" and "a2cc947d7d826a8771580dc4ccf6d0778483c298" have entirely different histories.

6 changed files with 93 additions and 61 deletions

View File

@ -1,6 +0,0 @@
namespace JiShe.CollectBus.Protocol.Contracts.ProtocolPools
{
public interface IPluginContainer
{
}
}

View File

@ -1,26 +0,0 @@
using JiShe.CollectBus.Interfaces;
namespace JiShe.CollectBus.Protocol.Contracts.ProtocolPools
{
public class PluginContainer: IPluginContainer
{
public Dictionary<string, object> ProtocolPools;
public PluginContainer()
{
}
}
public static class ServiceProviderKeyedServiceExtensions
{
//public static Task AddKeyedSingleton<TImp>(this IServiceProvider provider, string key)
//{
// //var aa = Activator.CreateInstance<TImp>();
//}
}
}

View File

@ -17,6 +17,7 @@ namespace JiShe.CollectBus.Protocol.Test
{ {
Console.WriteLine("TestProtocolPlugin OnApplicationInitializationAsync"); Console.WriteLine("TestProtocolPlugin OnApplicationInitializationAsync");
var protocol = context.ServiceProvider.GetRequiredKeyedService<IProtocolPlugin>(nameof(TestProtocolPlugin)); var protocol = context.ServiceProvider.GetRequiredKeyedService<IProtocolPlugin>(nameof(TestProtocolPlugin));
RemoveServiceAtRuntime(context.ServiceProvider);
await protocol.LoadAsync(); await protocol.LoadAsync();
} }
@ -25,5 +26,19 @@ namespace JiShe.CollectBus.Protocol.Test
Console.WriteLine("TestProtocolPlugin OnApplicationShutdown"); Console.WriteLine("TestProtocolPlugin OnApplicationShutdown");
base.OnApplicationShutdown(context); base.OnApplicationShutdown(context);
} }
public void RemoveServiceAtRuntime(IServiceProvider serviceProvider)
{
var services = serviceProvider.GetService<IServiceCollection>();
services?.AddKeyedSingleton<IProtocolPlugin, TestProtocolPlugin>(nameof(TestProtocolPlugin));
//var services = (IServiceCollection)serviceProvider.GetService(typeof(IServiceCollection));
//var pluginService = serviceProvider.GetKeyedService<IProtocolPlugin>(nameof(TestProtocolPlugin));
//if (services != null && pluginService!=null)
//{
// services.Remove(pluginService);
//}
}
} }
} }

View File

@ -1,24 +1,34 @@
using System.Linq; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.ScheduledMeterReading;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using System.Collections.Generic;
using System.Linq;
using System.Reflection; using System.Reflection;
using System.Threading.Tasks; using System.Threading.Tasks;
using Cassandra.Mapping; using Cassandra.Mapping;
using JiShe.CollectBus.Cassandra; using JiShe.CollectBus.Cassandra;
using JiShe.CollectBus.FreeRedis; using JiShe.CollectBus.FreeRedis;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IoTDB; using JiShe.CollectBus.IoTDB;
using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Mappers; using JiShe.CollectBus.Mappers;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.ScheduledMeterReading;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp; using Volo.Abp;
using Volo.Abp.Application; using Volo.Abp.Application;
using Volo.Abp.Autofac; using Volo.Abp.Autofac;
using Volo.Abp.AutoMapper; using Volo.Abp.AutoMapper;
using Volo.Abp.BackgroundWorkers; using Volo.Abp.BackgroundWorkers;
using Volo.Abp.BackgroundWorkers.Hangfire; using Volo.Abp.BackgroundWorkers.Hangfire;
using Volo.Abp.EventBus;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;
using Microsoft.Extensions.Options;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.Common.Attributes;
using ZstdSharp.Unsafe;
namespace JiShe.CollectBus; namespace JiShe.CollectBus;
@ -43,7 +53,10 @@ public class CollectBusApplicationModule : AbpModule
var configuration = context.Services.GetConfiguration(); var configuration = context.Services.GetConfiguration();
context.Services.AddAutoMapperObjectMapper<CollectBusApplicationModule>(); context.Services.AddAutoMapperObjectMapper<CollectBusApplicationModule>();
Configure<AbpAutoMapperOptions>(options => { options.AddMaps<CollectBusApplicationModule>(true); }); Configure<AbpAutoMapperOptions>(options =>
{
options.AddMaps<CollectBusApplicationModule>(validate: true);
});
context.Services.AddSingleton(new MappingConfiguration() context.Services.AddSingleton(new MappingConfiguration()
.Define(new CollectBusMapping())); .Define(new CollectBusMapping()));
@ -53,7 +66,10 @@ public class CollectBusApplicationModule : AbpModule
{ {
var methods = ctx.ImplementationType.GetMethods(); var methods = ctx.ImplementationType.GetMethods();
var any = methods.Any(a=>a.GetCustomAttribute<LogInterceptAttribute>()!=null); var any = methods.Any(a=>a.GetCustomAttribute<LogInterceptAttribute>()!=null);
if (any) ctx.Interceptors.TryAdd<LogInterceptor>(); if (any)
{
ctx.Interceptors.TryAdd<LogInterceptor>();
}
}); });
} }
@ -61,9 +77,11 @@ public class CollectBusApplicationModule : AbpModule
ApplicationInitializationContext context) ApplicationInitializationContext context)
{ {
var assembly = Assembly.GetExecutingAssembly(); var assembly = Assembly.GetExecutingAssembly();
var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface) var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface).ToList();
.ToList(); foreach (var type in types)
foreach (var type in types) await context.AddBackgroundWorkerAsync(type); {
await context.AddBackgroundWorkerAsync(type);
}
Task.Run(() => Task.Run(() =>
{ {
@ -72,5 +90,7 @@ public class CollectBusApplicationModule : AbpModule
dbContext.InitAmmeterCacheData(); dbContext.InitAmmeterCacheData();
//await dbContext.InitWatermeterCacheData(); //await dbContext.InitWatermeterCacheData();
}).ConfigureAwait(false); }).ConfigureAwait(false);
} }
} }

View File

@ -31,6 +31,30 @@ namespace JiShe.CollectBus.DynamicModule
return _moduleContainer.Modules.Select(m => m.Type).ToArray(); return _moduleContainer.Modules.Select(m => m.Type).ToArray();
} }
public async Task InitializeModuleAsync(Type moduleType)
{
//if (!typeof(IAbpModule).IsAssignableFrom(moduleType))
//{
// throw new ArgumentException($"指定的类型 {moduleType.FullName} 不是有效的ABP模块类型", nameof(moduleType));
//}
//var module = (IAbpModule)Activator.CreateInstance(moduleType);
//// 配置服务
//var configureServicesMethod = moduleType.GetMethod("ConfigureServices",
// BindingFlags.Public | BindingFlags.Instance | BindingFlags.FlattenHierarchy);
//if (configureServicesMethod != null)
//{
// var serviceConfigurationContext = CreateServiceConfigurationContext();
// configureServicesMethod.Invoke(module, new object[] { serviceConfigurationContext });
//}
//await CallModuleMethodAsync(module, "OnApplicationInitializationAsync", new object[] { new ApplicationInitializationContext(_serviceProvider) });
}
public async Task ReinitializeModuleAsync(Type moduleType) public async Task ReinitializeModuleAsync(Type moduleType)
{ {
if (!typeof(IAbpModule).IsAssignableFrom(moduleType)) if (!typeof(IAbpModule).IsAssignableFrom(moduleType))
@ -46,14 +70,16 @@ namespace JiShe.CollectBus.DynamicModule
var module = moduleDescriptor.Instance; var module = moduleDescriptor.Instance;
await CallModuleMethodAsync(module, "OnApplicationShutdown", [new ApplicationShutdownContext(_serviceProvider)]); await CallModuleMethodAsync(module, "OnApplicationShutdown", new object[] { new ApplicationShutdownContext(_serviceProvider) });
var configureServicesMethod = moduleType.GetMethod("ConfigureServices",BindingFlags.Public | BindingFlags.Instance | BindingFlags.FlattenHierarchy); //var configureServicesMethod = moduleType.GetMethod("ConfigureServices",
if (configureServicesMethod != null) // BindingFlags.Public | BindingFlags.Instance | BindingFlags.FlattenHierarchy);
{
var serviceConfigurationContext = CreateServiceConfigurationContext(); //if (configureServicesMethod != null)
configureServicesMethod.Invoke(module, [serviceConfigurationContext]); //{
} // var serviceConfigurationContext = CreateServiceConfigurationContext();
await CallModuleMethodAsync(module, "OnApplicationInitializationAsync", [new ApplicationInitializationContext(_serviceProvider)]); // configureServicesMethod.Invoke(module, new object[] { serviceConfigurationContext });
//}
await CallModuleMethodAsync(module, "OnApplicationInitializationAsync", new object[] { new ApplicationInitializationContext(_serviceProvider) });
} }
public async Task UnloadModuleAsync(Type moduleType) public async Task UnloadModuleAsync(Type moduleType)
@ -131,7 +157,7 @@ namespace JiShe.CollectBus.DynamicModule
services.Add(service); services.Add(service);
} }
return (ServiceConfigurationContext)constructor.Invoke([services]); return (ServiceConfigurationContext)constructor.Invoke(new object[] { services });
} }
// 如果反射失败,使用备选方案 // 如果反射失败,使用备选方案

View File

@ -79,9 +79,7 @@
"SaslUserName": "lixiao", "SaslUserName": "lixiao",
"SaslPassword": "lixiao1980", "SaslPassword": "lixiao1980",
"KafkaReplicationFactor": 3, "KafkaReplicationFactor": 3,
"NumPartitions": 30, "NumPartitions": 30
"ServerTagName": "JiSheCollectBus100",
"FirstCollectionTime": "2025-04-22 16:07:00"
}, },
"IoTDBOptions": { "IoTDBOptions": {
"UserName": "root", "UserName": "root",
@ -141,5 +139,10 @@
"DefaultIdempotence": true "DefaultIdempotence": true
} }
}, },
"PlugInFolder": "C:\\Users\\Dai Zan\\Desktop\\Plugins" "PlugInFolder": "",
"ServerApplicationOptions": {
"ServerTagName": "JiSheCollectBus100",
"FirstCollectionTime": "2025-04-22 16:07:00",
"AutomaticVerificationTime": "16:07:00"
}
} }