365 lines
16 KiB
C#
365 lines
16 KiB
C#
using System.Text;
|
|
using Hangfire;
|
|
using Hangfire.Redis.StackExchange;
|
|
using JiShe.CollectBus.Host.Hangfire;
|
|
using JiShe.CollectBus.Host.Swaggers;
|
|
using MassTransit;
|
|
using Microsoft.AspNetCore.Authentication.JwtBearer;
|
|
using Microsoft.AspNetCore.DataProtection;
|
|
using Microsoft.IdentityModel.Tokens;
|
|
using Microsoft.OpenApi.Models;
|
|
using StackExchange.Redis;
|
|
using Volo.Abp.AspNetCore.Auditing;
|
|
using Volo.Abp.Auditing;
|
|
using Volo.Abp.BackgroundJobs;
|
|
using Volo.Abp.Caching;
|
|
using Volo.Abp.Modularity;
|
|
using TouchSocket.Core;
|
|
using TouchSocket.Sockets;
|
|
using JiShe.CollectBus.Plugins;
|
|
using JiShe.CollectBus.Consumers;
|
|
using JiShe.CollectBus.Protocol.Contracts;
|
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
|
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
|
using Confluent.Kafka;
|
|
using MassTransit.SqlTransport.Topology;
|
|
using Confluent.Kafka.Admin;
|
|
using JiShe.CollectBus.Common.Consts;
|
|
|
|
|
|
namespace JiShe.CollectBus.Host
|
|
{
|
|
public partial class CollectBusHostModule
|
|
{
|
|
/// <summary>
/// Registers Hangfire with Redis-backed job storage, installs the global
/// retry filters, and adds the Hangfire processing server.
/// </summary>
/// <param name="context">Service configuration context whose service collection receives the registrations.</param>
private void ConfigureHangfire(ServiceConfigurationContext context)
{
    // Number of automatic retries; shared by both retry filters below.
    const int retryAttempts = 3;

    // The Redis database index is read eagerly, while the module is being configured.
    var startupConfiguration = context.Services.GetConfiguration();
    var storageOptions = new RedisStorageOptions
    {
        Db = startupConfiguration.GetValue<int>("Redis:HangfireDB")
    };

    Configure<AbpBackgroundJobOptions>(jobOptions => jobOptions.IsJobExecutionEnabled = true);

    context.Services.AddHangfire(hangfire =>
    {
        // The connection string is looked up when this callback runs, not before.
        var redisConnection = context.Services.GetConfiguration().GetValue<string>("Redis:Configuration");
        hangfire
            .UseRedisStorage(redisConnection, storageOptions)
            .WithJobExpirationTimeout(TimeSpan.FromDays(7));

        // Delay (in seconds) before each retry: 10 s, 1 min, 3 min.
        hangfire.UseFilter(new AutomaticRetryAttribute
        {
            Attempts = retryAttempts,
            DelaysInSeconds = new[] { 10, 60, 60 * 3 }
        });
        hangfire.UseFilter(new JobRetryLastFilter(retryAttempts));
    });

    context.Services.AddHangfireServer();
}
|
|
|
|
/// <summary>
/// Registers JWT bearer authentication as the default authenticate and
/// challenge scheme and wires up token validation and token discovery.
/// </summary>
/// <remarks>
/// For every request whose path does not start with <c>/login</c>, the token is
/// looked up in this order: the last space-separated segment of the
/// <c>Authorization</c> header, the <c>access_token</c> query parameter, and the
/// cookie named <c>CollectBusHostConst.DefaultCookieName</c>. When a token is
/// found it is handed to the handler and the <c>Authorization</c> header is
/// normalised to <c>Bearer {token}</c>. When none is found the request is left
/// untouched.
/// </remarks>
/// <param name="context">Service configuration context whose service collection receives the registrations.</param>
/// <param name="configuration">Configuration providing <c>Jwt:Issuer</c>, <c>Jwt:Audience</c> and <c>Jwt:SecurityKey</c>.</param>
/// <exception cref="InvalidOperationException">
/// Thrown when the JWT options are built and <c>Jwt:SecurityKey</c> is missing or empty.
/// </exception>
private void ConfigureJwtAuthentication(ServiceConfigurationContext context, IConfiguration configuration)
{
    context.Services.AddAuthentication(options =>
        {
            options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
            options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
        })
        .AddJwtBearer(options =>
        {
            // Fail with a descriptive message instead of an ArgumentNullException
            // raised from inside Encoding.GetBytes.
            var securityKey = configuration["Jwt:SecurityKey"];
            if (string.IsNullOrEmpty(securityKey))
            {
                throw new InvalidOperationException(
                    "Configuration value 'Jwt:SecurityKey' is missing or empty; JWT authentication cannot be configured.");
            }

            options.TokenValidationParameters = new TokenValidationParameters
            {
                // Enable signature validation.
                ValidateIssuerSigningKey = true,
                ValidateIssuer = true,
                ValidateAudience = true,
                ValidateLifetime = true,
                // No tolerance on token expiry.
                ClockSkew = TimeSpan.Zero,
                ValidIssuer = configuration["Jwt:Issuer"],
                ValidAudience = configuration["Jwt:Audience"],
                // NOTE(review): ASCII encoding is kept so the key bytes stay identical to
                // whatever signs the tokens; non-ASCII key characters are lossy here.
                IssuerSigningKey = new SymmetricSecurityKey(Encoding.ASCII.GetBytes(securityKey))
            };

            options.Events = new JwtBearerEvents
            {
                OnMessageReceived = currentContext =>
                {
                    var request = currentContext.HttpContext.Request;
                    if (request.Path.StartsWithSegments("/login"))
                    {
                        return Task.CompletedTask;
                    }

                    var accessToken = string.Empty;

                    // 1) Authorization header: take the last segment ("Bearer xyz" -> "xyz").
                    if (request.Headers.TryGetValue("Authorization", out var authorizationValues))
                    {
                        var headerText = authorizationValues.ToString();
                        if (!string.IsNullOrWhiteSpace(headerText))
                        {
                            accessToken = headerText
                                .Split(' ', StringSplitOptions.RemoveEmptyEntries)
                                .LastOrDefault();
                        }
                    }

                    // 2) Query string fallback.
                    if (string.IsNullOrWhiteSpace(accessToken))
                    {
                        accessToken = request.Query["access_token"].FirstOrDefault();
                    }

                    // 3) Cookie fallback.
                    if (string.IsNullOrWhiteSpace(accessToken))
                    {
                        accessToken = request.Cookies[CollectBusHostConst.DefaultCookieName];
                    }

                    // No token anywhere: do not fabricate an empty "Bearer " header.
                    if (string.IsNullOrWhiteSpace(accessToken))
                    {
                        return Task.CompletedTask;
                    }

                    currentContext.Token = accessToken;
                    // Indexer assignment replaces any existing value in one step.
                    request.Headers["Authorization"] = $"Bearer {accessToken}";

                    return Task.CompletedTask;
                }
            };
        });
}
|
|
|
|
/// <summary>
/// Sets the distributed-cache key prefix and stores ASP.NET Core data-protection
/// keys in Redis.
/// </summary>
/// <param name="context">Service configuration context whose service collection receives the registrations.</param>
private void ConfigureCache(ServiceConfigurationContext context)
{
    Configure<AbpDistributedCacheOptions>(cacheOptions => cacheOptions.KeyPrefix = "CollectBus:");

    var configuration = context.Services.GetConfiguration();
    var redisEndpoints = configuration.GetValue<string>("Redis:Configuration");
    var defaultDatabase = configuration.GetValue<int>("Redis:DefaultDB");

    // Connects synchronously while the module is being configured; the default
    // database index is appended to the configured connection string.
    var multiplexer = ConnectionMultiplexer.Connect($"{redisEndpoints},defaultdatabase={defaultDatabase}");

    context.Services
        .AddDataProtection()
        .PersistKeysToStackExchangeRedis(multiplexer, "CollectBus-Protection-Keys");
}
|
|
|
|
/// <summary>
/// Registers Swagger generation: one document per entry of the
/// <c>SwaggerConfig</c> configuration section, group-based document inclusion,
/// annotations, custom filters, and XML documentation files found next to the
/// application binaries.
/// </summary>
/// <param name="context">Service configuration context whose service collection receives the registrations.</param>
/// <param name="configuration">Configuration providing the <c>SwaggerConfig</c> section.</param>
private void ConfigureSwaggerServices(ServiceConfigurationContext context, IConfiguration configuration)
{
    context.Services.AddSwaggerGen(options =>
    {
        // One Swagger document per configured group; a missing section registers none.
        configuration.GetSection("SwaggerConfig").Get<List<SwaggerConfig>>()?.ForEach(group =>
        {
            options.SwaggerDoc(
                group.GroupName,
                new OpenApiInfo { Title = group.Title, Version = group.Version });
        });

        options.DocInclusionPredicate((docName, apiDes) =>
        {
            // APIs without a group name are listed in the "Basic" document.
            if (docName == "Basic" && string.IsNullOrWhiteSpace(apiDes.GroupName))
            {
                return true;
            }

            return docName == apiDes.GroupName;
        });

        options.EnableAnnotations();
        options.DocumentFilter<HiddenAbpDefaultApiFilter>();
        options.SchemaFilter<EnumSchemaFilter>();

        // File-name suffixes of the XML documentation files to include. Matching is
        // ordinal and case-insensitive so that e.g. "*.HttpApi.xml" is picked up
        // regardless of casing and of the current culture.
        string[] xmlSuffixes =
        {
            "Application.xml",
            "Application.Contracts.xml",
            "HttpApi.xml",
            "Host.xml"
        };

        var xmlPaths = Directory.GetFiles(AppContext.BaseDirectory, "*.xml")
            .Where(path => xmlSuffixes.Any(suffix => path.EndsWith(suffix, StringComparison.OrdinalIgnoreCase)))
            .ToList();

        foreach (var xml in xmlPaths)
        {
            // The second argument also pulls in XML comments for controllers.
            options.IncludeXmlComments(xml, true);
        }
    });
}
|
|
|
|
/// <summary>
/// Enables audit logging with entity history for all entities and lists the
/// request URLs that are excluded from ASP.NET Core auditing.
/// </summary>
/// <param name="context">Service configuration context (not read by this method).</param>
private void ConfigureAuditLog(ServiceConfigurationContext context)
{
    Configure<AbpAuditingOptions>(auditing =>
    {
        auditing.IsEnabled = true;
        auditing.EntityHistorySelectors.AddAllEntities();
        auditing.ApplicationName = "JiShe.CollectBus";
    });

    Configure<AbpAspNetCoreAuditingOptions>(webAuditing =>
    {
        // URLs added to the ignore list, in registration order.
        string[] ignoredUrls =
        {
            "/AuditLogs/page",
            "/hangfire/stats",
            "/hangfire/recurring/trigger",
            "/cap",
            "/"
        };

        foreach (var url in ignoredUrls)
        {
            webAuditing.IgnoredUrls.Add(url);
        }
    });
}
|
|
|
|
|
|
/// <summary>
/// Registers miscellaneous host services: a singleton
/// <c>IHttpContextAccessor</c> and the health-check services.
/// </summary>
/// <param name="context">Service configuration context whose service collection receives the registrations.</param>
/// <param name="configuration">Application configuration (not read by this method).</param>
private void ConfigureCustom(ServiceConfigurationContext context, IConfiguration configuration)
{
    var services = context.Services;

    services.AddSingleton<IHttpContextAccessor, HttpContextAccessor>();
    services.AddHealthChecks();
}
|
|
|
|
/// <summary>
/// Registers the TCP service and the UDP session together with their monitoring plugins.
/// </summary>
/// <remarks>
/// Ports are read from <c>TCP:ClientPort</c> and <c>UDP:ClientPort</c>; each
/// falls back to 10500 when the setting is absent.
/// </remarks>
/// <param name="context">Service configuration context whose service collection receives the registrations.</param>
/// <param name="configuration">Configuration providing the port settings.</param>
public void ConfigureNetwork(ServiceConfigurationContext context, IConfiguration configuration)
{
    var services = context.Services;

    services.AddTcpService(tcpConfig =>
    {
        var tcpPort = int.Parse(configuration["TCP:ClientPort"] ?? "10500");

        // Disabled options kept for reference:
        //   .SetTcpDataHandlingAdapter(() => new StandardFixedHeaderDataHandlingAdapter())
        //   .SetGetDefaultNewId(() => Guid.NewGuid().ToString()) // ClientId generation strategy
        tcpConfig
            .SetListenIPHosts(tcpPort)
            .ConfigurePlugins(plugins =>
            {
                plugins.Add<TcpCloseMonitor>();
                plugins.Add<TcpMonitor>();
                plugins.Add<ServerMonitor>();
            });
    });

    services.AddUdpSession(udpConfig =>
    {
        var udpPort = int.Parse(configuration["UDP:ClientPort"] ?? "10500");

        udpConfig
            .SetBindIPHost(udpPort)
            .ConfigurePlugins(plugins =>
            {
                plugins.Add<UdpMonitor>();
                plugins.Add<ServerMonitor>();
            })
            .UseBroadcast()
            .SetUdpDataHandlingAdapter(() => new NormalUdpDataHandlingAdapter());
    });
}
|
|
|
|
/// <summary>
/// Registers CAP with MongoDB storage, Kafka transport, the dashboard, and a
/// failed-message retry policy.
/// </summary>
/// <remarks>
/// Kafka SASL settings are added only when <c>Kafka:EnableAuthorization</c>
/// converts to <c>true</c>.
/// </remarks>
/// <param name="context">Service configuration context whose service collection receives the registrations.</param>
/// <param name="configuration">Configuration providing connection strings and the <c>Kafka:*</c> settings.</param>
public void ConfigureCap(ServiceConfigurationContext context, IConfiguration configuration)
{
    context.Services.AddCap(cap =>
    {
        cap.DefaultGroupName = ProtocolConst.SubscriberGroup;

        // MongoDB 4.0+ cluster
        var mongoConnection = configuration.GetConnectionString(CollectBusDbProperties.MongoDbConnectionStringName);
        cap.UseMongoDB(mongoConnection);

        var kafkaServers = configuration.GetConnectionString("Kafka");
        cap.UseKafka(kafkaOptions =>
        {
            kafkaOptions.Servers = kafkaServers;

            var authorizationEnabled = Convert.ToBoolean(configuration["Kafka:EnableAuthorization"]);
            if (authorizationEnabled)
            {
                // Kafka client setting -> configuration key it is read from.
                var saslSettings = new (string KafkaKey, string ConfigPath)[]
                {
                    ("security.protocol", "Kafka:SecurityProtocol"),
                    ("sasl.mechanism", "Kafka:SaslMechanism"),
                    ("sasl.username", "Kafka:SaslUserName"),
                    ("sasl.password", "Kafka:SaslPassword")
                };

                foreach (var (kafkaKey, configPath) in saslSettings)
                {
                    kafkaOptions.MainConfig.Add(kafkaKey, configuration[configPath]);
                }
            }
        });

        cap.UseDashboard();
        cap.FailedRetryInterval = 10;
        cap.FailedRetryCount = 5;
    });
}
|
|
|
|
/// <summary>
/// Registers MassTransit with an in-memory bus and a Kafka rider that hosts the
/// collect-bus consumers and topic producers.
/// </summary>
/// <remarks>
/// Every endpoint gets delayed redelivery (5, 15 and 30 minutes), five
/// immediate retries and an in-memory outbox. All Kafka topic endpoints share
/// one consumer configuration whose group id is <c>ProtocolConst.SubscriberGroup</c>
/// and read from the earliest offset.
/// </remarks>
/// <param name="context">Service configuration context whose service collection receives the registrations.</param>
/// <param name="configuration">Configuration providing the <c>Kafka</c> connection string.</param>
public void ConfigureMassTransit(ServiceConfigurationContext context, IConfiguration configuration)
{
    var consumerConfig = new ConsumerConfig { GroupId = ProtocolConst.SubscriberGroup };

    context.Services
        .AddMassTransit(x =>
        {
            x.UsingInMemory((busContext, cfg) => cfg.ConfigureEndpoints(busContext));

            x.AddConfigureEndpointsCallback((registration, name, cfg) =>
            {
                cfg.UseDelayedRedelivery(r => r.Intervals(
                    TimeSpan.FromMinutes(5),
                    TimeSpan.FromMinutes(15),
                    TimeSpan.FromMinutes(30)));
                cfg.UseMessageRetry(r => r.Immediate(5));
                cfg.UseInMemoryOutbox(registration);
            });

            x.AddRider(rider =>
            {
                rider.AddConsumer<IssuedConsumer>();
                rider.AddConsumer<ReceivedHeartbeatConsumer>();
                rider.AddConsumer<ReceivedLoginConsumer>();
                rider.AddConsumer<ReceivedConsumer>(cfg =>
                {
                    // Batch settings: up to 100 messages, 1 s time limit measured
                    // from the last message, concurrency limit of 10.
                    cfg.Options<BatchOptions>(options => options
                        .SetMessageLimit(100)
                        .SetTimeLimit(s: 1)
                        .SetTimeLimitStart(BatchTimeLimitStart.FromLast)
                        .SetConcurrencyLimit(10));
                });
                rider.AddConsumer<ScheduledMeterReadingConsumer>();

                rider.AddProducer<string, MessageReceivedLogin>(ProtocolConst.SubscriberLoginReceivedEventName);
                // The producer's value type must be the message contract consumed on this
                // topic (MessageReceivedHeartbeat), not the consumer class.
                rider.AddProducer<string, MessageReceivedHeartbeat>(ProtocolConst.SubscriberHeartbeatReceivedEventName);

                rider.UsingKafka((riderContext, cfg) =>
                {
                    cfg.Host(configuration.GetConnectionString("Kafka"));

                    cfg.TopicEndpoint<MessageReceivedHeartbeat>(ProtocolConst.SubscriberHeartbeatReceivedEventName, consumerConfig, configurator =>
                    {
                        configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
                        configurator.ConfigureConsumer<ReceivedHeartbeatConsumer>(riderContext);
                    });

                    cfg.TopicEndpoint<MessageReceivedLogin>(ProtocolConst.SubscriberLoginReceivedEventName, consumerConfig, configurator =>
                    {
                        configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
                        configurator.ConfigureConsumer<ReceivedLoginConsumer>(riderContext);
                    });

                    cfg.TopicEndpoint<MessageReceived>(ProtocolConst.SubscriberReceivedEventName, consumerConfig, configurator =>
                    {
                        configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
                        configurator.ConfigureConsumer<ReceivedConsumer>(riderContext);
                    });

                    // NOTE(review): this endpoint reuses the same topic, message type and
                    // consumer group as the one above but binds IssuedConsumer. It looks
                    // like it was meant to target the "issued" topic/message - confirm the
                    // intended constant and contract type before changing it.
                    cfg.TopicEndpoint<MessageReceived>(ProtocolConst.SubscriberReceivedEventName, consumerConfig, configurator =>
                    {
                        configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
                        configurator.ConfigureConsumer<IssuedConsumer>(riderContext);
                    });

                    cfg.TopicEndpoint<ScheduledMeterReadingIssuedEventMessage>(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, consumerConfig, configurator =>
                    {
                        configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
                        configurator.ConfigureConsumer<ScheduledMeterReadingConsumer>(riderContext);
                    });
                });
            });
        });
}
|
|
}
|
|
} |