JiShe.CollectBus/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs

405 lines
18 KiB
C#
Raw Normal View History

2024-12-19 16:07:07 +08:00
using System.Text;
using Hangfire;
using Hangfire.Redis.StackExchange;
using JiShe.CollectBus.Host.Hangfire;
using JiShe.CollectBus.Host.Swaggers;
using MassTransit;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.AspNetCore.DataProtection;
using Microsoft.IdentityModel.Tokens;
using Microsoft.OpenApi.Models;
using StackExchange.Redis;
using Volo.Abp.AspNetCore.Auditing;
using Volo.Abp.Auditing;
using Volo.Abp.BackgroundJobs;
using Volo.Abp.Caching;
using Volo.Abp.Modularity;
using TouchSocket.Core;
using TouchSocket.Sockets;
using JiShe.CollectBus.Plugins;
using JiShe.CollectBus.Consumers;
using JiShe.CollectBus.Protocol.Contracts;
2025-03-14 14:28:04 +08:00
using JiShe.CollectBus.IotSystems.MessageReceiveds;
2025-03-24 20:54:31 +08:00
using JiShe.CollectBus.IotSystems.MessageIssueds;
2025-04-01 22:50:34 +08:00
using Confluent.Kafka;
2025-04-02 09:42:04 +08:00
using MassTransit.SqlTransport.Topology;
2025-04-07 17:35:37 +08:00
using Confluent.Kafka.Admin;
2024-12-19 16:07:07 +08:00
namespace JiShe.CollectBus.Host
{
public partial class CollectBusHostModule
{
/// <summary>
/// Registers Hangfire with Redis job storage, the global retry filters and a Hangfire server,
/// and enables ABP background job execution.
/// </summary>
/// <param name="context">The service configuration context whose service collection receives the registrations.</param>
private void ConfigureHangfire(ServiceConfigurationContext context)
{
    // Retry count, shared by both retry filters below.
    const int Attempts = 3;

    // The Redis database index used for Hangfire data is read from "Redis:HangfireDB".
    var storageOptions = new RedisStorageOptions
    {
        Db = context.Services.GetConfiguration().GetValue<int>("Redis:HangfireDB")
    };

    Configure<AbpBackgroundJobOptions>(jobOptions => jobOptions.IsJobExecutionEnabled = true);

    context.Services.AddHangfire(hangfire =>
    {
        var redisConnection = context.Services.GetConfiguration().GetValue<string>("Redis:Configuration");
        hangfire
            .UseRedisStorage(redisConnection, storageOptions)
            .WithJobExpirationTimeout(TimeSpan.FromDays(7));

        // Retry intervals, in seconds.
        int[] retryDelays = { 10, 60, 60 * 3 };
        hangfire.UseFilter(new AutomaticRetryAttribute { Attempts = Attempts, DelaysInSeconds = retryDelays });
        hangfire.UseFilter(new JobRetryLastFilter(Attempts));
    });

    context.Services.AddHangfireServer();
}
/// <summary>
/// Registers JWT bearer authentication as the default authenticate and challenge scheme.
/// </summary>
/// <remarks>
/// Issuer, audience, lifetime and signing key are all validated, with zero clock skew.
/// For every request outside <c>/login</c> the access token is looked up, in order, in the
/// <c>Authorization</c> header, the <c>access_token</c> query parameter and the cookie named by
/// <c>CollectBusHostConst.DefaultCookieName</c>. When a token is found it is handed to the
/// bearer handler and the <c>Authorization</c> header is normalized to <c>Bearer {token}</c>.
/// </remarks>
/// <param name="context">The service configuration context whose service collection receives the registrations.</param>
/// <param name="configuration">Configuration providing <c>Jwt:Issuer</c>, <c>Jwt:Audience</c> and <c>Jwt:SecurityKey</c>.</param>
private void ConfigureJwtAuthentication(ServiceConfigurationContext context, IConfiguration configuration)
{
    context.Services.AddAuthentication(options =>
        {
            options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
            options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
        })
        .AddJwtBearer(options =>
        {
            options.TokenValidationParameters =
                new TokenValidationParameters()
                {
                    // Whether to validate the signing key.
                    ValidateIssuerSigningKey = true,
                    ValidateIssuer = true,
                    ValidateAudience = true,
                    ValidateLifetime = true,
                    ClockSkew = TimeSpan.Zero,
                    ValidIssuer = configuration["Jwt:Issuer"],
                    ValidAudience = configuration["Jwt:Audience"],
                    IssuerSigningKey =
                        new SymmetricSecurityKey(
                            Encoding.ASCII.GetBytes(configuration["Jwt:SecurityKey"]))
                };
            options.Events = new JwtBearerEvents
            {
                OnMessageReceived = currentContext =>
                {
                    var request = currentContext.HttpContext.Request;
                    if (request.Path.StartsWithSegments("/login"))
                    {
                        return Task.CompletedTask;
                    }

                    // 1) Authorization header: the last space-separated segment is taken as the token,
                    //    so both "Bearer xyz" and a bare "xyz" are accepted.
                    var accessToken = string.Empty;
                    if (request.Headers.TryGetValue("Authorization", out var authorization))
                    {
                        accessToken = authorization;
                        if (!string.IsNullOrWhiteSpace(accessToken))
                        {
                            accessToken = accessToken.Split(" ").LastOrDefault();
                        }
                    }

                    // 2) Query string fallback.
                    if (string.IsNullOrWhiteSpace(accessToken))
                    {
                        accessToken = request.Query["access_token"].FirstOrDefault();
                    }

                    // 3) Cookie fallback.
                    if (string.IsNullOrWhiteSpace(accessToken))
                    {
                        accessToken = request.Cookies[@CollectBusHostConst.DefaultCookieName];
                    }

                    // No credential anywhere: leave the request untouched rather than
                    // stamping it with an empty "Authorization: Bearer " header.
                    if (string.IsNullOrWhiteSpace(accessToken))
                    {
                        return Task.CompletedTask;
                    }

                    currentContext.Token = accessToken;
                    // The indexer replaces any existing value, so no Remove/Add pair is needed.
                    request.Headers["Authorization"] = $"Bearer {accessToken}";
                    return Task.CompletedTask;
                }
            };
        });
}
/// <summary>
/// Sets the distributed cache key prefix and stores ASP.NET Core data protection keys in Redis.
/// </summary>
/// <param name="context">The service configuration context whose service collection receives the registrations.</param>
private void ConfigureCache(ServiceConfigurationContext context)
{
    Configure<AbpDistributedCacheOptions>(cacheOptions => cacheOptions.KeyPrefix = "CollectBus:");

    // Build the Redis connection string from "Redis:Configuration" plus the default database index.
    var configuration = context.Services.GetConfiguration();
    var redisEndpoints = configuration.GetValue<string>("Redis:Configuration");
    var defaultDatabase = configuration.GetValue<int>("Redis:DefaultDB");
    var multiplexer = ConnectionMultiplexer.Connect($"{redisEndpoints},defaultdatabase={defaultDatabase}");

    var dataProtection = context.Services.AddDataProtection();
    dataProtection.PersistKeysToStackExchangeRedis(multiplexer, "CollectBus-Protection-Keys");
}
/// <summary>
/// Registers Swagger generation: one document per configured group, group-based API filtering,
/// annotations, custom filters and XML documentation comments.
/// </summary>
/// <param name="context">The service configuration context whose service collection receives the registrations.</param>
/// <param name="configuration">Configuration providing the <c>SwaggerConfig</c> section (a list of document groups).</param>
private void ConfigureSwaggerServices(ServiceConfigurationContext context, IConfiguration configuration)
{
    context.Services.AddSwaggerGen(
        options =>
        {
            // One Swagger document per configured group; a missing section registers none.
            configuration.GetSection("SwaggerConfig").Get<List<SwaggerConfig>>()?.ForEach(group =>
            {
                options.SwaggerDoc(group.GroupName,
                    new OpenApiInfo { Title = group.Title, Version = group.Version });
            });

            // APIs without a group name are listed in the "Basic" document;
            // all others go to the document whose name equals their group name.
            options.DocInclusionPredicate((docName, apiDes) =>
            {
                if (docName == "Basic" && string.IsNullOrWhiteSpace(apiDes.GroupName))
                {
                    return true;
                }

                return docName == apiDes.GroupName;
            });

            options.EnableAnnotations();
            options.DocumentFilter<HiddenAbpDefaultApiFilter>();
            options.SchemaFilter<EnumSchemaFilter>();

            // Suffixes are matched ordinally and case-insensitively, so a file named
            // "*.HttpApi.xml" is picked up by the "httpApi.xml" suffix as well.
            string[] xmlSuffixes = { "Application.xml", "Application.Contracts.xml", "httpApi.xml", "Host.xml" };
            var xmlPaths = Directory.GetFiles(AppContext.BaseDirectory, "*.xml")
                .Where(path => xmlSuffixes.Any(suffix => path.EndsWith(suffix, StringComparison.OrdinalIgnoreCase)))
                .ToList();

            foreach (var xmlPath in xmlPaths)
            {
                options.IncludeXmlComments(xmlPath, true);
            }
        });
}
/// <summary>
/// Enables auditing for all entities under the application name "JiShe.CollectBus"
/// and lists the URLs that the ASP.NET Core auditing options ignore.
/// </summary>
/// <param name="context">The service configuration context (not read by this method).</param>
private void ConfigureAuditLog(ServiceConfigurationContext context)
{
    Configure<AbpAuditingOptions>(auditing =>
    {
        auditing.IsEnabled = true;
        auditing.EntityHistorySelectors.AddAllEntities();
        auditing.ApplicationName = "JiShe.CollectBus";
    });

    // URLs added to the ignore list, in this order.
    string[] ignoredUrls =
    {
        "/AuditLogs/page",
        "/hangfire/stats",
        "/hangfire/recurring/trigger",
        "/cap",
        "/"
    };

    Configure<AbpAspNetCoreAuditingOptions>(aspNetCoreAuditing =>
    {
        foreach (var url in ignoredUrls)
        {
            aspNetCoreAuditing.IgnoredUrls.Add(url);
        }
    });
}
/// <summary>
/// Registers miscellaneous host services: the HTTP context accessor (as a singleton) and health checks.
/// </summary>
/// <param name="context">The service configuration context whose service collection receives the registrations.</param>
/// <param name="configuration">The application configuration (currently not read by this method).</param>
private void ConfigureCustom(ServiceConfigurationContext context, IConfiguration configuration)
{
    var services = context.Services;

    services.AddSingleton<IHttpContextAccessor, HttpContextAccessor>();
    services.AddHealthChecks();
}
/// <summary>
/// Registers the TCP service and the UDP session together with their monitoring plugins.
/// </summary>
/// <remarks>
/// The TCP listen port is read from <c>TCP:ClientPort</c> and the UDP bind port from
/// <c>UDP:ClientPort</c>; each falls back to 10500 when its key is missing.
/// </remarks>
/// <param name="context">The service configuration context whose service collection receives the registrations.</param>
/// <param name="configuration">Configuration providing the TCP and UDP port settings.</param>
public void ConfigureNetwork(ServiceConfigurationContext context, IConfiguration configuration)
{
    context.Services.AddTcpService(config =>
    {
        config.SetListenIPHosts(int.Parse(configuration["TCP:ClientPort"] ?? "10500"))
            //.SetTcpDataHandlingAdapter(()=>new StandardFixedHeaderDataHandlingAdapter())
            //.SetGetDefaultNewId(() => Guid.NewGuid().ToString()) // defines the ClientId generation strategy
            .ConfigurePlugins(a =>
            {
                a.Add<TcpCloseMonitor>();
                a.Add<TcpMonitor>();
                a.Add<ServerMonitor>();
            });
    });

    context.Services.AddUdpSession(config =>
    {
        config.SetBindIPHost(int.Parse(configuration["UDP:ClientPort"] ?? "10500"))
            .ConfigurePlugins(a =>
            {
                a.Add<UdpMonitor>();
                a.Add<ServerMonitor>();
            })
            .UseBroadcast()
            .SetUdpDataHandlingAdapter(() => new NormalUdpDataHandlingAdapter());
    });
}
/// <summary>
/// Registers CAP with MongoDB storage, Kafka transport and the CAP dashboard.
/// </summary>
/// <remarks>
/// Kafka SASL settings are applied only when <c>Kafka:EnableAuthorization</c> converts to <c>true</c>.
/// </remarks>
/// <param name="context">The service configuration context whose service collection receives the registrations.</param>
/// <param name="configuration">Configuration providing the MongoDB and Kafka connection strings and the <c>Kafka:*</c> settings.</param>
public void ConfigureCap(ServiceConfigurationContext context, IConfiguration configuration)
{
    context.Services.AddCap(x =>
    {
        x.DefaultGroupName = ProtocolConst.SubscriberGroup;

        var connectionStr = configuration.GetConnectionString(CollectBusDbProperties.MongoDbConnectionStringName);
        x.UseMongoDB(connectionStr); // MongoDB 4.0+ cluster

        var kafka = configuration.GetConnectionString("Kafka");
        x.UseKafka(option =>
        {
            option.Servers = kafka;

            // Without authorization enabled, the broker list is all that is configured.
            if (!Convert.ToBoolean(configuration["Kafka:EnableAuthorization"]))
            {
                return;
            }

            option.MainConfig.Add("security.protocol", configuration["Kafka:SecurityProtocol"]);
            option.MainConfig.Add("sasl.mechanism", configuration["Kafka:SaslMechanism"]);
            option.MainConfig.Add("sasl.username", configuration["Kafka:SaslUserName"]);
            option.MainConfig.Add("sasl.password", configuration["Kafka:SaslPassword"]);
        });

        x.UseDashboard();
        x.FailedRetryInterval = 10;
        x.FailedRetryCount = 5;
    });
}
/// <summary>
/// Registers MassTransit with an in-memory bus and a Kafka rider carrying the
/// login, heartbeat, received, issued and scheduled meter-reading consumers.
/// </summary>
/// <remarks>
/// Every configured endpoint gets delayed redelivery (5, 15 and 30 minutes), five immediate
/// retries and an in-memory outbox. All Kafka topic endpoints share one consumer configuration
/// whose group id is <c>ProtocolConst.SubscriberGroup</c> and start from the earliest offset.
/// </remarks>
/// <param name="context">The service configuration context whose service collection receives the registrations.</param>
/// <param name="configuration">Configuration providing the <c>Kafka</c> connection string.</param>
public void ConfigureMassTransit(ServiceConfigurationContext context, IConfiguration configuration)
{
    var consumerConfig = new ConsumerConfig { GroupId = ProtocolConst.SubscriberGroup };

    context.Services
        .AddMassTransit(x =>
        {
            // The lambda parameter is named busContext so it does not shadow the method's "context".
            x.UsingInMemory((busContext, cfg) => cfg.ConfigureEndpoints(busContext));

            x.AddConfigureEndpointsCallback((c, name, cfg) =>
            {
                cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
                cfg.UseMessageRetry(r => r.Immediate(5));
                cfg.UseInMemoryOutbox(c);
            });

            x.AddRider(rider =>
            {
                rider.AddConsumer<IssuedConsumer>();
                rider.AddConsumer<ReceivedHeartbeatConsumer>();
                rider.AddConsumer<ReceivedLoginConsumer>();
                rider.AddConsumer<ReceivedConsumer>(cfg =>
                {
                    cfg.Options<BatchOptions>(options => options
                        .SetMessageLimit(100)
                        .SetTimeLimit(s: 1)
                        .SetTimeLimitStart(BatchTimeLimitStart.FromLast)
                        .SetConcurrencyLimit(10));
                });
                rider.AddConsumer<ScheduledMeterReadingConsumer>();

                // Producers are keyed by message type. The heartbeat producer must use the
                // message contract consumed on the same topic below, not the consumer class.
                rider.AddProducer<string, MessageReceivedLogin>(ProtocolConst.SubscriberLoginReceivedEventName);
                rider.AddProducer<string, MessageReceivedHeartbeat>(ProtocolConst.SubscriberHeartbeatReceivedEventName);

                rider.UsingKafka((c, cfg) =>
                {
                    cfg.Host(configuration.GetConnectionString("Kafka"));

                    cfg.TopicEndpoint<MessageReceivedHeartbeat>(ProtocolConst.SubscriberHeartbeatReceivedEventName, consumerConfig, configurator =>
                    {
                        configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
                        configurator.ConfigureConsumer<ReceivedHeartbeatConsumer>(c);
                    });

                    cfg.TopicEndpoint<MessageReceivedLogin>(ProtocolConst.SubscriberLoginReceivedEventName, consumerConfig, configurator =>
                    {
                        configurator.ConfigureConsumer<ReceivedLoginConsumer>(c);
                        configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
                    });

                    cfg.TopicEndpoint<MessageReceived>(ProtocolConst.SubscriberReceivedEventName, consumerConfig, configurator =>
                    {
                        configurator.ConfigureConsumer<ReceivedConsumer>(c);
                        configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
                    });

                    // NOTE(review): this endpoint reuses the same topic, message type and consumer group
                    // as the ReceivedConsumer endpoint above. IssuedConsumer presumably belongs on an
                    // "issued" topic with its own message contract - confirm and correct.
                    cfg.TopicEndpoint<MessageReceived>(ProtocolConst.SubscriberReceivedEventName, consumerConfig, configurator =>
                    {
                        configurator.ConfigureConsumer<IssuedConsumer>(c);
                        configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
                    });

                    cfg.TopicEndpoint<ScheduledMeterReadingIssuedEventMessage>(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, consumerConfig, configurator =>
                    {
                        configurator.ConfigureConsumer<ScheduledMeterReadingConsumer>(c);
                        configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
                    });
                });
            });
        });
}
2025-04-09 14:31:48 +08:00
/// <summary>
/// Creates the Kafka topics used by this server (issued and received topic names for the
/// configured server tag), each with 3 partitions and a replication factor of 1.
/// </summary>
/// <remarks>
/// Topics that already exist are not treated as an error. Any other topic creation failure
/// is propagated as a <c>CreateTopicsException</c>.
/// </remarks>
/// <param name="context">The service configuration context (not read by this method).</param>
/// <param name="configuration">Configuration providing the <c>Kafka</c> connection string and <c>ServerTagName</c>.</param>
public void ConfigureKafkaTopic(ServiceConfigurationContext context, IConfiguration configuration)
{
    // The admin client holds native resources and must be disposed once the topics are created.
    using var adminClient = new AdminClientBuilder(new AdminClientConfig
    {
        BootstrapServers = configuration.GetConnectionString("Kafka")
    }).Build();

    string serverTagName = configuration.GetSection("ServerTagName").Value!;

    // Build a fresh list so the collections returned by the helpers are not mutated.
    List<TopicSpecification> topicSpecifications = ProtocolConstExtensions.GetAllTopicNamesByIssued(serverTagName)
        .Concat(ProtocolConstExtensions.GetAllTopicNamesByReceived(serverTagName))
        .Select(topicName => new TopicSpecification
        {
            Name = topicName,
            NumPartitions = 3,
            ReplicationFactor = 1
        })
        .ToList();

    if (topicSpecifications.Count == 0)
    {
        // Nothing to create.
        return;
    }

    try
    {
        // Blocking is deliberate: this runs from the synchronous module configuration phase.
        adminClient.CreateTopicsAsync(topicSpecifications).ConfigureAwait(false).GetAwaiter().GetResult();
    }
    catch (CreateTopicsException e) when (e.Results.All(report =>
        report.Error.Code == ErrorCode.NoError || report.Error.Code == ErrorCode.TopicAlreadyExists))
    {
        // Results holds one report per requested topic, so every report is inspected:
        // the failure is ignored only when each topic was created or already existed.
    }
}
2024-12-19 16:07:07 +08:00
}
}