Compare commits
No commits in common. "11d3fcf162ce187880a8470ad3646d37f50cc548" and "72d1b9f623ee392decfa8daafea9bd1f0367ee72" have entirely different histories.
11d3fcf162
...
72d1b9f623
@ -37,7 +37,7 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.IoTDBProvi
|
|||||||
EndProject
|
EndProject
|
||||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.Test", "src\JiShe.CollectBus.Protocol.Test\JiShe.CollectBus.Protocol.Test.csproj", "{A377955E-7EA1-6F29-8CF7-774569E93925}"
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.Test", "src\JiShe.CollectBus.Protocol.Test\JiShe.CollectBus.Protocol.Test.csproj", "{A377955E-7EA1-6F29-8CF7-774569E93925}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}"
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{82E4562A-3A7F-4372-8D42-8AE41BA56C04}"
|
||||||
EndProject
|
EndProject
|
||||||
Global
|
Global
|
||||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||||
@ -109,10 +109,10 @@ Global
|
|||||||
{A377955E-7EA1-6F29-8CF7-774569E93925}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
{A377955E-7EA1-6F29-8CF7-774569E93925}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
{A377955E-7EA1-6F29-8CF7-774569E93925}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
{A377955E-7EA1-6F29-8CF7-774569E93925}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
{A377955E-7EA1-6F29-8CF7-774569E93925}.Release|Any CPU.Build.0 = Release|Any CPU
|
{A377955E-7EA1-6F29-8CF7-774569E93925}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
{82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||||
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
{82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
{82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU
|
{82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
GlobalSection(SolutionProperties) = preSolution
|
GlobalSection(SolutionProperties) = preSolution
|
||||||
HideSolutionNode = FALSE
|
HideSolutionNode = FALSE
|
||||||
@ -134,7 +134,7 @@ Global
|
|||||||
{F0288175-F0EC-48BD-945F-CF1512850943} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{F0288175-F0EC-48BD-945F-CF1512850943} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{443B4549-0AC0-4493-8F3E-49C83225DD76} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{82E4562A-3A7F-4372-8D42-8AE41BA56C04} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
GlobalSection(ExtensibilityGlobals) = postSolution
|
GlobalSection(ExtensibilityGlobals) = postSolution
|
||||||
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}
|
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}
|
||||||
|
|||||||
@ -26,7 +26,6 @@
|
|||||||
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
|
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
|
|||||||
@ -1,17 +1,16 @@
|
|||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using JiShe.CollectBus.Common.Models;
|
using JiShe.CollectBus.Common.Models;
|
||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
using JiShe.CollectBus.Kafka;
|
|
||||||
using Volo.Abp.Application.Services;
|
using Volo.Abp.Application.Services;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Subscribers
|
namespace JiShe.CollectBus.Subscribers
|
||||||
{
|
{
|
||||||
public interface ISubscriberAppService : IApplicationService
|
public interface ISubscriberAppService : IApplicationService
|
||||||
{
|
{
|
||||||
Task<ISubscribeAck> LoginIssuedEvent(IssuedEventMessage issuedEventMessage);
|
Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage);
|
||||||
Task<ISubscribeAck> HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage);
|
Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage);
|
||||||
Task<ISubscribeAck> ReceivedEvent(MessageReceived receivedMessage);
|
Task ReceivedEvent(MessageReceived receivedMessage);
|
||||||
Task<ISubscribeAck> ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage);
|
Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage);
|
||||||
Task<ISubscribeAck> ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage);
|
Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,7 +1,6 @@
|
|||||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||||
using JiShe.CollectBus.Kafka;
|
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using Volo.Abp.Application.Services;
|
using Volo.Abp.Application.Services;
|
||||||
@ -20,19 +19,19 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// 1分钟采集电表数据下行消息消费订阅
|
/// 1分钟采集电表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 5分钟采集电表数据下行消息消费订阅
|
/// 5分钟采集电表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 15分钟采集电表数据下行消息消费订阅
|
/// 15分钟采集电表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
#region 水表消息采集
|
#region 水表消息采集
|
||||||
@ -40,7 +39,7 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// 1分钟采集水表数据下行消息消费订阅
|
/// 1分钟采集水表数据下行消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,11 +12,8 @@ using Microsoft.Extensions.DependencyInjection;
|
|||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
using System.Threading.Tasks;
|
|
||||||
using JiShe.CollectBus.Cassandra;
|
|
||||||
using Volo.Abp;
|
using Volo.Abp;
|
||||||
using Volo.Abp.Application;
|
using Volo.Abp.Application;
|
||||||
using Volo.Abp.Autofac;
|
|
||||||
using Volo.Abp.AutoMapper;
|
using Volo.Abp.AutoMapper;
|
||||||
using Volo.Abp.BackgroundWorkers;
|
using Volo.Abp.BackgroundWorkers;
|
||||||
using Volo.Abp.BackgroundWorkers.Hangfire;
|
using Volo.Abp.BackgroundWorkers.Hangfire;
|
||||||
@ -30,13 +27,11 @@ namespace JiShe.CollectBus;
|
|||||||
typeof(CollectBusApplicationContractsModule),
|
typeof(CollectBusApplicationContractsModule),
|
||||||
typeof(AbpDddApplicationModule),
|
typeof(AbpDddApplicationModule),
|
||||||
typeof(AbpAutoMapperModule),
|
typeof(AbpAutoMapperModule),
|
||||||
typeof(AbpAutofacModule),
|
|
||||||
typeof(AbpBackgroundWorkersHangfireModule),
|
typeof(AbpBackgroundWorkersHangfireModule),
|
||||||
typeof(CollectBusFreeRedisModule),
|
typeof(CollectBusFreeRedisModule),
|
||||||
typeof(CollectBusFreeSqlModule),
|
typeof(CollectBusFreeSqlModule),
|
||||||
typeof(CollectBusKafkaModule),
|
typeof(CollectBusKafkaModule),
|
||||||
typeof(CollectBusIoTDBModule),
|
typeof(CollectBusIoTDBModule)
|
||||||
typeof(CollectBusCassandraModule)
|
|
||||||
)]
|
)]
|
||||||
public class CollectBusApplicationModule : AbpModule
|
public class CollectBusApplicationModule : AbpModule
|
||||||
{
|
{
|
||||||
@ -51,20 +46,20 @@ public class CollectBusApplicationModule : AbpModule
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public override async Task OnApplicationInitializationAsync(
|
public override void OnApplicationInitialization(
|
||||||
ApplicationInitializationContext context)
|
ApplicationInitializationContext context)
|
||||||
{
|
{
|
||||||
var assembly = Assembly.GetExecutingAssembly();
|
var assembly = Assembly.GetExecutingAssembly();
|
||||||
var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface).ToList();
|
var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface).ToList();
|
||||||
foreach (var type in types)
|
foreach (var type in types)
|
||||||
{
|
{
|
||||||
await context.AddBackgroundWorkerAsync(type);
|
context.AddBackgroundWorkerAsync(type);
|
||||||
}
|
}
|
||||||
|
|
||||||
//默认初始化表计信息
|
//默认初始化表计信息
|
||||||
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
|
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
|
||||||
await dbContext.InitAmmeterCacheData();
|
dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
|
||||||
//await dbContext.InitWatermeterCacheData();
|
//dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
|
||||||
|
|
||||||
//初始化主题信息
|
//初始化主题信息
|
||||||
var kafkaAdminClient = context.ServiceProvider.GetRequiredService<IAdminClientService>();
|
var kafkaAdminClient = context.ServiceProvider.GetRequiredService<IAdminClientService>();
|
||||||
@ -75,7 +70,7 @@ public class CollectBusApplicationModule : AbpModule
|
|||||||
|
|
||||||
foreach (var item in topics)
|
foreach (var item in topics)
|
||||||
{
|
{
|
||||||
await kafkaAdminClient.CreateTopicAsync(item, configuration.GetValue<int>(CommonConst.NumPartitions), configuration.GetValue<short>(CommonConst.KafkaReplicationFactor));
|
kafkaAdminClient.CreateTopicAsync(item, configuration.GetValue<int>(CommonConst.NumPartitions), configuration.GetValue<short>(CommonConst.KafkaReplicationFactor));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -23,11 +23,9 @@
|
|||||||
<PackageReference Include="Volo.Abp.Ddd.Application" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.Ddd.Application" Version="8.3.3" />
|
||||||
<PackageReference Include="TouchSocket" Version="3.0.19" />
|
<PackageReference Include="TouchSocket" Version="3.0.19" />
|
||||||
<PackageReference Include="TouchSocket.Hosting" Version="3.0.19" />
|
<PackageReference Include="TouchSocket.Hosting" Version="3.0.19" />
|
||||||
<PackageReference Include="Volo.Abp.EventBus.Kafka" Version="8.3.3" />
|
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
|
||||||
|
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
||||||
|
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj" />
|
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
|
||||||
|
|||||||
@ -7,12 +7,10 @@ using DotNetCore.CAP;
|
|||||||
using JiShe.CollectBus.Ammeters;
|
using JiShe.CollectBus.Ammeters;
|
||||||
using JiShe.CollectBus.Common.Enums;
|
using JiShe.CollectBus.Common.Enums;
|
||||||
using JiShe.CollectBus.Common.Extensions;
|
using JiShe.CollectBus.Common.Extensions;
|
||||||
using JiShe.CollectBus.Common.Helpers;
|
|
||||||
using JiShe.CollectBus.Enums;
|
using JiShe.CollectBus.Enums;
|
||||||
using JiShe.CollectBus.Interceptors;
|
using JiShe.CollectBus.Interceptors;
|
||||||
using JiShe.CollectBus.IotSystems.Devices;
|
using JiShe.CollectBus.IotSystems.Devices;
|
||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
using JiShe.CollectBus.Kafka.Producer;
|
|
||||||
using JiShe.CollectBus.Protocol.Contracts;
|
using JiShe.CollectBus.Protocol.Contracts;
|
||||||
using MassTransit;
|
using MassTransit;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
@ -28,7 +26,7 @@ namespace JiShe.CollectBus.Plugins
|
|||||||
{
|
{
|
||||||
public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin
|
public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin
|
||||||
{
|
{
|
||||||
private readonly IProducerService _producerService;
|
private readonly ICapPublisher _producerBus;
|
||||||
private readonly ILogger<TcpMonitor> _logger;
|
private readonly ILogger<TcpMonitor> _logger;
|
||||||
private readonly IRepository<Device, Guid> _deviceRepository;
|
private readonly IRepository<Device, Guid> _deviceRepository;
|
||||||
private readonly IDistributedCache<AmmeterInfo> _ammeterInfoCache;
|
private readonly IDistributedCache<AmmeterInfo> _ammeterInfoCache;
|
||||||
@ -36,16 +34,16 @@ namespace JiShe.CollectBus.Plugins
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
///
|
///
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="producerService"></param>
|
/// <param name="producerBus"></param>
|
||||||
/// <param name="logger"></param>
|
/// <param name="logger"></param>
|
||||||
/// <param name="deviceRepository"></param>
|
/// <param name="deviceRepository"></param>
|
||||||
/// <param name="ammeterInfoCache"></param>
|
/// <param name="ammeterInfoCache"></param>
|
||||||
public TcpMonitor(IProducerService producerService,
|
public TcpMonitor(ICapPublisher producerBus,
|
||||||
ILogger<TcpMonitor> logger,
|
ILogger<TcpMonitor> logger,
|
||||||
IRepository<Device, Guid> deviceRepository,
|
IRepository<Device, Guid> deviceRepository,
|
||||||
IDistributedCache<AmmeterInfo> ammeterInfoCache)
|
IDistributedCache<AmmeterInfo> ammeterInfoCache)
|
||||||
{
|
{
|
||||||
_producerService = producerService;
|
_producerBus = producerBus;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_deviceRepository = deviceRepository;
|
_deviceRepository = deviceRepository;
|
||||||
_ammeterInfoCache = ammeterInfoCache;
|
_ammeterInfoCache = ammeterInfoCache;
|
||||||
@ -172,7 +170,7 @@ namespace JiShe.CollectBus.Plugins
|
|||||||
DeviceNo = deviceNo,
|
DeviceNo = deviceNo,
|
||||||
MessageId = NewId.NextGuid().ToString()
|
MessageId = NewId.NextGuid().ToString()
|
||||||
};
|
};
|
||||||
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent.Serialize());
|
await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
|
||||||
|
|
||||||
//await _producerBus.Publish( messageReceivedLoginEvent);
|
//await _producerBus.Publish( messageReceivedLoginEvent);
|
||||||
}
|
}
|
||||||
@ -219,7 +217,7 @@ namespace JiShe.CollectBus.Plugins
|
|||||||
DeviceNo = deviceNo,
|
DeviceNo = deviceNo,
|
||||||
MessageId = NewId.NextGuid().ToString()
|
MessageId = NewId.NextGuid().ToString()
|
||||||
};
|
};
|
||||||
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent.Serialize());
|
await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
|
||||||
//await _producerBus.Publish(messageReceivedHeartbeatEvent);
|
//await _producerBus.Publish(messageReceivedHeartbeatEvent);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -247,7 +245,7 @@ namespace JiShe.CollectBus.Plugins
|
|||||||
//string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn);
|
//string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn);
|
||||||
//todo 如何确定时标?目前集中器的采集频率,都是固定,数据上报的时候,根据当前时间,往后推测出应当采集的时间点作为时标。但是如果由于网络问题,数据一直没上报的情况改怎么计算?
|
//todo 如何确定时标?目前集中器的采集频率,都是固定,数据上报的时候,根据当前时间,往后推测出应当采集的时间点作为时标。但是如果由于网络问题,数据一直没上报的情况改怎么计算?
|
||||||
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
|
await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
|
||||||
{
|
{
|
||||||
ClientId = client.Id,
|
ClientId = client.Id,
|
||||||
ClientIp = client.IP,
|
ClientIp = client.IP,
|
||||||
@ -255,7 +253,7 @@ namespace JiShe.CollectBus.Plugins
|
|||||||
MessageHexString = messageHexString,
|
MessageHexString = messageHexString,
|
||||||
DeviceNo = deviceNo,
|
DeviceNo = deviceNo,
|
||||||
MessageId = NewId.NextGuid().ToString()
|
MessageId = NewId.NextGuid().ToString()
|
||||||
}.Serialize());
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -215,12 +215,19 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
|
|||||||
return aa == null;
|
return aa == null;
|
||||||
}
|
}
|
||||||
|
|
||||||
[KafkaSubscribe(["test-topic"])]
|
[KafkaSubscribe(["test-topic1"])]
|
||||||
|
|
||||||
public async Task<ISubscribeAck> KafkaSubscribeAsync(object obj)
|
public async Task<ISubscribeAck> KafkaSubscribeAsync() // TestSubscribe obj
|
||||||
{
|
{
|
||||||
|
var obj=string.Empty;
|
||||||
_logger.LogWarning($"收到订阅消息: {obj}");
|
_logger.LogWarning($"收到订阅消息: {obj}");
|
||||||
return SubscribeAck.Success();
|
return SubscribeAck.Success();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public class TestSubscribe
|
||||||
|
{
|
||||||
|
public string Topic { get; set; }
|
||||||
|
public int Val { get; set; }
|
||||||
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,51 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using Apache.IoTDB.DataStructure;
|
|
||||||
using Apache.IoTDB;
|
|
||||||
using Confluent.Kafka;
|
|
||||||
using JiShe.CollectBus.Ammeters;
|
|
||||||
using JiShe.CollectBus.FreeSql;
|
|
||||||
using JiShe.CollectBus.IoTDBProvider;
|
|
||||||
using JiShe.CollectBus.IotSystems.PrepayModel;
|
|
||||||
using Microsoft.AspNetCore.Authorization;
|
|
||||||
using Microsoft.AspNetCore.Mvc;
|
|
||||||
using Microsoft.Extensions.Options;
|
|
||||||
using JiShe.CollectBus.IoTDBProvider.Context;
|
|
||||||
using Microsoft.Extensions.Logging;
|
|
||||||
using JiShe.CollectBus.Common.Helpers;
|
|
||||||
using JiShe.CollectBus.IotSystems.AFNEntity;
|
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
|
||||||
using JiShe.CollectBus.Cassandra;
|
|
||||||
using JiShe.CollectBus.Common.Enums;
|
|
||||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
|
||||||
using Volo.Abp.Application.Services;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Samples;
|
|
||||||
|
|
||||||
[AllowAnonymous]
|
|
||||||
public class TestAppService : CollectBusAppService, IApplicationService
|
|
||||||
{
|
|
||||||
private readonly ILogger<TestAppService> _logger;
|
|
||||||
private readonly ICassandraRepository<MessageIssued,string> _messageIssuedCassandraRepository;
|
|
||||||
public TestAppService(
|
|
||||||
ILogger<TestAppService> logger,
|
|
||||||
ICassandraRepository<MessageIssued, string> messageIssuedCassandraRepository
|
|
||||||
)
|
|
||||||
{
|
|
||||||
_logger = logger;
|
|
||||||
_messageIssuedCassandraRepository = messageIssuedCassandraRepository;
|
|
||||||
}
|
|
||||||
public async Task AddMessage()
|
|
||||||
{
|
|
||||||
await _messageIssuedCassandraRepository.InsertAsync(new MessageIssued
|
|
||||||
{
|
|
||||||
ClientId = Guid.NewGuid().ToString(),
|
|
||||||
Message = Array.Empty<byte>(),
|
|
||||||
DeviceNo = "123321312",
|
|
||||||
MessageId = Guid.NewGuid().ToString(),
|
|
||||||
Type = IssuedEventType.Data
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -29,6 +29,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
|
public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
|
||||||
{
|
{
|
||||||
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
|
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
|
||||||
|
private readonly ICapPublisher _producerBus;
|
||||||
private readonly IIoTDBProvider _dbProvider;
|
private readonly IIoTDBProvider _dbProvider;
|
||||||
private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
|
private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
|
||||||
private readonly IProducerService _producerService;
|
private readonly IProducerService _producerService;
|
||||||
@ -36,10 +37,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
|
|
||||||
public BasicScheduledMeterReadingService(
|
public BasicScheduledMeterReadingService(
|
||||||
ILogger<BasicScheduledMeterReadingService> logger,
|
ILogger<BasicScheduledMeterReadingService> logger,
|
||||||
|
ICapPublisher producerBus,
|
||||||
IMeterReadingRecordRepository meterReadingRecordRepository,
|
IMeterReadingRecordRepository meterReadingRecordRepository,
|
||||||
IProducerService producerService,
|
IProducerService producerService,
|
||||||
IIoTDBProvider dbProvider)
|
IIoTDBProvider dbProvider)
|
||||||
{
|
{
|
||||||
|
_producerBus = producerBus;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_dbProvider = dbProvider;
|
_dbProvider = dbProvider;
|
||||||
_meterReadingRecordRepository = meterReadingRecordRepository;
|
_meterReadingRecordRepository = meterReadingRecordRepository;
|
||||||
@ -378,7 +381,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
FocusAddress = ammerterItem.Value.FocusAddress,
|
FocusAddress = ammerterItem.Value.FocusAddress,
|
||||||
TimeDensity = timeDensity.ToString(),
|
TimeDensity = timeDensity.ToString(),
|
||||||
};
|
};
|
||||||
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg.Serialize());
|
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
|
||||||
//_= _producerBus.Publish(tempMsg);
|
//_= _producerBus.Publish(tempMsg);
|
||||||
|
|
||||||
|
|
||||||
@ -442,7 +445,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
FocusAddress = ammerterItem.Value.FocusAddress,
|
FocusAddress = ammerterItem.Value.FocusAddress,
|
||||||
TimeDensity = timeDensity.ToString(),
|
TimeDensity = timeDensity.ToString(),
|
||||||
};
|
};
|
||||||
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg.Serialize());
|
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
|
||||||
|
|
||||||
//_ = _producerBus.Publish(tempMsg);
|
//_ = _producerBus.Publish(tempMsg);
|
||||||
|
|
||||||
@ -507,7 +510,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
TimeDensity = timeDensity.ToString(),
|
TimeDensity = timeDensity.ToString(),
|
||||||
};
|
};
|
||||||
|
|
||||||
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg.Serialize());
|
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
|
||||||
|
|
||||||
//_ = _producerBus.Publish(tempMsg);
|
//_ = _producerBus.Publish(tempMsg);
|
||||||
|
|
||||||
@ -802,7 +805,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
}
|
}
|
||||||
int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress);
|
int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress);
|
||||||
|
|
||||||
await _producerService.ProduceAsync(topicName, partition, taskRecord.Serialize());
|
await _producerService.ProduceAsync(topicName, partition, taskRecord);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType)
|
private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType)
|
||||||
@ -843,7 +846,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
TimeDensity = timeDensity.ToString(),
|
TimeDensity = timeDensity.ToString(),
|
||||||
};
|
};
|
||||||
|
|
||||||
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg.Serialize());
|
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
|
||||||
|
|
||||||
//_ = _producerBus.Publish(tempMsg);
|
//_ = _producerBus.Publish(tempMsg);
|
||||||
|
|
||||||
|
|||||||
@ -36,7 +36,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
{
|
{
|
||||||
string serverTagName = string.Empty;
|
string serverTagName = string.Empty;
|
||||||
public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger,
|
public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger,
|
||||||
IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, meterReadingRecordRepository, producerService,dbProvider)
|
ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, producerBus, meterReadingRecordRepository, producerService,dbProvider)
|
||||||
{
|
{
|
||||||
serverTagName = configuration.GetValue<string>(CommonConst.ServerTagName)!;
|
serverTagName = configuration.GetValue<string>(CommonConst.ServerTagName)!;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -6,8 +6,6 @@ using JiShe.CollectBus.IoTDBProvider;
|
|||||||
using JiShe.CollectBus.IotSystems.Devices;
|
using JiShe.CollectBus.IotSystems.Devices;
|
||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||||
using JiShe.CollectBus.Kafka;
|
|
||||||
using JiShe.CollectBus.Kafka.Attributes;
|
|
||||||
using JiShe.CollectBus.Protocol.Contracts;
|
using JiShe.CollectBus.Protocol.Contracts;
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Models;
|
using JiShe.CollectBus.Protocol.Contracts.Models;
|
||||||
@ -22,7 +20,7 @@ using Volo.Abp.Domain.Repositories;
|
|||||||
|
|
||||||
namespace JiShe.CollectBus.Subscribers
|
namespace JiShe.CollectBus.Subscribers
|
||||||
{
|
{
|
||||||
public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, IKafkaSubscribe
|
public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, ICapSubscribe
|
||||||
{
|
{
|
||||||
private readonly ILogger<SubscriberAppService> _logger;
|
private readonly ILogger<SubscriberAppService> _logger;
|
||||||
private readonly ITcpService _tcpService;
|
private readonly ITcpService _tcpService;
|
||||||
@ -65,10 +63,9 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
_dbProvider = dbProvider;
|
_dbProvider = dbProvider;
|
||||||
}
|
}
|
||||||
|
|
||||||
[KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
|
[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
|
||||||
public async Task<ISubscribeAck> LoginIssuedEvent(IssuedEventMessage issuedEventMessage)
|
public async Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage)
|
||||||
{
|
{
|
||||||
bool isAck = false;
|
|
||||||
switch (issuedEventMessage.Type)
|
switch (issuedEventMessage.Type)
|
||||||
{
|
{
|
||||||
case IssuedEventType.Heartbeat:
|
case IssuedEventType.Heartbeat:
|
||||||
@ -79,7 +76,6 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
loginEntity.AckTime = Clock.Now;
|
loginEntity.AckTime = Clock.Now;
|
||||||
loginEntity.IsAck = true;
|
loginEntity.IsAck = true;
|
||||||
await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
|
await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
|
||||||
isAck = true;
|
|
||||||
break;
|
break;
|
||||||
case IssuedEventType.Data:
|
case IssuedEventType.Data:
|
||||||
break;
|
break;
|
||||||
@ -94,13 +90,11 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
//}
|
//}
|
||||||
|
|
||||||
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
|
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
|
||||||
return isAck? SubscribeAck.Success(): SubscribeAck.Fail();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
|
[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
|
||||||
public async Task<ISubscribeAck> HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage)
|
public async Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage)
|
||||||
{
|
{
|
||||||
bool isAck = false;
|
|
||||||
switch (issuedEventMessage.Type)
|
switch (issuedEventMessage.Type)
|
||||||
{
|
{
|
||||||
case IssuedEventType.Heartbeat:
|
case IssuedEventType.Heartbeat:
|
||||||
@ -109,7 +103,6 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
heartbeatEntity.AckTime = Clock.Now;
|
heartbeatEntity.AckTime = Clock.Now;
|
||||||
heartbeatEntity.IsAck = true;
|
heartbeatEntity.IsAck = true;
|
||||||
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
|
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
|
||||||
isAck = true;
|
|
||||||
break;
|
break;
|
||||||
case IssuedEventType.Data:
|
case IssuedEventType.Data:
|
||||||
break;
|
break;
|
||||||
@ -124,11 +117,10 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
//}
|
//}
|
||||||
|
|
||||||
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
|
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
|
||||||
return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
[KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)]
|
[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)]
|
||||||
public async Task<ISubscribeAck> ReceivedEvent(MessageReceived receivedMessage)
|
public async Task ReceivedEvent(MessageReceived receivedMessage)
|
||||||
{
|
{
|
||||||
var currentTime = Clock.Now;
|
var currentTime = Clock.Now;
|
||||||
|
|
||||||
@ -145,13 +137,13 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
if(fN == null)
|
if(fN == null)
|
||||||
{
|
{
|
||||||
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
|
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
|
||||||
return SubscribeAck.Success();
|
return;
|
||||||
}
|
}
|
||||||
var tb3761FN = fN.FnList.FirstOrDefault();
|
var tb3761FN = fN.FnList.FirstOrDefault();
|
||||||
if (tb3761FN == null)
|
if (tb3761FN == null)
|
||||||
{
|
{
|
||||||
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
|
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
|
||||||
return SubscribeAck.Success();
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
//报文入库
|
//报文入库
|
||||||
@ -177,14 +169,11 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
//todo 查找是否有下发任务
|
//todo 查找是否有下发任务
|
||||||
|
|
||||||
//await _messageReceivedEventRepository.InsertAsync(receivedMessage);
|
//await _messageReceivedEventRepository.InsertAsync(receivedMessage);
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
|
[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
|
||||||
public async Task<ISubscribeAck> ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage)
|
public async Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage)
|
||||||
{
|
{
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
if (protocolPlugin == null)
|
if (protocolPlugin == null)
|
||||||
@ -196,11 +185,10 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage);
|
await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage);
|
||||||
await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage);
|
await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage);
|
||||||
}
|
}
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
[KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
|
[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
|
||||||
public async Task<ISubscribeAck> ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage)
|
public async Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage)
|
||||||
{
|
{
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
if (protocolPlugin == null)
|
if (protocolPlugin == null)
|
||||||
@ -212,7 +200,6 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
await protocolPlugin.LoginAsync(receivedLoginMessage);
|
await protocolPlugin.LoginAsync(receivedLoginMessage);
|
||||||
await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage);
|
await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage);
|
||||||
}
|
}
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -8,8 +8,6 @@ using JiShe.CollectBus.IotSystems.Devices;
|
|||||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||||
using JiShe.CollectBus.Kafka;
|
|
||||||
using JiShe.CollectBus.Kafka.Attributes;
|
|
||||||
using JiShe.CollectBus.Protocol.Contracts;
|
using JiShe.CollectBus.Protocol.Contracts;
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
|
||||||
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
||||||
@ -26,7 +24,7 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// 定时抄读任务消息消费订阅
|
/// 定时抄读任务消息消费订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[Route($"/worker/app/subscriber")]
|
[Route($"/worker/app/subscriber")]
|
||||||
public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService, IKafkaSubscribe
|
public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService,ICapSubscribe
|
||||||
{
|
{
|
||||||
private readonly ILogger<WorkerSubscriberAppService> _logger;
|
private readonly ILogger<WorkerSubscriberAppService> _logger;
|
||||||
private readonly ITcpService _tcpService;
|
private readonly ITcpService _tcpService;
|
||||||
@ -65,8 +63,8 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
[HttpPost]
|
[HttpPost]
|
||||||
[Route("ammeter/oneminute/issued-event")]
|
[Route("ammeter/oneminute/issued-event")]
|
||||||
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
|
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
|
||||||
public async Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
public async Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理");
|
_logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理");
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
@ -83,7 +81,6 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -93,8 +90,8 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
[HttpPost]
|
[HttpPost]
|
||||||
[Route("ammeter/fiveminute/issued-event")]
|
[Route("ammeter/fiveminute/issued-event")]
|
||||||
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
|
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
|
||||||
public async Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
public async Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理");
|
_logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理");
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
@ -111,7 +108,6 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -121,8 +117,8 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
[HttpPost]
|
[HttpPost]
|
||||||
[Route("ammeter/fifteenminute/issued-event")]
|
[Route("ammeter/fifteenminute/issued-event")]
|
||||||
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
|
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
|
||||||
public async Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
|
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
|
||||||
try
|
try
|
||||||
@ -141,7 +137,6 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
@ -160,8 +155,8 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
[HttpPost]
|
[HttpPost]
|
||||||
[Route("watermeter/fifteenminute/issued-event")]
|
[Route("watermeter/fifteenminute/issued-event")]
|
||||||
[KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
|
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
|
||||||
public async Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
public async Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("15分钟采集水表数据下行消息消费队列开始处理");
|
_logger.LogInformation("15分钟采集水表数据下行消息消费队列开始处理");
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
@ -177,7 +172,6 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
}
|
||||||
#endregion
|
#endregion
|
||||||
}
|
}
|
||||||
|
|||||||
@ -27,7 +27,7 @@ namespace JiShe.CollectBus.Workers
|
|||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
RecurringJobId = nameof(CreateToBeIssueTaskWorker);
|
RecurringJobId = nameof(CreateToBeIssueTaskWorker);
|
||||||
CronExpression = "* 10 * * * *";
|
CronExpression = $"*/{1} * * * *";
|
||||||
this._scheduledMeterReadingService = scheduledMeterReadingService;
|
this._scheduledMeterReadingService = scheduledMeterReadingService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -26,7 +26,7 @@ namespace JiShe.CollectBus.Workers
|
|||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
RecurringJobId = nameof(SubscriberFifteenMinuteWorker);
|
RecurringJobId = nameof(SubscriberFifteenMinuteWorker);
|
||||||
CronExpression = "* 15 * * * *";
|
CronExpression = $"*/{15} * * * *";
|
||||||
this._scheduledMeterReadingService = scheduledMeterReadingService;
|
this._scheduledMeterReadingService = scheduledMeterReadingService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -26,7 +26,7 @@ namespace JiShe.CollectBus.Workers
|
|||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
RecurringJobId = nameof(SubscriberFiveMinuteWorker);
|
RecurringJobId = nameof(SubscriberFiveMinuteWorker);
|
||||||
CronExpression = "* 5 * * * *";
|
CronExpression = $"*/{5} * * * *";
|
||||||
this._scheduledMeterReadingService = scheduledMeterReadingService;
|
this._scheduledMeterReadingService = scheduledMeterReadingService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -26,7 +26,7 @@ namespace JiShe.CollectBus.Workers
|
|||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
RecurringJobId = nameof(SubscriberOneMinuteWorker);
|
RecurringJobId = nameof(SubscriberOneMinuteWorker);
|
||||||
CronExpression = "* 1 * * * *";
|
CronExpression = $"*/{1} * * * *";
|
||||||
this._scheduledMeterReadingService = scheduledMeterReadingService;
|
this._scheduledMeterReadingService = scheduledMeterReadingService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,64 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Cassandra
|
|
||||||
{
|
|
||||||
public class CassandraConfig
|
|
||||||
{
|
|
||||||
public Node[] Nodes { get; set; }
|
|
||||||
public string Username { get; set; }
|
|
||||||
public string Password { get; set; }
|
|
||||||
public string Keyspace { get; set; }
|
|
||||||
public string ConsistencyLevel { get; set; }
|
|
||||||
public Pooling PoolingOptions { get; set; }
|
|
||||||
public Socket SocketOptions { get; set; }
|
|
||||||
public Query QueryOptions { get; set; }
|
|
||||||
|
|
||||||
public ReplicationStrategy ReplicationStrategy { get; set; }
|
|
||||||
}
|
|
||||||
|
|
||||||
public class Pooling
|
|
||||||
{
|
|
||||||
public int CoreConnectionsPerHost { get; set; }
|
|
||||||
public int MaxConnectionsPerHost { get; set; }
|
|
||||||
public int MaxRequestsPerConnection { get; set; }
|
|
||||||
}
|
|
||||||
|
|
||||||
public class Socket
|
|
||||||
{
|
|
||||||
public int ConnectTimeoutMillis { get; set; }
|
|
||||||
public int ReadTimeoutMillis { get; set; }
|
|
||||||
}
|
|
||||||
|
|
||||||
public class Query
|
|
||||||
{
|
|
||||||
public string ConsistencyLevel { get; set; }
|
|
||||||
public string SerialConsistencyLevel { get; set; }
|
|
||||||
public bool DefaultIdempotence { get; set; }
|
|
||||||
}
|
|
||||||
|
|
||||||
public class ReplicationStrategy
|
|
||||||
{
|
|
||||||
public string Class { get; set; }
|
|
||||||
public DataCenter[] DataCenters { get; set; }
|
|
||||||
}
|
|
||||||
|
|
||||||
public class DataCenter
|
|
||||||
{
|
|
||||||
public string Name { get; set; }
|
|
||||||
public int ReplicationFactor { get; set; }
|
|
||||||
public string Strategy { get; set; }
|
|
||||||
}
|
|
||||||
|
|
||||||
public class Node
|
|
||||||
{
|
|
||||||
public string Host { get; set; }
|
|
||||||
public int Port { get; set; }
|
|
||||||
public string DataCenter { get; set; }
|
|
||||||
public string Rack { get; set; }
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@ -1,147 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Reflection;
|
|
||||||
using System.Text;
|
|
||||||
using Cassandra;
|
|
||||||
using Cassandra.Mapping;
|
|
||||||
using Cassandra.Data.Linq;
|
|
||||||
using System.ComponentModel.DataAnnotations;
|
|
||||||
using Microsoft.Extensions.Logging;
|
|
||||||
using Microsoft.Extensions.Options;
|
|
||||||
using Volo.Abp.DependencyInjection;
|
|
||||||
using JiShe.CollectBus.Common.Attributes;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Cassandra
|
|
||||||
{
|
|
||||||
public class CassandraProvider : IDisposable, ICassandraProvider, ISingletonDependency
|
|
||||||
{
|
|
||||||
private readonly ILogger<CassandraProvider> _logger;
|
|
||||||
|
|
||||||
public Cluster Instance { get; set; }
|
|
||||||
|
|
||||||
public ISession Session { get; set; }
|
|
||||||
|
|
||||||
public CassandraConfig CassandraConfig { get; set; }
|
|
||||||
/// <summary>
|
|
||||||
///
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="options"></param>
|
|
||||||
/// <param name="logger"></param>
|
|
||||||
public CassandraProvider(
|
|
||||||
IOptions<CassandraConfig> options,
|
|
||||||
ILogger<CassandraProvider> logger)
|
|
||||||
{
|
|
||||||
CassandraConfig = options.Value;
|
|
||||||
_logger = logger;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void InitClusterAndSession()
|
|
||||||
{
|
|
||||||
GetCluster((keyspace) =>
|
|
||||||
{
|
|
||||||
GetSession(keyspace);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public Cluster GetCluster(Action<string?>? callback=null)
|
|
||||||
{
|
|
||||||
var clusterBuilder = Cluster.Builder();
|
|
||||||
|
|
||||||
// 添加多个节点
|
|
||||||
foreach (var node in CassandraConfig.Nodes)
|
|
||||||
{
|
|
||||||
clusterBuilder.AddContactPoint(node.Host)
|
|
||||||
.WithPort(node.Port);
|
|
||||||
}
|
|
||||||
|
|
||||||
clusterBuilder.WithCredentials(CassandraConfig.Username, CassandraConfig.Password);
|
|
||||||
|
|
||||||
// 优化连接池配置
|
|
||||||
var poolingOptions = new PoolingOptions()
|
|
||||||
.SetCoreConnectionsPerHost(HostDistance.Local, CassandraConfig.PoolingOptions.CoreConnectionsPerHost)
|
|
||||||
.SetMaxConnectionsPerHost(HostDistance.Local, CassandraConfig.PoolingOptions.MaxConnectionsPerHost)
|
|
||||||
.SetMaxRequestsPerConnection(CassandraConfig.PoolingOptions.MaxRequestsPerConnection)
|
|
||||||
.SetHeartBeatInterval(30000); // 30秒心跳
|
|
||||||
|
|
||||||
clusterBuilder.WithPoolingOptions(poolingOptions);
|
|
||||||
|
|
||||||
// 优化Socket配置
|
|
||||||
var socketOptions = new SocketOptions()
|
|
||||||
.SetConnectTimeoutMillis(CassandraConfig.SocketOptions.ConnectTimeoutMillis)
|
|
||||||
.SetReadTimeoutMillis(CassandraConfig.SocketOptions.ReadTimeoutMillis)
|
|
||||||
.SetTcpNoDelay(true) // 启用Nagle算法
|
|
||||||
.SetKeepAlive(true) // 启用TCP保活
|
|
||||||
.SetReceiveBufferSize(32768) // 32KB接收缓冲区
|
|
||||||
.SetSendBufferSize(32768); // 32KB发送缓冲区
|
|
||||||
|
|
||||||
clusterBuilder.WithSocketOptions(socketOptions);
|
|
||||||
|
|
||||||
// 优化查询选项
|
|
||||||
var queryOptions = new QueryOptions()
|
|
||||||
.SetConsistencyLevel((ConsistencyLevel)Enum.Parse(typeof(ConsistencyLevel), CassandraConfig.QueryOptions.ConsistencyLevel))
|
|
||||||
.SetSerialConsistencyLevel((ConsistencyLevel)Enum.Parse(typeof(ConsistencyLevel), CassandraConfig.QueryOptions.SerialConsistencyLevel))
|
|
||||||
.SetDefaultIdempotence(CassandraConfig.QueryOptions.DefaultIdempotence)
|
|
||||||
.SetPageSize(5000); // 增加页面大小
|
|
||||||
|
|
||||||
clusterBuilder.WithQueryOptions(queryOptions);
|
|
||||||
|
|
||||||
// 启用压缩
|
|
||||||
clusterBuilder.WithCompression(CompressionType.LZ4);
|
|
||||||
|
|
||||||
// 配置重连策略
|
|
||||||
clusterBuilder.WithReconnectionPolicy(new ExponentialReconnectionPolicy(1000, 10 * 60 * 1000));
|
|
||||||
Instance = clusterBuilder.Build();
|
|
||||||
callback?.Invoke(null);
|
|
||||||
return Instance;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ISession GetSession(string? keyspace = null)
|
|
||||||
{
|
|
||||||
if (string.IsNullOrEmpty(keyspace))
|
|
||||||
{
|
|
||||||
keyspace = CassandraConfig.Keyspace;
|
|
||||||
}
|
|
||||||
Session = Instance.Connect();
|
|
||||||
var replication = GetReplicationStrategy();
|
|
||||||
Session.CreateKeyspaceIfNotExists(keyspace, replication);
|
|
||||||
Session.ChangeKeyspace(keyspace);
|
|
||||||
return Session;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Dictionary<string, string> GetReplicationStrategy()
|
|
||||||
{
|
|
||||||
var strategy = CassandraConfig.ReplicationStrategy.Class;
|
|
||||||
var dataCenters = CassandraConfig.ReplicationStrategy.DataCenters;
|
|
||||||
|
|
||||||
switch (strategy)
|
|
||||||
{
|
|
||||||
case "NetworkTopologyStrategy":
|
|
||||||
var networkDic = new Dictionary<string, string> { { "class", "NetworkTopologyStrategy" } };
|
|
||||||
foreach (var dataCenter in dataCenters)
|
|
||||||
{
|
|
||||||
networkDic.Add(dataCenter.Name, dataCenter.ReplicationFactor.ToString());
|
|
||||||
}
|
|
||||||
return networkDic;
|
|
||||||
case "SimpleStrategy":
|
|
||||||
var dic = new Dictionary<string, string> { { "class", "SimpleStrategy" } };
|
|
||||||
if (dataCenters.Length >= 1)
|
|
||||||
{
|
|
||||||
dic.Add("replication_factor", dataCenters[0].ReplicationFactor.ToString());
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
_logger.LogError("SimpleStrategy 不支持多个数据中心!");
|
|
||||||
}
|
|
||||||
return dic;
|
|
||||||
default:
|
|
||||||
throw new ArgumentNullException($"Strategy", "Strategy配置错误!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void Dispose()
|
|
||||||
{
|
|
||||||
Instance.Dispose();
|
|
||||||
Session.Dispose();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,156 +0,0 @@
|
|||||||
using System.Collections.Concurrent;
|
|
||||||
using Cassandra;
|
|
||||||
using Cassandra.Mapping;
|
|
||||||
using Microsoft.Extensions.Caching.Memory;
|
|
||||||
using Microsoft.Extensions.Logging;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Cassandra
|
|
||||||
{
|
|
||||||
public class CassandraQueryOptimizer
|
|
||||||
{
|
|
||||||
private readonly ISession _session;
|
|
||||||
private readonly ILogger<CassandraQueryOptimizer> _logger;
|
|
||||||
private readonly IMemoryCache _cache;
|
|
||||||
private readonly ConcurrentDictionary<string, PreparedStatement> _preparedStatements;
|
|
||||||
private readonly int _batchSize;
|
|
||||||
private readonly TimeSpan _cacheExpiration;
|
|
||||||
|
|
||||||
public CassandraQueryOptimizer(
|
|
||||||
ISession session,
|
|
||||||
ILogger<CassandraQueryOptimizer> logger,
|
|
||||||
IMemoryCache cache,
|
|
||||||
int batchSize = 100,
|
|
||||||
TimeSpan? cacheExpiration = null)
|
|
||||||
{
|
|
||||||
_session = session;
|
|
||||||
_logger = logger;
|
|
||||||
_cache = cache;
|
|
||||||
_preparedStatements = new ConcurrentDictionary<string, PreparedStatement>();
|
|
||||||
_batchSize = batchSize;
|
|
||||||
_cacheExpiration = cacheExpiration ?? TimeSpan.FromMinutes(5);
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task<PreparedStatement> GetOrPrepareStatementAsync(string cql)
|
|
||||||
{
|
|
||||||
return _preparedStatements.GetOrAdd(cql, key =>
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var statement = _session.Prepare(key);
|
|
||||||
_logger.LogDebug($"Prepared statement for CQL: {key}");
|
|
||||||
return statement;
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, $"Failed to prepare statement for CQL: {key}");
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task ExecuteBatchAsync(IEnumerable<BoundStatement> statements)
|
|
||||||
{
|
|
||||||
var batch = new BatchStatement();
|
|
||||||
var count = 0;
|
|
||||||
|
|
||||||
foreach (var statement in statements)
|
|
||||||
{
|
|
||||||
batch.Add(statement);
|
|
||||||
count++;
|
|
||||||
|
|
||||||
if (count >= _batchSize)
|
|
||||||
{
|
|
||||||
await ExecuteBatchAsync(batch);
|
|
||||||
batch = new BatchStatement();
|
|
||||||
count = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (count > 0)
|
|
||||||
{
|
|
||||||
await ExecuteBatchAsync(batch);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task ExecuteBatchAsync(BatchStatement batch)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
await _session.ExecuteAsync(batch);
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, "Failed to execute batch statement");
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task<T> GetOrSetFromCacheAsync<T>(string cacheKey, Func<Task<T>> getData)
|
|
||||||
{
|
|
||||||
if (_cache.TryGetValue(cacheKey, out T cachedValue))
|
|
||||||
{
|
|
||||||
_logger.LogDebug($"Cache hit for key: {cacheKey}");
|
|
||||||
return cachedValue;
|
|
||||||
}
|
|
||||||
|
|
||||||
var data = await getData();
|
|
||||||
_cache.Set(cacheKey, data, _cacheExpiration);
|
|
||||||
_logger.LogDebug($"Cache miss for key: {cacheKey}, data cached");
|
|
||||||
return data;
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task<IEnumerable<T>> ExecutePagedQueryAsync<T>(
|
|
||||||
string cql,
|
|
||||||
object[] parameters,
|
|
||||||
int pageSize = 100,
|
|
||||||
string pagingState = null) where T : class
|
|
||||||
{
|
|
||||||
var statement = await GetOrPrepareStatementAsync(cql);
|
|
||||||
var boundStatement = statement.Bind(parameters);
|
|
||||||
|
|
||||||
if (!string.IsNullOrEmpty(pagingState))
|
|
||||||
{
|
|
||||||
boundStatement.SetPagingState(Convert.FromBase64String(pagingState));
|
|
||||||
}
|
|
||||||
|
|
||||||
boundStatement.SetPageSize(pageSize);
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var result = await _session.ExecuteAsync(boundStatement);
|
|
||||||
//TODO: RETURN OBJECT
|
|
||||||
throw new NotImplementedException();
|
|
||||||
//result.GetRows()
|
|
||||||
//return result.Select(row => row);
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, $"Failed to execute paged query: {cql}");
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task BulkInsertAsync<T>(IEnumerable<T> items, string tableName)
|
|
||||||
{
|
|
||||||
var mapper = new Mapper(_session);
|
|
||||||
var batch = new List<BoundStatement>();
|
|
||||||
var cql = $"INSERT INTO {tableName} ({{0}}) VALUES ({{1}})";
|
|
||||||
|
|
||||||
foreach (var chunk in items.Chunk(_batchSize))
|
|
||||||
{
|
|
||||||
var statements = chunk.Select(item =>
|
|
||||||
{
|
|
||||||
var props = typeof(T).GetProperties();
|
|
||||||
var columns = string.Join(", ", props.Select(p => p.Name));
|
|
||||||
var values = string.Join(", ", props.Select(p => "?"));
|
|
||||||
var statement = _session.Prepare(string.Format(cql, columns, values));
|
|
||||||
return statement.Bind(props.Select(p => p.GetValue(item)).ToArray());
|
|
||||||
});
|
|
||||||
|
|
||||||
batch.AddRange(statements);
|
|
||||||
}
|
|
||||||
|
|
||||||
await ExecuteBatchAsync(batch);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,69 +0,0 @@
|
|||||||
using Cassandra;
|
|
||||||
using Cassandra.Mapping;
|
|
||||||
using JiShe.CollectBus.Cassandra.Extensions;
|
|
||||||
using Volo.Abp.Domain.Entities;
|
|
||||||
using Volo.Abp.Domain.Repositories;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Cassandra
|
|
||||||
{
|
|
||||||
public class CassandraRepository<TEntity, TKey>
|
|
||||||
: ICassandraRepository<TEntity, TKey>
|
|
||||||
where TEntity : class
|
|
||||||
{
|
|
||||||
|
|
||||||
public CassandraRepository(ICassandraProvider cassandraProvider, MappingConfiguration mappingConfig)
|
|
||||||
{
|
|
||||||
Mapper = new Mapper(cassandraProvider.Session, mappingConfig);
|
|
||||||
cassandraProvider.Session.CreateTable<TEntity>(cassandraProvider.CassandraConfig.Keyspace);
|
|
||||||
}
|
|
||||||
|
|
||||||
public readonly IMapper Mapper;
|
|
||||||
|
|
||||||
public virtual async Task<TEntity> GetAsync(TKey id)
|
|
||||||
{
|
|
||||||
return await Mapper.SingleOrDefaultAsync<TEntity>("WHERE id = ?", id);
|
|
||||||
}
|
|
||||||
|
|
||||||
public virtual async Task<List<TEntity>> GetListAsync()
|
|
||||||
{
|
|
||||||
return (await Mapper.FetchAsync<TEntity>()).ToList();
|
|
||||||
}
|
|
||||||
|
|
||||||
public virtual async Task<TEntity> InsertAsync(TEntity entity)
|
|
||||||
{
|
|
||||||
await Mapper.InsertAsync(entity);
|
|
||||||
return entity;
|
|
||||||
}
|
|
||||||
|
|
||||||
public virtual async Task<TEntity> UpdateAsync(TEntity entity)
|
|
||||||
{
|
|
||||||
await Mapper.UpdateAsync(entity);
|
|
||||||
return entity;
|
|
||||||
}
|
|
||||||
|
|
||||||
public virtual async Task DeleteAsync(TEntity entity)
|
|
||||||
{
|
|
||||||
await Mapper.DeleteAsync(entity);
|
|
||||||
}
|
|
||||||
|
|
||||||
public virtual async Task DeleteAsync(TKey id)
|
|
||||||
{
|
|
||||||
await Mapper.DeleteAsync<TEntity>("WHERE id = ?", id);
|
|
||||||
}
|
|
||||||
|
|
||||||
public virtual async Task<List<TEntity>> GetPagedListAsync(
|
|
||||||
int skipCount,
|
|
||||||
int maxResultCount,
|
|
||||||
string sorting)
|
|
||||||
{
|
|
||||||
var cql = $"SELECT * FROM {typeof(TEntity).Name.ToLower()}";
|
|
||||||
if (!string.IsNullOrWhiteSpace(sorting))
|
|
||||||
{
|
|
||||||
cql += $" ORDER BY {sorting}";
|
|
||||||
}
|
|
||||||
cql += $" LIMIT {maxResultCount} OFFSET {skipCount}";
|
|
||||||
|
|
||||||
return (await Mapper.FetchAsync<TEntity>(cql)).ToList();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,29 +0,0 @@
|
|||||||
using Cassandra;
|
|
||||||
using Cassandra.Mapping;
|
|
||||||
using JiShe.CollectBus.Cassandra.Mappers;
|
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
|
||||||
using Volo.Abp;
|
|
||||||
using Volo.Abp.Autofac;
|
|
||||||
using Volo.Abp.Modularity;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Cassandra
|
|
||||||
{
|
|
||||||
[DependsOn(
|
|
||||||
typeof(AbpAutofacModule)
|
|
||||||
)]
|
|
||||||
public class CollectBusCassandraModule : AbpModule
|
|
||||||
{
|
|
||||||
public override void ConfigureServices(ServiceConfigurationContext context)
|
|
||||||
{
|
|
||||||
Configure<CassandraConfig>(context.Services.GetConfiguration().GetSection("Cassandra"));
|
|
||||||
|
|
||||||
context.AddCassandra();
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public override void OnApplicationInitialization(ApplicationInitializationContext context)
|
|
||||||
{
|
|
||||||
context.UseCassandra();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,35 +0,0 @@
|
|||||||
using Autofac.Core;
|
|
||||||
using Cassandra;
|
|
||||||
using Cassandra.Mapping;
|
|
||||||
using JiShe.CollectBus.Cassandra;
|
|
||||||
using JiShe.CollectBus.Cassandra.Mappers;
|
|
||||||
using Microsoft.Extensions.Options;
|
|
||||||
using System.Reflection;
|
|
||||||
using Volo.Abp;
|
|
||||||
using Volo.Abp.Modularity;
|
|
||||||
|
|
||||||
// ReSharper disable once CheckNamespace
|
|
||||||
namespace Microsoft.Extensions.DependencyInjection
|
|
||||||
{
|
|
||||||
public static class ApplicationInitializationContextExtensions
|
|
||||||
{
|
|
||||||
public static void UseCassandra(this ApplicationInitializationContext context)
|
|
||||||
{
|
|
||||||
var service = context.ServiceProvider;
|
|
||||||
var cassandraProvider = service.GetRequiredService<ICassandraProvider>();
|
|
||||||
cassandraProvider.InitClusterAndSession();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class ServiceCollectionExtensions
|
|
||||||
{
|
|
||||||
public static void AddCassandra(this ServiceConfigurationContext context)
|
|
||||||
{
|
|
||||||
context.Services.AddSingleton(typeof(ICassandraRepository<,>), typeof(CassandraRepository<,>));
|
|
||||||
|
|
||||||
var mappingConfig = new MappingConfiguration()
|
|
||||||
.Define(new CollectBusMapping());
|
|
||||||
context.Services.AddSingleton(mappingConfig);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,83 +0,0 @@
|
|||||||
using System.Reflection;
|
|
||||||
using System.Text;
|
|
||||||
using Cassandra;
|
|
||||||
using System.ComponentModel.DataAnnotations;
|
|
||||||
using JiShe.CollectBus.Common.Attributes;
|
|
||||||
using Cassandra.Mapping;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Cassandra.Extensions
|
|
||||||
{
|
|
||||||
public static class SessionExtension
|
|
||||||
{
|
|
||||||
public static void CreateTable<TEntity>(this ISession session,string? defaultKeyspace=null) where TEntity : class
|
|
||||||
{
|
|
||||||
var type = typeof(TEntity);
|
|
||||||
var tableAttribute = type.GetCustomAttribute<CassandraTableAttribute>();
|
|
||||||
var tableName = tableAttribute?.Name ?? type.Name.ToLower();
|
|
||||||
var tableKeyspace = tableAttribute?.Keyspace ?? defaultKeyspace;
|
|
||||||
|
|
||||||
var properties = type.GetProperties();
|
|
||||||
var primaryKey = properties.FirstOrDefault(p => p.GetCustomAttribute<KeyAttribute>() != null);
|
|
||||||
|
|
||||||
if (primaryKey == null)
|
|
||||||
{
|
|
||||||
throw new InvalidOperationException($"No primary key defined for type {type.Name}");
|
|
||||||
}
|
|
||||||
|
|
||||||
var cql = new StringBuilder();
|
|
||||||
cql.Append($"CREATE TABLE IF NOT EXISTS {tableKeyspace}.{tableName} (");
|
|
||||||
|
|
||||||
foreach (var prop in properties)
|
|
||||||
{
|
|
||||||
var ignoreAttribute = prop.GetCustomAttribute<CassandraIgnoreAttribute>();
|
|
||||||
if (ignoreAttribute != null) continue;
|
|
||||||
var columnName = prop.Name.ToLower();
|
|
||||||
var cqlType = GetCassandraType(prop.PropertyType);
|
|
||||||
|
|
||||||
cql.Append($"{columnName} {cqlType}, ");
|
|
||||||
}
|
|
||||||
cql.Length -= 2; // Remove last comma and space
|
|
||||||
cql.Append($", PRIMARY KEY ({primaryKey.Name.ToLower()}))");
|
|
||||||
|
|
||||||
session.Execute(cql.ToString());
|
|
||||||
}
|
|
||||||
|
|
||||||
private static string GetCassandraType(Type type)
|
|
||||||
{
|
|
||||||
// 基础类型处理
|
|
||||||
switch (Type.GetTypeCode(type))
|
|
||||||
{
|
|
||||||
case TypeCode.String: return "text";
|
|
||||||
case TypeCode.Int32: return "int";
|
|
||||||
case TypeCode.Int64: return "bigint";
|
|
||||||
case TypeCode.Boolean: return "boolean";
|
|
||||||
case TypeCode.DateTime: return "timestamp";
|
|
||||||
case TypeCode.Byte: return "tinyint";
|
|
||||||
}
|
|
||||||
|
|
||||||
if (type == typeof(Guid)) return "uuid";
|
|
||||||
if (type == typeof(DateTimeOffset)) return "timestamp";
|
|
||||||
if (type == typeof(Byte[])) return "blob";
|
|
||||||
|
|
||||||
// 处理集合类型
|
|
||||||
if (type.IsGenericType)
|
|
||||||
{
|
|
||||||
var genericType = type.GetGenericTypeDefinition();
|
|
||||||
var elementType = type.GetGenericArguments()[0];
|
|
||||||
|
|
||||||
if (genericType == typeof(List<>))
|
|
||||||
return $"list<{GetCassandraType(elementType)}>";
|
|
||||||
if (genericType == typeof(HashSet<>))
|
|
||||||
return $"set<{GetCassandraType(elementType)}>";
|
|
||||||
if (genericType == typeof(Dictionary<,>))
|
|
||||||
{
|
|
||||||
var keyType = type.GetGenericArguments()[0];
|
|
||||||
var valueType = type.GetGenericArguments()[1];
|
|
||||||
return $"map<{GetCassandraType(keyType)}, {GetCassandraType(valueType)}>";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
throw new NotSupportedException($"不支持的类型: {type.Name}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,24 +0,0 @@
|
|||||||
using Cassandra;
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Cassandra
|
|
||||||
{
|
|
||||||
public interface ICassandraProvider
|
|
||||||
{
|
|
||||||
Cluster Instance { get;}
|
|
||||||
|
|
||||||
ISession Session { get;}
|
|
||||||
|
|
||||||
CassandraConfig CassandraConfig { get; }
|
|
||||||
|
|
||||||
ISession GetSession(string? keyspace = null);
|
|
||||||
|
|
||||||
Cluster GetCluster(Action<string?>? callback = null);
|
|
||||||
|
|
||||||
void InitClusterAndSession();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,20 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using Volo.Abp.Domain.Entities;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Cassandra
|
|
||||||
{
|
|
||||||
public interface ICassandraRepository<TEntity, TKey> where TEntity : class
|
|
||||||
{
|
|
||||||
Task<TEntity> GetAsync(TKey id);
|
|
||||||
Task<List<TEntity>> GetListAsync();
|
|
||||||
Task<TEntity> InsertAsync(TEntity entity);
|
|
||||||
Task<TEntity> UpdateAsync(TEntity entity);
|
|
||||||
Task DeleteAsync(TEntity entity);
|
|
||||||
Task DeleteAsync(TKey id);
|
|
||||||
Task<List<TEntity>> GetPagedListAsync(int skipCount, int maxResultCount, string sorting);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,22 +0,0 @@
|
|||||||
<Project Sdk="Microsoft.NET.Sdk">
|
|
||||||
|
|
||||||
<PropertyGroup>
|
|
||||||
<TargetFramework>net8.0</TargetFramework>
|
|
||||||
<ImplicitUsings>enable</ImplicitUsings>
|
|
||||||
<Nullable>enable</Nullable>
|
|
||||||
</PropertyGroup>
|
|
||||||
|
|
||||||
<ItemGroup>
|
|
||||||
<PackageReference Include="CassandraCSharpDriver" Version="3.22.0" />
|
|
||||||
<PackageReference Include="Microsoft.Extensions.Options" Version="8.0.2" />
|
|
||||||
<PackageReference Include="Volo.Abp.Autofac" Version="8.3.3" />
|
|
||||||
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
|
|
||||||
<PackageReference Include="Volo.Abp.Ddd.Domain" Version="8.3.3" />
|
|
||||||
</ItemGroup>
|
|
||||||
|
|
||||||
<ItemGroup>
|
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
|
||||||
</ItemGroup>
|
|
||||||
|
|
||||||
</Project>
|
|
||||||
@ -1,20 +0,0 @@
|
|||||||
using Cassandra.Mapping;
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
|
||||||
using static Cassandra.QueryTrace;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Cassandra.Mappers
|
|
||||||
{
|
|
||||||
public class CollectBusMapping: Mappings
|
|
||||||
{
|
|
||||||
public CollectBusMapping()
|
|
||||||
{
|
|
||||||
For<MessageIssued>()
|
|
||||||
.Column(e => e.Type, cm => cm.WithName("type").WithDbType<int>());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,32 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using Volo.Abp.EventBus;
|
|
||||||
using Volo.Abp;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Common.Attributes
|
|
||||||
{
|
|
||||||
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
|
|
||||||
public class CassandraTableAttribute : Attribute
|
|
||||||
{
|
|
||||||
public CassandraTableAttribute(string? name = null,string? keyspace =null)
|
|
||||||
{
|
|
||||||
Name = name;
|
|
||||||
Keyspace = keyspace;
|
|
||||||
}
|
|
||||||
|
|
||||||
public virtual string? Name { get; }
|
|
||||||
|
|
||||||
public virtual string? Keyspace { get; }
|
|
||||||
}
|
|
||||||
|
|
||||||
[AttributeUsage(AttributeTargets.Property | AttributeTargets.Field)]
|
|
||||||
public class CassandraIgnoreAttribute : Attribute
|
|
||||||
{
|
|
||||||
public CassandraIgnoreAttribute()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,18 +1,14 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.ComponentModel.DataAnnotations;
|
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using JiShe.CollectBus.Common.Attributes;
|
|
||||||
using JiShe.CollectBus.Common.Enums;
|
using JiShe.CollectBus.Common.Enums;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.IotSystems.MessageIssueds
|
namespace JiShe.CollectBus.IotSystems.MessageIssueds
|
||||||
{
|
{
|
||||||
[CassandraTable]
|
|
||||||
public class MessageIssued
|
public class MessageIssued
|
||||||
{
|
{
|
||||||
[Key]
|
|
||||||
public string ClientId { get; set; }
|
public string ClientId { get; set; }
|
||||||
public byte[] Message { get; set; }
|
public byte[] Message { get; set; }
|
||||||
public string DeviceNo { get; set; }
|
public string DeviceNo { get; set; }
|
||||||
|
|||||||
@ -43,7 +43,7 @@ namespace JiShe.CollectBus.Host
|
|||||||
ConfigureNetwork(context, configuration);
|
ConfigureNetwork(context, configuration);
|
||||||
ConfigureJwtAuthentication(context, configuration);
|
ConfigureJwtAuthentication(context, configuration);
|
||||||
ConfigureHangfire(context);
|
ConfigureHangfire(context);
|
||||||
//ConfigureCap(context, configuration);
|
ConfigureCap(context, configuration);
|
||||||
//ConfigureMassTransit(context, configuration);
|
//ConfigureMassTransit(context, configuration);
|
||||||
//ConfigureKafkaTopic(context, configuration);
|
//ConfigureKafkaTopic(context, configuration);
|
||||||
ConfigureAuditLog(context);
|
ConfigureAuditLog(context);
|
||||||
|
|||||||
@ -16,6 +16,7 @@
|
|||||||
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
|
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
|
||||||
<link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet"/>
|
<link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet"/>
|
||||||
<title>后端服务</title>
|
<title>后端服务</title>
|
||||||
|
|
||||||
</head>
|
</head>
|
||||||
|
|
||||||
<body>
|
<body>
|
||||||
|
|||||||
@ -1,35 +1,12 @@
|
|||||||
using JiShe.CollectBus.Host;
|
using JiShe.CollectBus.Host;
|
||||||
using Microsoft.AspNetCore.Hosting;
|
using Microsoft.AspNetCore.Hosting;
|
||||||
using Serilog;
|
using Serilog;
|
||||||
using Volo.Abp.Modularity.PlugIns;
|
|
||||||
|
|
||||||
public class Program
|
public class Program
|
||||||
{
|
{
|
||||||
/// <summary>
|
public static void Main(string[] args)
|
||||||
///
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="args"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public static async Task Main(string[] args)
|
|
||||||
{
|
{
|
||||||
var builder = WebApplication.CreateBuilder(args);
|
CreateHostBuilder(args).Build().Run();
|
||||||
builder.Host.UseContentRoot(Directory.GetCurrentDirectory())
|
|
||||||
.UseSerilog((context, loggerConfiguration) =>
|
|
||||||
{
|
|
||||||
loggerConfiguration.ReadFrom.Configuration(context.Configuration);
|
|
||||||
})
|
|
||||||
.UseAutofac();
|
|
||||||
await builder.AddApplicationAsync<CollectBusHostModule>(options =>
|
|
||||||
{
|
|
||||||
// 加载插件,固定模式,可热插拔
|
|
||||||
options.PlugInSources.AddFolder(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Plugins"));
|
|
||||||
});
|
|
||||||
var app = builder.Build();
|
|
||||||
await app.InitializeApplicationAsync();
|
|
||||||
await app.RunAsync();
|
|
||||||
|
|
||||||
|
|
||||||
//await CreateHostBuilder(args).Build().RunAsync();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static IHostBuilder CreateHostBuilder(string[] args) =>
|
private static IHostBuilder CreateHostBuilder(string[] args) =>
|
||||||
@ -39,12 +16,11 @@ public class Program
|
|||||||
{
|
{
|
||||||
loggerConfiguration.ReadFrom.Configuration(context.Configuration);
|
loggerConfiguration.ReadFrom.Configuration(context.Configuration);
|
||||||
})
|
})
|
||||||
|
.UseAutofac()
|
||||||
.ConfigureWebHostDefaults(webBuilder =>
|
.ConfigureWebHostDefaults(webBuilder =>
|
||||||
{
|
{
|
||||||
webBuilder.UseStartup<Startup>();
|
webBuilder.UseStartup<Startup>();
|
||||||
})
|
});
|
||||||
.UseAutofac();
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -58,8 +34,8 @@ public class Program
|
|||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
private static async Task ConfigureServices(IServiceCollection services, HostBuilderContext hostContext)
|
private static void ConfigureServices(IServiceCollection services, HostBuilderContext hostContext)
|
||||||
{
|
{
|
||||||
await services.AddApplicationAsync<CollectBusHostModule>();
|
services.AddApplication<CollectBusHostModule>();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -39,15 +39,10 @@ namespace JiShe.CollectBus.Host
|
|||||||
/// <param name="lifetime">The lifetime.</param>
|
/// <param name="lifetime">The lifetime.</param>
|
||||||
public void Configure(IApplicationBuilder app, IHostApplicationLifetime lifetime)
|
public void Configure(IApplicationBuilder app, IHostApplicationLifetime lifetime)
|
||||||
{
|
{
|
||||||
|
app.InitializeApplication();
|
||||||
|
//await app.InitializeApplicationAsync();
|
||||||
|
|
||||||
app.Use(async (context, next) =>
|
|
||||||
{
|
|
||||||
// 在请求处理之前调用 InitializeApplicationAsync
|
|
||||||
await app.InitializeApplicationAsync();
|
|
||||||
|
|
||||||
// 继续请求管道中的下一个中间件
|
|
||||||
await next();
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -130,48 +130,5 @@
|
|||||||
},
|
},
|
||||||
"ServerTagName": "JiSheCollectBus",
|
"ServerTagName": "JiSheCollectBus",
|
||||||
"KafkaReplicationFactor": 3,
|
"KafkaReplicationFactor": 3,
|
||||||
"NumPartitions": 30,
|
"NumPartitions": 30
|
||||||
"Cassandra": {
|
|
||||||
"ReplicationStrategy": {
|
|
||||||
"Class": "NetworkTopologyStrategy", //策略为NetworkTopologyStrategy时才会有多个数据中心,SimpleStrategy用在只有一个数据中心的情况下
|
|
||||||
"DataCenters": [
|
|
||||||
{
|
|
||||||
"Name": "dc1",
|
|
||||||
"ReplicationFactor": 3
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
"Nodes": [
|
|
||||||
{
|
|
||||||
"Host": "192.168.1.9",
|
|
||||||
"Port": 9042,
|
|
||||||
"DataCenter": "dc1",
|
|
||||||
"Rack": "RAC1"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"Host": "192.168.1.9",
|
|
||||||
"Port": 9043,
|
|
||||||
"DataCenter": "dc1",
|
|
||||||
"Rack": "RAC2"
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"Username": "admin",
|
|
||||||
"Password": "lixiao1980",
|
|
||||||
"Keyspace": "jishecollectbus",
|
|
||||||
"ConsistencyLevel": "Quorum",
|
|
||||||
"PoolingOptions": {
|
|
||||||
"CoreConnectionsPerHost": 4,
|
|
||||||
"MaxConnectionsPerHost": 8,
|
|
||||||
"MaxRequestsPerConnection": 2000
|
|
||||||
},
|
|
||||||
"SocketOptions": {
|
|
||||||
"ConnectTimeoutMillis": 10000,
|
|
||||||
"ReadTimeoutMillis": 20000
|
|
||||||
},
|
|
||||||
"QueryOptions": {
|
|
||||||
"ConsistencyLevel": "Quorum",
|
|
||||||
"SerialConsistencyLevel": "Serial",
|
|
||||||
"DefaultIdempotence": true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
@ -146,14 +146,14 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
|
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
|
||||||
{
|
{
|
||||||
var consumerKey = typeof((Ignore, TValue));
|
var consumerKey = typeof((Null, TValue));
|
||||||
var cts = new CancellationTokenSource();
|
var cts = new CancellationTokenSource();
|
||||||
|
|
||||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _=>
|
var consumer = _consumerStore.GetOrAdd(consumerKey, _=>
|
||||||
(
|
(
|
||||||
CreateConsumer<Ignore, TValue>(groupId),
|
CreateConsumer<Null, TValue>(groupId),
|
||||||
cts
|
cts
|
||||||
)).Consumer as IConsumer<Ignore, TValue>;
|
)).Consumer as IConsumer<Null, TValue>;
|
||||||
|
|
||||||
consumer!.Subscribe(topics);
|
consumer!.Subscribe(topics);
|
||||||
|
|
||||||
|
|||||||
@ -15,6 +15,6 @@ namespace JiShe.CollectBus.Kafka.Producer
|
|||||||
|
|
||||||
Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value, int? partition, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) where TKey : notnull where TValue : class;
|
Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value, int? partition, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) where TKey : notnull where TValue : class;
|
||||||
|
|
||||||
Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<Ignore, TValue>>? deliveryHandler = null) where TValue : class;
|
Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -114,8 +114,8 @@ namespace JiShe.CollectBus.Kafka.Producer
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class
|
public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class
|
||||||
{
|
{
|
||||||
var producer = GetProducer<Ignore, TValue>();
|
var producer = GetProducer<Null, TValue>();
|
||||||
await producer.ProduceAsync(topic, new Message<Ignore, TValue> { Value = value });
|
await producer.ProduceAsync(topic, new Message<Null, TValue> { Value = value });
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -160,13 +160,13 @@ namespace JiShe.CollectBus.Kafka.Producer
|
|||||||
/// <param name="partition"></param>
|
/// <param name="partition"></param>
|
||||||
/// <param name="deliveryHandler"></param>
|
/// <param name="deliveryHandler"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<Ignore, TValue>>? deliveryHandler = null) where TValue : class
|
public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class
|
||||||
{
|
{
|
||||||
var message = new Message<Ignore, TValue>
|
var message = new Message<Null, TValue>
|
||||||
{
|
{
|
||||||
Value = value
|
Value = value
|
||||||
};
|
};
|
||||||
var producer = GetProducer<Ignore, TValue>();
|
var producer = GetProducer<Null, TValue>();
|
||||||
if (partition.HasValue)
|
if (partition.HasValue)
|
||||||
{
|
{
|
||||||
var topicPartition = new TopicPartition(topic, partition.Value);
|
var topicPartition = new TopicPartition(topic, partition.Value);
|
||||||
|
|||||||
@ -12,14 +12,12 @@ using Microsoft.Extensions.DependencyInjection;
|
|||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
using JiShe.CollectBus.IotSystems.Protocols;
|
using JiShe.CollectBus.IotSystems.Protocols;
|
||||||
using MassTransit;
|
using MassTransit;
|
||||||
using JiShe.CollectBus.Kafka.Producer;
|
|
||||||
using JiShe.CollectBus.Common.Helpers;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
|
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
|
||||||
{
|
{
|
||||||
public abstract class BaseProtocolPlugin : IProtocolPlugin
|
public abstract class BaseProtocolPlugin : IProtocolPlugin
|
||||||
{
|
{
|
||||||
private readonly IProducerService _producerService;
|
private readonly ICapPublisher _producerBus;
|
||||||
private readonly ILogger<BaseProtocolPlugin> _logger;
|
private readonly ILogger<BaseProtocolPlugin> _logger;
|
||||||
private readonly IRepository<ProtocolInfo, Guid> _protocolInfoRepository;
|
private readonly IRepository<ProtocolInfo, Guid> _protocolInfoRepository;
|
||||||
|
|
||||||
@ -39,7 +37,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
|
|||||||
|
|
||||||
_logger = serviceProvider.GetRequiredService<ILogger<BaseProtocolPlugin>>();
|
_logger = serviceProvider.GetRequiredService<ILogger<BaseProtocolPlugin>>();
|
||||||
_protocolInfoRepository = serviceProvider.GetRequiredService<IRepository<ProtocolInfo, Guid>>();
|
_protocolInfoRepository = serviceProvider.GetRequiredService<IRepository<ProtocolInfo, Guid>>();
|
||||||
_producerService = serviceProvider.GetRequiredService<IProducerService>();
|
_producerBus = serviceProvider.GetRequiredService<ICapPublisher>();
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract ProtocolInfo Info { get; }
|
public abstract ProtocolInfo Info { get; }
|
||||||
@ -89,7 +87,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
|
|||||||
};
|
};
|
||||||
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
|
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
|
||||||
|
|
||||||
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }.Serialize());
|
await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
|
||||||
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
|
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -128,7 +126,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
|
|||||||
Fn = 1
|
Fn = 1
|
||||||
};
|
};
|
||||||
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
|
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
|
||||||
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }.Serialize());
|
await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
|
||||||
|
|
||||||
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
|
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
|
||||||
}
|
}
|
||||||
|
|||||||
@ -7,7 +7,7 @@
|
|||||||
</PropertyGroup>
|
</PropertyGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="Confluent.Kafka" Version="2.9.0" />
|
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
|
||||||
<PackageReference Include="MassTransit.Abstractions" Version="8.3.0" />
|
<PackageReference Include="MassTransit.Abstractions" Version="8.3.0" />
|
||||||
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="8.0.0" />
|
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="8.0.0" />
|
||||||
<PackageReference Include="TouchSocket" Version="2.1.9" />
|
<PackageReference Include="TouchSocket" Version="2.1.9" />
|
||||||
@ -17,7 +17,6 @@
|
|||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
|
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
|
|||||||
@ -12,10 +12,10 @@ namespace JiShe.CollectBus.Protocol
|
|||||||
context.Services.AddKeyedSingleton<IProtocolPlugin, StandardProtocolPlugin>(nameof(StandardProtocolPlugin));
|
context.Services.AddKeyedSingleton<IProtocolPlugin, StandardProtocolPlugin>(nameof(StandardProtocolPlugin));
|
||||||
}
|
}
|
||||||
|
|
||||||
public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
|
public override void OnApplicationInitialization(ApplicationInitializationContext context)
|
||||||
{
|
{
|
||||||
var standardProtocol = context.ServiceProvider.GetRequiredKeyedService<IProtocolPlugin>(nameof(StandardProtocolPlugin));
|
var standardProtocol = context.ServiceProvider.GetRequiredKeyedService<IProtocolPlugin>(nameof(StandardProtocolPlugin));
|
||||||
await standardProtocol.AddAsync();
|
standardProtocol.AddAsync();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user