Compare commits

...

8 Commits

Author SHA1 Message Date
11d3fcf162 移除CAP组件 2025-04-15 18:58:38 +08:00
a8f79c56f4 Merge branch 'dev' into zhy_feat_dev_v1 2025-04-15 18:06:32 +08:00
cli
27d3bad7fe 合并 2025-04-15 18:03:51 +08:00
cli
f9577156a3 修改代码 2025-04-15 18:02:39 +08:00
cli
3b29e58951 删除多余项目 2025-04-15 18:01:50 +08:00
cli
e9cd38bd64 合并 2025-04-15 18:01:30 +08:00
cli
aa55e476c2 封装Cassandra数据库操作 2025-04-15 17:57:47 +08:00
cli
83b7de52d5 测试迁入 2025-04-14 15:31:10 +08:00
41 changed files with 954 additions and 103 deletions

View File

@ -37,7 +37,7 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.IoTDBProvi
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.Test", "src\JiShe.CollectBus.Protocol.Test\JiShe.CollectBus.Protocol.Test.csproj", "{A377955E-7EA1-6F29-8CF7-774569E93925}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{82E4562A-3A7F-4372-8D42-8AE41BA56C04}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -109,10 +109,10 @@ Global
{A377955E-7EA1-6F29-8CF7-774569E93925}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A377955E-7EA1-6F29-8CF7-774569E93925}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A377955E-7EA1-6F29-8CF7-774569E93925}.Release|Any CPU.Build.0 = Release|Any CPU
{82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Debug|Any CPU.Build.0 = Debug|Any CPU
{82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Release|Any CPU.ActiveCfg = Release|Any CPU
{82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Release|Any CPU.Build.0 = Release|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -134,7 +134,7 @@ Global
{F0288175-F0EC-48BD-945F-CF1512850943} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{82E4562A-3A7F-4372-8D42-8AE41BA56C04} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{443B4549-0AC0-4493-8F3E-49C83225DD76} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}

View File

@ -26,6 +26,7 @@
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
</ItemGroup>
</Project>

View File

@ -1,16 +1,17 @@
using System.Threading.Tasks;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Kafka;
using Volo.Abp.Application.Services;
namespace JiShe.CollectBus.Subscribers
{
public interface ISubscriberAppService : IApplicationService
{
Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage);
Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage);
Task ReceivedEvent(MessageReceived receivedMessage);
Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage);
Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage);
Task<ISubscribeAck> LoginIssuedEvent(IssuedEventMessage issuedEventMessage);
Task<ISubscribeAck> HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage);
Task<ISubscribeAck> ReceivedEvent(MessageReceived receivedMessage);
Task<ISubscribeAck> ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage);
Task<ISubscribeAck> ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage);
}
}

View File

@ -1,6 +1,7 @@
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka;
using System.Collections.Generic;
using System.Threading.Tasks;
using Volo.Abp.Application.Services;
@ -19,19 +20,19 @@ namespace JiShe.CollectBus.Subscribers
/// 1分钟采集电表数据下行消息消费订阅
/// </summary>
/// <returns></returns>
Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
/// <summary>
/// 5分钟采集电表数据下行消息消费订阅
/// </summary>
/// <returns></returns>
Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
/// <summary>
/// 15分钟采集电表数据下行消息消费订阅
/// </summary>
/// <returns></returns>
Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
#endregion
#region
@ -39,7 +40,7 @@ namespace JiShe.CollectBus.Subscribers
/// 1分钟采集水表数据下行消息消费订阅
/// </summary>
/// <returns></returns>
Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage);
#endregion
}

View File

@ -12,8 +12,11 @@ using Microsoft.Extensions.DependencyInjection;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using JiShe.CollectBus.Cassandra;
using Volo.Abp;
using Volo.Abp.Application;
using Volo.Abp.Autofac;
using Volo.Abp.AutoMapper;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.BackgroundWorkers.Hangfire;
@ -27,11 +30,13 @@ namespace JiShe.CollectBus;
typeof(CollectBusApplicationContractsModule),
typeof(AbpDddApplicationModule),
typeof(AbpAutoMapperModule),
typeof(AbpAutofacModule),
typeof(AbpBackgroundWorkersHangfireModule),
typeof(CollectBusFreeRedisModule),
typeof(CollectBusFreeSqlModule),
typeof(CollectBusKafkaModule),
typeof(CollectBusIoTDBModule)
typeof(CollectBusIoTDBModule),
typeof(CollectBusCassandraModule)
)]
public class CollectBusApplicationModule : AbpModule
{
@ -46,20 +51,20 @@ public class CollectBusApplicationModule : AbpModule
});
}
public override void OnApplicationInitialization(
public override async Task OnApplicationInitializationAsync(
ApplicationInitializationContext context)
{
var assembly = Assembly.GetExecutingAssembly();
var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface).ToList();
foreach (var type in types)
{
context.AddBackgroundWorkerAsync(type);
await context.AddBackgroundWorkerAsync(type);
}
//默认初始化表计信息
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
//dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
await dbContext.InitAmmeterCacheData();
//await dbContext.InitWatermeterCacheData();
//初始化主题信息
var kafkaAdminClient = context.ServiceProvider.GetRequiredService<IAdminClientService>();
@ -70,7 +75,7 @@ public class CollectBusApplicationModule : AbpModule
foreach (var item in topics)
{
kafkaAdminClient.CreateTopicAsync(item, configuration.GetValue<int>(CommonConst.NumPartitions), configuration.GetValue<short>(CommonConst.KafkaReplicationFactor));
await kafkaAdminClient.CreateTopicAsync(item, configuration.GetValue<int>(CommonConst.NumPartitions), configuration.GetValue<short>(CommonConst.KafkaReplicationFactor));
}
}

View File

@ -23,9 +23,11 @@
<PackageReference Include="Volo.Abp.Ddd.Application" Version="8.3.3" />
<PackageReference Include="TouchSocket" Version="3.0.19" />
<PackageReference Include="TouchSocket.Hosting" Version="3.0.19" />
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
<PackageReference Include="Volo.Abp.EventBus.Kafka" Version="8.3.3" />
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />

View File

@ -7,10 +7,12 @@ using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Enums;
using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts;
using MassTransit;
using Microsoft.Extensions.Logging;
@ -26,7 +28,7 @@ namespace JiShe.CollectBus.Plugins
{
public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin
{
private readonly ICapPublisher _producerBus;
private readonly IProducerService _producerService;
private readonly ILogger<TcpMonitor> _logger;
private readonly IRepository<Device, Guid> _deviceRepository;
private readonly IDistributedCache<AmmeterInfo> _ammeterInfoCache;
@ -34,16 +36,16 @@ namespace JiShe.CollectBus.Plugins
/// <summary>
///
/// </summary>
/// <param name="producerBus"></param>
/// <param name="producerService"></param>
/// <param name="logger"></param>
/// <param name="deviceRepository"></param>
/// <param name="ammeterInfoCache"></param>
public TcpMonitor(ICapPublisher producerBus,
public TcpMonitor(IProducerService producerService,
ILogger<TcpMonitor> logger,
IRepository<Device, Guid> deviceRepository,
IDistributedCache<AmmeterInfo> ammeterInfoCache)
{
_producerBus = producerBus;
_producerService = producerService;
_logger = logger;
_deviceRepository = deviceRepository;
_ammeterInfoCache = ammeterInfoCache;
@ -170,7 +172,7 @@ namespace JiShe.CollectBus.Plugins
DeviceNo = deviceNo,
MessageId = NewId.NextGuid().ToString()
};
await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent.Serialize());
//await _producerBus.Publish( messageReceivedLoginEvent);
}
@ -217,7 +219,7 @@ namespace JiShe.CollectBus.Plugins
DeviceNo = deviceNo,
MessageId = NewId.NextGuid().ToString()
};
await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent.Serialize());
//await _producerBus.Publish(messageReceivedHeartbeatEvent);
}
@ -245,7 +247,7 @@ namespace JiShe.CollectBus.Plugins
//string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn);
//todo 如何确定时标?目前集中器的采集频率,都是固定,数据上报的时候,根据当前时间,往后推测出应当采集的时间点作为时标。但是如果由于网络问题,数据一直没上报的情况改怎么计算?
await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
{
ClientId = client.Id,
ClientIp = client.IP,
@ -253,7 +255,7 @@ namespace JiShe.CollectBus.Plugins
MessageHexString = messageHexString,
DeviceNo = deviceNo,
MessageId = NewId.NextGuid().ToString()
});
}.Serialize());
}
}
}

View File

@ -215,19 +215,12 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
return aa == null;
}
[KafkaSubscribe(["test-topic1"])]
[KafkaSubscribe(["test-topic"])]
public async Task<ISubscribeAck> KafkaSubscribeAsync() // TestSubscribe obj
public async Task<ISubscribeAck> KafkaSubscribeAsync(object obj)
{
var obj=string.Empty;
_logger.LogWarning($"收到订阅消息: {obj}");
return SubscribeAck.Success();
}
}
public class TestSubscribe
{
public string Topic { get; set; }
public int Val { get; set; }
}

View File

@ -0,0 +1,51 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Apache.IoTDB.DataStructure;
using Apache.IoTDB;
using Confluent.Kafka;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.IoTDBProvider;
using JiShe.CollectBus.IotSystems.PrepayModel;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;
using JiShe.CollectBus.IoTDBProvider.Context;
using Microsoft.Extensions.Logging;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.IotSystems.AFNEntity;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using JiShe.CollectBus.Cassandra;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using Volo.Abp.Application.Services;
namespace JiShe.CollectBus.Samples;
[AllowAnonymous]
public class TestAppService : CollectBusAppService, IApplicationService
{
private readonly ILogger<TestAppService> _logger;
private readonly ICassandraRepository<MessageIssued,string> _messageIssuedCassandraRepository;
public TestAppService(
ILogger<TestAppService> logger,
ICassandraRepository<MessageIssued, string> messageIssuedCassandraRepository
)
{
_logger = logger;
_messageIssuedCassandraRepository = messageIssuedCassandraRepository;
}
public async Task AddMessage()
{
await _messageIssuedCassandraRepository.InsertAsync(new MessageIssued
{
ClientId = Guid.NewGuid().ToString(),
Message = Array.Empty<byte>(),
DeviceNo = "123321312",
MessageId = Guid.NewGuid().ToString(),
Type = IssuedEventType.Data
});
}
}

View File

@ -29,7 +29,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
{
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
private readonly ICapPublisher _producerBus;
private readonly IIoTDBProvider _dbProvider;
private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
private readonly IProducerService _producerService;
@ -37,12 +36,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public BasicScheduledMeterReadingService(
ILogger<BasicScheduledMeterReadingService> logger,
ICapPublisher producerBus,
IMeterReadingRecordRepository meterReadingRecordRepository,
IProducerService producerService,
IIoTDBProvider dbProvider)
{
_producerBus = producerBus;
_logger = logger;
_dbProvider = dbProvider;
_meterReadingRecordRepository = meterReadingRecordRepository;
@ -381,7 +378,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg.Serialize());
//_= _producerBus.Publish(tempMsg);
@ -445,7 +442,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg.Serialize());
//_ = _producerBus.Publish(tempMsg);
@ -510,7 +507,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
TimeDensity = timeDensity.ToString(),
};
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg.Serialize());
//_ = _producerBus.Publish(tempMsg);
@ -805,7 +802,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress);
await _producerService.ProduceAsync(topicName, partition, taskRecord);
await _producerService.ProduceAsync(topicName, partition, taskRecord.Serialize());
}
private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType)
@ -846,7 +843,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
TimeDensity = timeDensity.ToString(),
};
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg.Serialize());
//_ = _producerBus.Publish(tempMsg);

View File

@ -36,7 +36,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
string serverTagName = string.Empty;
public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger,
ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, producerBus, meterReadingRecordRepository, producerService,dbProvider)
IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, meterReadingRecordRepository, producerService,dbProvider)
{
serverTagName = configuration.GetValue<string>(CommonConst.ServerTagName)!;
}

View File

@ -6,6 +6,8 @@ using JiShe.CollectBus.IoTDBProvider;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
@ -20,7 +22,7 @@ using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Subscribers
{
public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, ICapSubscribe
public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, IKafkaSubscribe
{
private readonly ILogger<SubscriberAppService> _logger;
private readonly ITcpService _tcpService;
@ -63,9 +65,10 @@ namespace JiShe.CollectBus.Subscribers
_dbProvider = dbProvider;
}
[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
public async Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage)
[KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
public async Task<ISubscribeAck> LoginIssuedEvent(IssuedEventMessage issuedEventMessage)
{
bool isAck = false;
switch (issuedEventMessage.Type)
{
case IssuedEventType.Heartbeat:
@ -76,6 +79,7 @@ namespace JiShe.CollectBus.Subscribers
loginEntity.AckTime = Clock.Now;
loginEntity.IsAck = true;
await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
isAck = true;
break;
case IssuedEventType.Data:
break;
@ -90,11 +94,13 @@ namespace JiShe.CollectBus.Subscribers
//}
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
return isAck? SubscribeAck.Success(): SubscribeAck.Fail();
}
[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
public async Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage)
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
public async Task<ISubscribeAck> HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage)
{
bool isAck = false;
switch (issuedEventMessage.Type)
{
case IssuedEventType.Heartbeat:
@ -103,6 +109,7 @@ namespace JiShe.CollectBus.Subscribers
heartbeatEntity.AckTime = Clock.Now;
heartbeatEntity.IsAck = true;
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
isAck = true;
break;
case IssuedEventType.Data:
break;
@ -117,10 +124,11 @@ namespace JiShe.CollectBus.Subscribers
//}
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
}
[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)]
public async Task ReceivedEvent(MessageReceived receivedMessage)
[KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)]
public async Task<ISubscribeAck> ReceivedEvent(MessageReceived receivedMessage)
{
var currentTime = Clock.Now;
@ -137,13 +145,13 @@ namespace JiShe.CollectBus.Subscribers
if(fN == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return;
return SubscribeAck.Success();
}
var tb3761FN = fN.FnList.FirstOrDefault();
if (tb3761FN == null)
{
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return;
return SubscribeAck.Success();
}
//报文入库
@ -169,11 +177,14 @@ namespace JiShe.CollectBus.Subscribers
//todo 查找是否有下发任务
//await _messageReceivedEventRepository.InsertAsync(receivedMessage);
}
return SubscribeAck.Success();
}
[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
public async Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage)
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
public async Task<ISubscribeAck> ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage)
{
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null)
@ -185,10 +196,11 @@ namespace JiShe.CollectBus.Subscribers
await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage);
await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage);
}
return SubscribeAck.Success();
}
[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
public async Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage)
[KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
public async Task<ISubscribeAck> ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage)
{
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null)
@ -200,6 +212,7 @@ namespace JiShe.CollectBus.Subscribers
await protocolPlugin.LoginAsync(receivedLoginMessage);
await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage);
}
return SubscribeAck.Success();
}
}
}

View File

@ -8,6 +8,8 @@ using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Repository.MeterReadingRecord;
@ -24,7 +26,7 @@ namespace JiShe.CollectBus.Subscribers
/// 定时抄读任务消息消费订阅
/// </summary>
[Route($"/worker/app/subscriber")]
public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService,ICapSubscribe
public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService, IKafkaSubscribe
{
private readonly ILogger<WorkerSubscriberAppService> _logger;
private readonly ITcpService _tcpService;
@ -63,8 +65,8 @@ namespace JiShe.CollectBus.Subscribers
/// <returns></returns>
[HttpPost]
[Route("ammeter/oneminute/issued-event")]
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
_logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
@ -81,6 +83,7 @@ namespace JiShe.CollectBus.Subscribers
}
}
return SubscribeAck.Success();
}
/// <summary>
@ -90,8 +93,8 @@ namespace JiShe.CollectBus.Subscribers
/// <returns></returns>
[HttpPost]
[Route("ammeter/fiveminute/issued-event")]
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
public async Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
public async Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
_logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
@ -108,6 +111,7 @@ namespace JiShe.CollectBus.Subscribers
}
}
return SubscribeAck.Success();
}
/// <summary>
@ -117,8 +121,8 @@ namespace JiShe.CollectBus.Subscribers
/// <returns></returns>
[HttpPost]
[Route("ammeter/fifteenminute/issued-event")]
[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
public async Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
try
@ -137,6 +141,7 @@ namespace JiShe.CollectBus.Subscribers
}
}
return SubscribeAck.Success();
}
catch (Exception ex)
{
@ -155,8 +160,8 @@ namespace JiShe.CollectBus.Subscribers
/// <returns></returns>
[HttpPost]
[Route("watermeter/fifteenminute/issued-event")]
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
public async Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
[KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
public async Task<ISubscribeAck> WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
_logger.LogInformation("15分钟采集水表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
@ -172,6 +177,7 @@ namespace JiShe.CollectBus.Subscribers
await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString));
}
}
return SubscribeAck.Success();
}
#endregion
}

View File

@ -27,7 +27,7 @@ namespace JiShe.CollectBus.Workers
{
_logger = logger;
RecurringJobId = nameof(CreateToBeIssueTaskWorker);
CronExpression = $"*/{1} * * * *";
CronExpression = "* 10 * * * *";
this._scheduledMeterReadingService = scheduledMeterReadingService;
}

View File

@ -26,7 +26,7 @@ namespace JiShe.CollectBus.Workers
{
_logger = logger;
RecurringJobId = nameof(SubscriberFifteenMinuteWorker);
CronExpression = $"*/{15} * * * *";
CronExpression = "* 15 * * * *";
this._scheduledMeterReadingService = scheduledMeterReadingService;
}

View File

@ -26,7 +26,7 @@ namespace JiShe.CollectBus.Workers
{
_logger = logger;
RecurringJobId = nameof(SubscriberFiveMinuteWorker);
CronExpression = $"*/{5} * * * *";
CronExpression = "* 5 * * * *";
this._scheduledMeterReadingService = scheduledMeterReadingService;
}

View File

@ -26,7 +26,7 @@ namespace JiShe.CollectBus.Workers
{
_logger = logger;
RecurringJobId = nameof(SubscriberOneMinuteWorker);
CronExpression = $"*/{1} * * * *";
CronExpression = "* 1 * * * *";
this._scheduledMeterReadingService = scheduledMeterReadingService;
}

View File

@ -0,0 +1,64 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Cassandra
{
public class CassandraConfig
{
public Node[] Nodes { get; set; }
public string Username { get; set; }
public string Password { get; set; }
public string Keyspace { get; set; }
public string ConsistencyLevel { get; set; }
public Pooling PoolingOptions { get; set; }
public Socket SocketOptions { get; set; }
public Query QueryOptions { get; set; }
public ReplicationStrategy ReplicationStrategy { get; set; }
}
public class Pooling
{
public int CoreConnectionsPerHost { get; set; }
public int MaxConnectionsPerHost { get; set; }
public int MaxRequestsPerConnection { get; set; }
}
public class Socket
{
public int ConnectTimeoutMillis { get; set; }
public int ReadTimeoutMillis { get; set; }
}
public class Query
{
public string ConsistencyLevel { get; set; }
public string SerialConsistencyLevel { get; set; }
public bool DefaultIdempotence { get; set; }
}
public class ReplicationStrategy
{
public string Class { get; set; }
public DataCenter[] DataCenters { get; set; }
}
public class DataCenter
{
public string Name { get; set; }
public int ReplicationFactor { get; set; }
public string Strategy { get; set; }
}
public class Node
{
public string Host { get; set; }
public int Port { get; set; }
public string DataCenter { get; set; }
public string Rack { get; set; }
}
}

View File

@ -0,0 +1,147 @@
using System;
using System.Linq;
using System.Reflection;
using System.Text;
using Cassandra;
using Cassandra.Mapping;
using Cassandra.Data.Linq;
using System.ComponentModel.DataAnnotations;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using JiShe.CollectBus.Common.Attributes;
namespace JiShe.CollectBus.Cassandra
{
public class CassandraProvider : IDisposable, ICassandraProvider, ISingletonDependency
{
private readonly ILogger<CassandraProvider> _logger;
public Cluster Instance { get; set; }
public ISession Session { get; set; }
public CassandraConfig CassandraConfig { get; set; }
/// <summary>
///
/// </summary>
/// <param name="options"></param>
/// <param name="logger"></param>
public CassandraProvider(
IOptions<CassandraConfig> options,
ILogger<CassandraProvider> logger)
{
CassandraConfig = options.Value;
_logger = logger;
}
public void InitClusterAndSession()
{
GetCluster((keyspace) =>
{
GetSession(keyspace);
});
}
public Cluster GetCluster(Action<string?>? callback=null)
{
var clusterBuilder = Cluster.Builder();
// 添加多个节点
foreach (var node in CassandraConfig.Nodes)
{
clusterBuilder.AddContactPoint(node.Host)
.WithPort(node.Port);
}
clusterBuilder.WithCredentials(CassandraConfig.Username, CassandraConfig.Password);
// 优化连接池配置
var poolingOptions = new PoolingOptions()
.SetCoreConnectionsPerHost(HostDistance.Local, CassandraConfig.PoolingOptions.CoreConnectionsPerHost)
.SetMaxConnectionsPerHost(HostDistance.Local, CassandraConfig.PoolingOptions.MaxConnectionsPerHost)
.SetMaxRequestsPerConnection(CassandraConfig.PoolingOptions.MaxRequestsPerConnection)
.SetHeartBeatInterval(30000); // 30秒心跳
clusterBuilder.WithPoolingOptions(poolingOptions);
// 优化Socket配置
var socketOptions = new SocketOptions()
.SetConnectTimeoutMillis(CassandraConfig.SocketOptions.ConnectTimeoutMillis)
.SetReadTimeoutMillis(CassandraConfig.SocketOptions.ReadTimeoutMillis)
.SetTcpNoDelay(true) // 启用Nagle算法
.SetKeepAlive(true) // 启用TCP保活
.SetReceiveBufferSize(32768) // 32KB接收缓冲区
.SetSendBufferSize(32768); // 32KB发送缓冲区
clusterBuilder.WithSocketOptions(socketOptions);
// 优化查询选项
var queryOptions = new QueryOptions()
.SetConsistencyLevel((ConsistencyLevel)Enum.Parse(typeof(ConsistencyLevel), CassandraConfig.QueryOptions.ConsistencyLevel))
.SetSerialConsistencyLevel((ConsistencyLevel)Enum.Parse(typeof(ConsistencyLevel), CassandraConfig.QueryOptions.SerialConsistencyLevel))
.SetDefaultIdempotence(CassandraConfig.QueryOptions.DefaultIdempotence)
.SetPageSize(5000); // 增加页面大小
clusterBuilder.WithQueryOptions(queryOptions);
// 启用压缩
clusterBuilder.WithCompression(CompressionType.LZ4);
// 配置重连策略
clusterBuilder.WithReconnectionPolicy(new ExponentialReconnectionPolicy(1000, 10 * 60 * 1000));
Instance = clusterBuilder.Build();
callback?.Invoke(null);
return Instance;
}
public ISession GetSession(string? keyspace = null)
{
if (string.IsNullOrEmpty(keyspace))
{
keyspace = CassandraConfig.Keyspace;
}
Session = Instance.Connect();
var replication = GetReplicationStrategy();
Session.CreateKeyspaceIfNotExists(keyspace, replication);
Session.ChangeKeyspace(keyspace);
return Session;
}
private Dictionary<string, string> GetReplicationStrategy()
{
var strategy = CassandraConfig.ReplicationStrategy.Class;
var dataCenters = CassandraConfig.ReplicationStrategy.DataCenters;
switch (strategy)
{
case "NetworkTopologyStrategy":
var networkDic = new Dictionary<string, string> { { "class", "NetworkTopologyStrategy" } };
foreach (var dataCenter in dataCenters)
{
networkDic.Add(dataCenter.Name, dataCenter.ReplicationFactor.ToString());
}
return networkDic;
case "SimpleStrategy":
var dic = new Dictionary<string, string> { { "class", "SimpleStrategy" } };
if (dataCenters.Length >= 1)
{
dic.Add("replication_factor", dataCenters[0].ReplicationFactor.ToString());
}
else
{
_logger.LogError("SimpleStrategy 不支持多个数据中心!");
}
return dic;
default:
throw new ArgumentNullException($"Strategy", "Strategy配置错误");
}
}
public void Dispose()
{
Instance.Dispose();
Session.Dispose();
}
}
}

View File

@ -0,0 +1,156 @@
using System.Collections.Concurrent;
using Cassandra;
using Cassandra.Mapping;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.Cassandra
{
public class CassandraQueryOptimizer
{
private readonly ISession _session;
private readonly ILogger<CassandraQueryOptimizer> _logger;
private readonly IMemoryCache _cache;
private readonly ConcurrentDictionary<string, PreparedStatement> _preparedStatements;
private readonly int _batchSize;
private readonly TimeSpan _cacheExpiration;
public CassandraQueryOptimizer(
ISession session,
ILogger<CassandraQueryOptimizer> logger,
IMemoryCache cache,
int batchSize = 100,
TimeSpan? cacheExpiration = null)
{
_session = session;
_logger = logger;
_cache = cache;
_preparedStatements = new ConcurrentDictionary<string, PreparedStatement>();
_batchSize = batchSize;
_cacheExpiration = cacheExpiration ?? TimeSpan.FromMinutes(5);
}
public async Task<PreparedStatement> GetOrPrepareStatementAsync(string cql)
{
return _preparedStatements.GetOrAdd(cql, key =>
{
try
{
var statement = _session.Prepare(key);
_logger.LogDebug($"Prepared statement for CQL: {key}");
return statement;
}
catch (Exception ex)
{
_logger.LogError(ex, $"Failed to prepare statement for CQL: {key}");
throw;
}
});
}
public async Task ExecuteBatchAsync(IEnumerable<BoundStatement> statements)
{
var batch = new BatchStatement();
var count = 0;
foreach (var statement in statements)
{
batch.Add(statement);
count++;
if (count >= _batchSize)
{
await ExecuteBatchAsync(batch);
batch = new BatchStatement();
count = 0;
}
}
if (count > 0)
{
await ExecuteBatchAsync(batch);
}
}
private async Task ExecuteBatchAsync(BatchStatement batch)
{
try
{
await _session.ExecuteAsync(batch);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to execute batch statement");
throw;
}
}
public async Task<T> GetOrSetFromCacheAsync<T>(string cacheKey, Func<Task<T>> getData)
{
if (_cache.TryGetValue(cacheKey, out T cachedValue))
{
_logger.LogDebug($"Cache hit for key: {cacheKey}");
return cachedValue;
}
var data = await getData();
_cache.Set(cacheKey, data, _cacheExpiration);
_logger.LogDebug($"Cache miss for key: {cacheKey}, data cached");
return data;
}
public async Task<IEnumerable<T>> ExecutePagedQueryAsync<T>(
string cql,
object[] parameters,
int pageSize = 100,
string pagingState = null) where T : class
{
var statement = await GetOrPrepareStatementAsync(cql);
var boundStatement = statement.Bind(parameters);
if (!string.IsNullOrEmpty(pagingState))
{
boundStatement.SetPagingState(Convert.FromBase64String(pagingState));
}
boundStatement.SetPageSize(pageSize);
try
{
var result = await _session.ExecuteAsync(boundStatement);
//TODO: RETURN OBJECT
throw new NotImplementedException();
//result.GetRows()
//return result.Select(row => row);
}
catch (Exception ex)
{
_logger.LogError(ex, $"Failed to execute paged query: {cql}");
throw;
}
}
public async Task BulkInsertAsync<T>(IEnumerable<T> items, string tableName)
{
var mapper = new Mapper(_session);
var batch = new List<BoundStatement>();
var cql = $"INSERT INTO {tableName} ({{0}}) VALUES ({{1}})";
foreach (var chunk in items.Chunk(_batchSize))
{
var statements = chunk.Select(item =>
{
var props = typeof(T).GetProperties();
var columns = string.Join(", ", props.Select(p => p.Name));
var values = string.Join(", ", props.Select(p => "?"));
var statement = _session.Prepare(string.Format(cql, columns, values));
return statement.Bind(props.Select(p => p.GetValue(item)).ToArray());
});
batch.AddRange(statements);
}
await ExecuteBatchAsync(batch);
}
}
}

View File

@ -0,0 +1,69 @@
using Cassandra;
using Cassandra.Mapping;
using JiShe.CollectBus.Cassandra.Extensions;
using Volo.Abp.Domain.Entities;
using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Cassandra
{
public class CassandraRepository<TEntity, TKey>
: ICassandraRepository<TEntity, TKey>
where TEntity : class
{
public CassandraRepository(ICassandraProvider cassandraProvider, MappingConfiguration mappingConfig)
{
Mapper = new Mapper(cassandraProvider.Session, mappingConfig);
cassandraProvider.Session.CreateTable<TEntity>(cassandraProvider.CassandraConfig.Keyspace);
}
public readonly IMapper Mapper;
public virtual async Task<TEntity> GetAsync(TKey id)
{
return await Mapper.SingleOrDefaultAsync<TEntity>("WHERE id = ?", id);
}
public virtual async Task<List<TEntity>> GetListAsync()
{
return (await Mapper.FetchAsync<TEntity>()).ToList();
}
public virtual async Task<TEntity> InsertAsync(TEntity entity)
{
await Mapper.InsertAsync(entity);
return entity;
}
public virtual async Task<TEntity> UpdateAsync(TEntity entity)
{
await Mapper.UpdateAsync(entity);
return entity;
}
public virtual async Task DeleteAsync(TEntity entity)
{
await Mapper.DeleteAsync(entity);
}
public virtual async Task DeleteAsync(TKey id)
{
await Mapper.DeleteAsync<TEntity>("WHERE id = ?", id);
}
public virtual async Task<List<TEntity>> GetPagedListAsync(
int skipCount,
int maxResultCount,
string sorting)
{
var cql = $"SELECT * FROM {typeof(TEntity).Name.ToLower()}";
if (!string.IsNullOrWhiteSpace(sorting))
{
cql += $" ORDER BY {sorting}";
}
cql += $" LIMIT {maxResultCount} OFFSET {skipCount}";
return (await Mapper.FetchAsync<TEntity>(cql)).ToList();
}
}
}

View File

@ -0,0 +1,29 @@
using Cassandra;
using Cassandra.Mapping;
using JiShe.CollectBus.Cassandra.Mappers;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp;
using Volo.Abp.Autofac;
using Volo.Abp.Modularity;
namespace JiShe.CollectBus.Cassandra
{
[DependsOn(
typeof(AbpAutofacModule)
)]
public class CollectBusCassandraModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
Configure<CassandraConfig>(context.Services.GetConfiguration().GetSection("Cassandra"));
context.AddCassandra();
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
context.UseCassandra();
}
}
}

View File

@ -0,0 +1,35 @@
using Autofac.Core;
using Cassandra;
using Cassandra.Mapping;
using JiShe.CollectBus.Cassandra;
using JiShe.CollectBus.Cassandra.Mappers;
using Microsoft.Extensions.Options;
using System.Reflection;
using Volo.Abp;
using Volo.Abp.Modularity;
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class ApplicationInitializationContextExtensions
{
public static void UseCassandra(this ApplicationInitializationContext context)
{
var service = context.ServiceProvider;
var cassandraProvider = service.GetRequiredService<ICassandraProvider>();
cassandraProvider.InitClusterAndSession();
}
}
public static class ServiceCollectionExtensions
{
public static void AddCassandra(this ServiceConfigurationContext context)
{
context.Services.AddSingleton(typeof(ICassandraRepository<,>), typeof(CassandraRepository<,>));
var mappingConfig = new MappingConfiguration()
.Define(new CollectBusMapping());
context.Services.AddSingleton(mappingConfig);
}
}
}

View File

@ -0,0 +1,83 @@
using System.Reflection;
using System.Text;
using Cassandra;
using System.ComponentModel.DataAnnotations;
using JiShe.CollectBus.Common.Attributes;
using Cassandra.Mapping;
namespace JiShe.CollectBus.Cassandra.Extensions
{
public static class SessionExtension
{
public static void CreateTable<TEntity>(this ISession session,string? defaultKeyspace=null) where TEntity : class
{
var type = typeof(TEntity);
var tableAttribute = type.GetCustomAttribute<CassandraTableAttribute>();
var tableName = tableAttribute?.Name ?? type.Name.ToLower();
var tableKeyspace = tableAttribute?.Keyspace ?? defaultKeyspace;
var properties = type.GetProperties();
var primaryKey = properties.FirstOrDefault(p => p.GetCustomAttribute<KeyAttribute>() != null);
if (primaryKey == null)
{
throw new InvalidOperationException($"No primary key defined for type {type.Name}");
}
var cql = new StringBuilder();
cql.Append($"CREATE TABLE IF NOT EXISTS {tableKeyspace}.{tableName} (");
foreach (var prop in properties)
{
var ignoreAttribute = prop.GetCustomAttribute<CassandraIgnoreAttribute>();
if (ignoreAttribute != null) continue;
var columnName = prop.Name.ToLower();
var cqlType = GetCassandraType(prop.PropertyType);
cql.Append($"{columnName} {cqlType}, ");
}
cql.Length -= 2; // Remove last comma and space
cql.Append($", PRIMARY KEY ({primaryKey.Name.ToLower()}))");
session.Execute(cql.ToString());
}
private static string GetCassandraType(Type type)
{
// 基础类型处理
switch (Type.GetTypeCode(type))
{
case TypeCode.String: return "text";
case TypeCode.Int32: return "int";
case TypeCode.Int64: return "bigint";
case TypeCode.Boolean: return "boolean";
case TypeCode.DateTime: return "timestamp";
case TypeCode.Byte: return "tinyint";
}
if (type == typeof(Guid)) return "uuid";
if (type == typeof(DateTimeOffset)) return "timestamp";
if (type == typeof(Byte[])) return "blob";
// 处理集合类型
if (type.IsGenericType)
{
var genericType = type.GetGenericTypeDefinition();
var elementType = type.GetGenericArguments()[0];
if (genericType == typeof(List<>))
return $"list<{GetCassandraType(elementType)}>";
if (genericType == typeof(HashSet<>))
return $"set<{GetCassandraType(elementType)}>";
if (genericType == typeof(Dictionary<,>))
{
var keyType = type.GetGenericArguments()[0];
var valueType = type.GetGenericArguments()[1];
return $"map<{GetCassandraType(keyType)}, {GetCassandraType(valueType)}>";
}
}
throw new NotSupportedException($"不支持的类型: {type.Name}");
}
}
}

View File

@ -0,0 +1,24 @@
using Cassandra;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Cassandra
{
public interface ICassandraProvider
{
Cluster Instance { get;}
ISession Session { get;}
CassandraConfig CassandraConfig { get; }
ISession GetSession(string? keyspace = null);
Cluster GetCluster(Action<string?>? callback = null);
void InitClusterAndSession();
}
}

View File

@ -0,0 +1,20 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.Domain.Entities;
namespace JiShe.CollectBus.Cassandra
{
public interface ICassandraRepository<TEntity, TKey> where TEntity : class
{
Task<TEntity> GetAsync(TKey id);
Task<List<TEntity>> GetListAsync();
Task<TEntity> InsertAsync(TEntity entity);
Task<TEntity> UpdateAsync(TEntity entity);
Task DeleteAsync(TEntity entity);
Task DeleteAsync(TKey id);
Task<List<TEntity>> GetPagedListAsync(int skipCount, int maxResultCount, string sorting);
}
}

View File

@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="CassandraCSharpDriver" Version="3.22.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="8.0.2" />
<PackageReference Include="Volo.Abp.Autofac" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Ddd.Domain" Version="8.3.3" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,20 @@
using Cassandra.Mapping;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using static Cassandra.QueryTrace;
namespace JiShe.CollectBus.Cassandra.Mappers
{
public class CollectBusMapping: Mappings
{
public CollectBusMapping()
{
For<MessageIssued>()
.Column(e => e.Type, cm => cm.WithName("type").WithDbType<int>());
}
}
}

View File

@ -0,0 +1,32 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.EventBus;
using Volo.Abp;
namespace JiShe.CollectBus.Common.Attributes
{
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public class CassandraTableAttribute : Attribute
{
public CassandraTableAttribute(string? name = null,string? keyspace =null)
{
Name = name;
Keyspace = keyspace;
}
public virtual string? Name { get; }
public virtual string? Keyspace { get; }
}
[AttributeUsage(AttributeTargets.Property | AttributeTargets.Field)]
public class CassandraIgnoreAttribute : Attribute
{
public CassandraIgnoreAttribute()
{
}
}
}

View File

@ -1,14 +1,18 @@
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using JiShe.CollectBus.Common.Attributes;
using JiShe.CollectBus.Common.Enums;
namespace JiShe.CollectBus.IotSystems.MessageIssueds
{
[CassandraTable]
public class MessageIssued
{
[Key]
public string ClientId { get; set; }
public byte[] Message { get; set; }
public string DeviceNo { get; set; }

View File

@ -43,7 +43,7 @@ namespace JiShe.CollectBus.Host
ConfigureNetwork(context, configuration);
ConfigureJwtAuthentication(context, configuration);
ConfigureHangfire(context);
ConfigureCap(context, configuration);
//ConfigureCap(context, configuration);
//ConfigureMassTransit(context, configuration);
//ConfigureKafkaTopic(context, configuration);
ConfigureAuditLog(context);

View File

@ -16,7 +16,6 @@
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
<link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet"/>
<title>后端服务</title>
</head>
<body>

View File

@ -1,12 +1,35 @@
using JiShe.CollectBus.Host;
using Microsoft.AspNetCore.Hosting;
using Serilog;
using Volo.Abp.Modularity.PlugIns;
public class Program
{
public static void Main(string[] args)
/// <summary>
///
/// </summary>
/// <param name="args"></param>
/// <returns></returns>
public static async Task Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
var builder = WebApplication.CreateBuilder(args);
builder.Host.UseContentRoot(Directory.GetCurrentDirectory())
.UseSerilog((context, loggerConfiguration) =>
{
loggerConfiguration.ReadFrom.Configuration(context.Configuration);
})
.UseAutofac();
await builder.AddApplicationAsync<CollectBusHostModule>(options =>
{
// 加载插件,固定模式,可热插拔
options.PlugInSources.AddFolder(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Plugins"));
});
var app = builder.Build();
await app.InitializeApplicationAsync();
await app.RunAsync();
//await CreateHostBuilder(args).Build().RunAsync();
}
private static IHostBuilder CreateHostBuilder(string[] args) =>
@ -16,13 +39,14 @@ public class Program
{
loggerConfiguration.ReadFrom.Configuration(context.Configuration);
})
.UseAutofac()
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup<Startup>();
});
})
.UseAutofac();
private static IHostBuilder CreateConsoleHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
@ -34,8 +58,8 @@ public class Program
});
private static void ConfigureServices(IServiceCollection services, HostBuilderContext hostContext)
private static async Task ConfigureServices(IServiceCollection services, HostBuilderContext hostContext)
{
services.AddApplication<CollectBusHostModule>();
await services.AddApplicationAsync<CollectBusHostModule>();
}
}

View File

@ -39,10 +39,15 @@ namespace JiShe.CollectBus.Host
/// <param name="lifetime">The lifetime.</param>
public void Configure(IApplicationBuilder app, IHostApplicationLifetime lifetime)
{
app.InitializeApplication();
//await app.InitializeApplicationAsync();
app.Use(async (context, next) =>
{
// 在请求处理之前调用 InitializeApplicationAsync
await app.InitializeApplicationAsync();
// 继续请求管道中的下一个中间件
await next();
});
}
}
}

View File

@ -130,5 +130,48 @@
},
"ServerTagName": "JiSheCollectBus",
"KafkaReplicationFactor": 3,
"NumPartitions": 30
"NumPartitions": 30,
"Cassandra": {
"ReplicationStrategy": {
"Class": "NetworkTopologyStrategy", //NetworkTopologyStrategySimpleStrategy
"DataCenters": [
{
"Name": "dc1",
"ReplicationFactor": 3
}
]
},
"Nodes": [
{
"Host": "192.168.1.9",
"Port": 9042,
"DataCenter": "dc1",
"Rack": "RAC1"
},
{
"Host": "192.168.1.9",
"Port": 9043,
"DataCenter": "dc1",
"Rack": "RAC2"
}
],
"Username": "admin",
"Password": "lixiao1980",
"Keyspace": "jishecollectbus",
"ConsistencyLevel": "Quorum",
"PoolingOptions": {
"CoreConnectionsPerHost": 4,
"MaxConnectionsPerHost": 8,
"MaxRequestsPerConnection": 2000
},
"SocketOptions": {
"ConnectTimeoutMillis": 10000,
"ReadTimeoutMillis": 20000
},
"QueryOptions": {
"ConsistencyLevel": "Quorum",
"SerialConsistencyLevel": "Serial",
"DefaultIdempotence": true
}
}
}

View File

@ -146,14 +146,14 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <returns></returns>
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
{
var consumerKey = typeof((Null, TValue));
var consumerKey = typeof((Ignore, TValue));
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _=>
(
CreateConsumer<Null, TValue>(groupId),
CreateConsumer<Ignore, TValue>(groupId),
cts
)).Consumer as IConsumer<Null, TValue>;
)).Consumer as IConsumer<Ignore, TValue>;
consumer!.Subscribe(topics);

View File

@ -15,6 +15,6 @@ namespace JiShe.CollectBus.Kafka.Producer
Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value, int? partition, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) where TKey : notnull where TValue : class;
Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class;
Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<Ignore, TValue>>? deliveryHandler = null) where TValue : class;
}
}

View File

@ -114,8 +114,8 @@ namespace JiShe.CollectBus.Kafka.Producer
/// <returns></returns>
public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class
{
var producer = GetProducer<Null, TValue>();
await producer.ProduceAsync(topic, new Message<Null, TValue> { Value = value });
var producer = GetProducer<Ignore, TValue>();
await producer.ProduceAsync(topic, new Message<Ignore, TValue> { Value = value });
}
/// <summary>
@ -160,13 +160,13 @@ namespace JiShe.CollectBus.Kafka.Producer
/// <param name="partition"></param>
/// <param name="deliveryHandler"></param>
/// <returns></returns>
public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class
public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<Ignore, TValue>>? deliveryHandler = null) where TValue : class
{
var message = new Message<Null, TValue>
var message = new Message<Ignore, TValue>
{
Value = value
};
var producer = GetProducer<Null, TValue>();
var producer = GetProducer<Ignore, TValue>();
if (partition.HasValue)
{
var topicPartition = new TopicPartition(topic, partition.Value);

View File

@ -12,12 +12,14 @@ using Microsoft.Extensions.DependencyInjection;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.Protocols;
using MassTransit;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Common.Helpers;
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
{
public abstract class BaseProtocolPlugin : IProtocolPlugin
{
private readonly ICapPublisher _producerBus;
private readonly IProducerService _producerService;
private readonly ILogger<BaseProtocolPlugin> _logger;
private readonly IRepository<ProtocolInfo, Guid> _protocolInfoRepository;
@ -37,7 +39,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
_logger = serviceProvider.GetRequiredService<ILogger<BaseProtocolPlugin>>();
_protocolInfoRepository = serviceProvider.GetRequiredService<IRepository<ProtocolInfo, Guid>>();
_producerBus = serviceProvider.GetRequiredService<ICapPublisher>();
_producerService = serviceProvider.GetRequiredService<IProducerService>();
}
public abstract ProtocolInfo Info { get; }
@ -87,7 +89,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }.Serialize());
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
}
@ -126,7 +128,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
Fn = 1
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }.Serialize());
//await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
}

View File

@ -7,7 +7,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="DotNetCore.CAP" Version="8.3.1" />
<PackageReference Include="Confluent.Kafka" Version="2.9.0" />
<PackageReference Include="MassTransit.Abstractions" Version="8.3.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="8.0.0" />
<PackageReference Include="TouchSocket" Version="2.1.9" />
@ -17,6 +17,7 @@
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
</ItemGroup>
</Project>

View File

@ -12,10 +12,10 @@ namespace JiShe.CollectBus.Protocol
context.Services.AddKeyedSingleton<IProtocolPlugin, StandardProtocolPlugin>(nameof(StandardProtocolPlugin));
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{
var standardProtocol = context.ServiceProvider.GetRequiredKeyedService<IProtocolPlugin>(nameof(StandardProtocolPlugin));
standardProtocol.AddAsync();
await standardProtocol.AddAsync();
}
}
}