Compare commits

..

No commits in common. "ea91622217c57ff0d9fb332fe40ef1e7e0b96625" and "04da31f4234a8bd4609cb9f1945578defcf92b70" have entirely different histories.

37 changed files with 789 additions and 520 deletions

View File

@ -9,8 +9,6 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.2" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.2" />
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" /> <PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Autofac" Version="8.3.3" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

View File

@ -11,7 +11,7 @@ using JiShe.CollectBus.Protocol3761;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System.Text; using System.Text;
namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Ammeter namespace GatherService.WattMeter.AnalysisData.AFN_10H
{ {
/// <summary> /// <summary>
/// 5.16.1.2.1 F1透明转发 读取SIM卡信息 /// 5.16.1.2.1 F1透明转发 读取SIM卡信息
@ -64,7 +64,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Ammeter
DataType = IOTDBDataTypeConst.Data, DataType = IOTDBDataTypeConst.Data,
}; };
result?.Invoke(dto); result?.Invoke(dto);
await _dataStorage.SaveDataToIotDbAsync(dto); await _dataStorage.SaveDataToIotDbAsync<string?>(dto);
return await Task.FromResult(true); return await Task.FromResult(true);
} }
catch (Exception ex) catch (Exception ex)

View File

@ -1,12 +1,11 @@
using JiShe.CollectBus.Common.Consts; using GatherService.WattMeter.AnalysisData.AFN_10H;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.Protocol.Contracts.Protocol.Dto; using JiShe.CollectBus.Protocol.Contracts.Protocol.Dto;
using JiShe.CollectBus.Protocol.Dto; using JiShe.CollectBus.Protocol.Dto;
using JiShe.CollectBus.Protocol.Interfaces; using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Ammeter;
using JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Watermeter;
using JiShe.CollectBus.Protocol3761; using JiShe.CollectBus.Protocol3761;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System; using System;
@ -55,11 +54,6 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H
{ {
result?.Invoke(dto); result?.Invoke(dto);
}); });
else if (value.Contains(F10TranspondMatch.ES190_DC))
await _analysisStrategyContext.ExecuteAsync<TB3761>(nameof(ES190DC_Analysis), input, dto =>
{
result?.Invoke(dto);
});
else else
{ {
//TODO 写入1条日志 //TODO 写入1条日志

View File

@ -1,4 +1,5 @@
using DeviceDetectorNET.Parser.Device; using DeviceDetectorNET.Parser.Device;
using GatherService.WattMeter.AnalysisData.AFN_10H;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
@ -18,7 +19,7 @@ using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using static FreeSql.Internal.GlobalFilter; using static FreeSql.Internal.GlobalFilter;
namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Ammeter namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H
{ {
/// <summary> /// <summary>
/// 透抄 电网频率 /// 透抄 电网频率
@ -73,7 +74,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Ammeter
}; };
result?.Invoke(unitDataAnalysis); result?.Invoke(unitDataAnalysis);
await _dataStorage.SaveDataToIotDbAsync(unitDataAnalysis); await _dataStorage.SaveDataToIotDbAsync<decimal?>(unitDataAnalysis);
return await Task.FromResult(true); return await Task.FromResult(true);
} }
@ -133,7 +134,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Ammeter
string dataMark = string.Join("",dataField.GetRange(0, 4).ReduceHex33(true)); string dataMark = string.Join("",dataField.GetRange(0, 4).ReduceHex33(true));
values.Add(dataMark);//数据标识 values.Add(dataMark);//数据标识
var readValue = dataField.GetRange(4, len - 4).ReduceHex33(true);//值 var readValue = dataField.GetRange(4, len - 4).ReduceHex33(true);//值
await _analysisStrategyContext.ExecuteAsync($"Appendix_{dataMark}", readValue, (value) => await _analysisStrategyContext.ExecuteAsync<List<string>>($"Appendix_{dataMark}", readValue, (value) =>
{ {
values.Add(value.ToString()); values.Add(value.ToString());
}); });

View File

@ -11,7 +11,7 @@ using JiShe.CollectBus.Protocol.T37612012.AnalysisData;
using JiShe.CollectBus.Protocol3761; using JiShe.CollectBus.Protocol3761;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Ammeter namespace GatherService.WattMeter.AnalysisData.AFN_10H
{ {
/// <summary> /// <summary>
/// 透明转发---跳合闸 /// 透明转发---跳合闸
@ -42,7 +42,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Ammeter
var data = new AnalysisBaseDto<bool?>() var data = new AnalysisBaseDto<bool?>()
{ {
FiledDesc = "跳合闸", FiledDesc = "跳合闸",
DataValue = datas[2].Equals("9C") || datas[2].Equals("94") ? true : false, DataValue = (datas[2].Equals("9C") || datas[2].Equals("94")) ? true : false,
ItemType= "10_98", ItemType= "10_98",
}; };

View File

@ -1,130 +0,0 @@
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.Protocol.Contracts.Protocol.Dto;
using JiShe.CollectBus.Protocol.Dto;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Ammeter;
using JiShe.CollectBus.Protocol3761;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Watermeter
{
/// <summary>
/// ES_190DC 4G水表读数解析
/// </summary>
public class ES190DC_Analysis : IAnalysisStrategy<TB3761>
{
private readonly ILogger<ES190DC_Analysis> _logger;
private readonly DataStorage _dataStorage;
public ES190DC_Analysis(ILogger<ES190DC_Analysis> logger, DataStorage dataStorage)
{
_logger = logger;
_dataStorage = dataStorage;
}
public async Task<bool> ExecuteAsync(TB3761 input, Action<dynamic>? result = null)
{
try
{
ArgumentNullException.ThrowIfNull(input);
ArgumentNullException.ThrowIfNull(input.A.Code);
ArgumentNullException.ThrowIfNull(input.UnitData?.HexMessageList);
List<AnalysisBaseDto<decimal?>> list = GenerateFinalResult(input.UnitData.HexMessageList);
if (list.Count > 0)
{
// 查询设备信息
DeviceInfo? deviceInfo = await _dataStorage.GetDeviceInfoAsync(input.A.Code, input.DA.Pn, list[0].DeviceAddress);
if (deviceInfo != null)
{
list.ForEach(item =>
{
item.ProjectId = deviceInfo.ProjectID;
item.DeviceId = deviceInfo.MeterId;
item.DatabaseBusiID = deviceInfo.DatabaseBusiID;
item.DeviceAddress = deviceInfo.MeterAddress;
item.FocusId = deviceInfo.FocusId;
});
}
}
UnitDataAnalysis<List<AnalysisBaseDto<decimal?>>> unitDataAnalysis = new UnitDataAnalysis<List<AnalysisBaseDto<decimal?>>>
{
Code = input.A.Code!,
AFN = input.AFN_FC.AFN,
Fn = input.DT.Fn,
Pn = input.DA.Pn,
MSA = input.A.A3!.D1_D7!,
PSEQ = input.SEQ.PSEQ,
Data = list,
ReceivedHexMessage = input.BaseHexMessage.HexMessageString,
MessageId = input.MessageId,
TimeDensity = 1,//密度-间隔,
DensityUnit = DensityUnit.Hour,
ReceivedTime = input.ReceivedTime,
DataType = IOTDBDataTypeConst.Data
};
result?.Invoke(unitDataAnalysis);
await _dataStorage.SaveMultipleDataToIotDbAsync(unitDataAnalysis);
return await Task.FromResult(true);
}
catch (Exception ex)
{
_logger.LogError(ex, $"ES190DC 4G水表解析失败:{input.A.Code}-{input.DT.Fn}-{input.BaseHexMessage.HexMessageString},{ex.Message}");
return await Task.FromResult(false);
}
}
private List<AnalysisBaseDto<decimal?>> GenerateFinalResult(List<string> hexMessageList)
{
List<AnalysisBaseDto<decimal?>> list = new List<AnalysisBaseDto<decimal?>>();
string address = string.Join("", hexMessageList.Skip(12).Take(7).Reverse());
if (hexMessageList[19].Contains("81"))
{
int count = Convert.ToInt32(hexMessageList[36].HexToDec());
DateTime startTime = Convert.ToDateTime($"{DateTime.Now.Year.ToString().Substring(0, 2)}{hexMessageList[41]}-{hexMessageList[40]}-{hexMessageList[39]} {hexMessageList[38]}:{hexMessageList[37]}:00");
List<string> valueArr = hexMessageList.GetRange(42, count * 4);
int nextIndex = 0;
for (int i = 1; i <= count; i++)
{
AnalysisBaseDto<decimal?> meter = new AnalysisBaseDto<decimal?>();
meter.DeviceType = MeterTypeEnum.WaterMeter;
meter.DeviceAddress = address;
var arr = valueArr.GetRange(nextIndex, 4);
var errorCode = arr[4].CheckErrorCode();
if (errorCode != null)
{
meter.ValidData = false;
meter.ErrorCodeMsg = errorCode.Item2;
}
else
{
string val = $"{arr[3]}{arr[2]}{arr[1]}.{arr[0]}";
if (decimal.TryParse(val, out decimal value))
meter.DataValue = value;
}
string timeSpan = startTime.AddHours(i - 1).ToString("yyyy-MM-dd HH:mm:ss");
if (DateTime.TryParse(timeSpan, out DateTime readingDate))
{
meter.TimeSpan = readingDate;
}
meter.ItemType = "10_250";
meter.FiledDesc = "当前累积流量";
meter.FiledName = meter.ItemType.GetDataFieldByGatherDataType() ?? string.Empty;
nextIndex += 4;
}
}
return list;
}
}
}

View File

@ -27,6 +27,7 @@ namespace JiShe.CollectBus.Protocol.T37612012
private readonly IProducerService _producerService; private readonly IProducerService _producerService;
private readonly IRepository<Device, Guid> _deviceRepository;
private readonly ITcpService _tcpService; private readonly ITcpService _tcpService;
public readonly Dictionary<string, Telemetry3761PacketBuilder.T3761Delegate> T3761AFNHandlers; public readonly Dictionary<string, Telemetry3761PacketBuilder.T3761Delegate> T3761AFNHandlers;
@ -40,6 +41,7 @@ namespace JiShe.CollectBus.Protocol.T37612012
_logger = logger; _logger = logger;
//_logger = serviceProvider.GetRequiredService<ILogger<StandardProtocolPlugin>>(); //_logger = serviceProvider.GetRequiredService<ILogger<StandardProtocolPlugin>>();
_producerService = serviceProvider.GetRequiredService<IProducerService>(); _producerService = serviceProvider.GetRequiredService<IProducerService>();
_deviceRepository = serviceProvider.GetRequiredService<IRepository<Device, Guid>>();
_tcpService = tcpService; _tcpService = tcpService;
T3761AFNHandlers = Telemetry3761PacketBuilder.T3761AFNHandlers; T3761AFNHandlers = Telemetry3761PacketBuilder.T3761AFNHandlers;
} }
@ -131,6 +133,7 @@ namespace JiShe.CollectBus.Protocol.T37612012
else else
{ {
_logger.LogError($"不支持的上报kafka主题{topicName}"); _logger.LogError($"不支持的上报kafka主题{topicName}");
await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, messageReceivedAnalysis);
} }
} }

View File

@ -20,11 +20,13 @@ namespace JiShe.CollectBus.Protocol.Abstracts
public const string errorData = "EE"; public const string errorData = "EE";
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly IRepository<ProtocolInfo, Guid> _protocolInfoRepository;
private readonly IFreeRedisProvider _redisProvider; private readonly IFreeRedisProvider _redisProvider;
public ProtocolPlugin(IServiceProvider serviceProvider, ILogger logger) public ProtocolPlugin(IServiceProvider serviceProvider, ILogger logger)
{ {
_logger = logger; _logger = logger;
_protocolInfoRepository = serviceProvider.GetRequiredService<IRepository<ProtocolInfo, Guid>>();
_redisProvider = serviceProvider.GetRequiredService<IFreeRedisProvider>(); _redisProvider = serviceProvider.GetRequiredService<IFreeRedisProvider>();
} }
@ -40,6 +42,8 @@ namespace JiShe.CollectBus.Protocol.Abstracts
throw new ArgumentNullException(nameof(Info)); throw new ArgumentNullException(nameof(Info));
} }
await _protocolInfoRepository.DeleteDirectAsync(a => a.Name == Info.Name);
await _protocolInfoRepository.InsertAsync(Info);
await _redisProvider.Instance.HDelAsync($"{RedisConst.ProtocolKey}", Info.Name); await _redisProvider.Instance.HDelAsync($"{RedisConst.ProtocolKey}", Info.Name);
await _redisProvider.Instance.HSetAsync($"{RedisConst.ProtocolKey}", Info.Name, Info); await _redisProvider.Instance.HSetAsync($"{RedisConst.ProtocolKey}", Info.Name, Info);
} }

View File

@ -12,6 +12,7 @@ namespace JiShe.CollectBus.Subscribers
{ {
Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessage); Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessage);
Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessage); Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessage);
Task<ISubscribeAck> ReceivedEvent(MessageProtocolAnalysis<TB3761> receivedMessage);
Task<ISubscribeAck> ReceivedHeartbeatEvent(List<MessageReceivedHeartbeat> receivedHeartbeatMessage); Task<ISubscribeAck> ReceivedHeartbeatEvent(List<MessageReceivedHeartbeat> receivedHeartbeatMessage);
Task<ISubscribeAck> ReceivedLoginEvent(List<MessageReceivedLogin> receivedLoginMessage); Task<ISubscribeAck> ReceivedLoginEvent(List<MessageReceivedLogin> receivedLoginMessage);
} }

View File

@ -1,10 +1,13 @@
using JiShe.CollectBus.DataChannels; using Cassandra.Mapping;
using JiShe.CollectBus.Cassandra;
using JiShe.CollectBus.DataChannels;
using JiShe.CollectBus.FreeRedis; using JiShe.CollectBus.FreeRedis;
using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.Interceptors; using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IoTDB; using JiShe.CollectBus.IoTDB;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Mappers;
using JiShe.CollectBus.Protocol; using JiShe.CollectBus.Protocol;
using JiShe.CollectBus.ScheduledMeterReading; using JiShe.CollectBus.ScheduledMeterReading;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
@ -16,8 +19,12 @@ using System.Threading.Channels;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp; using Volo.Abp;
using Volo.Abp.Application; using Volo.Abp.Application;
using Volo.Abp.AuditLogging;
using Volo.Abp.Autofac; using Volo.Abp.Autofac;
using Volo.Abp.AutoMapper; using Volo.Abp.AutoMapper;
using Volo.Abp.BackgroundJobs;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.BackgroundWorkers.Hangfire;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;
namespace JiShe.CollectBus; namespace JiShe.CollectBus;
@ -27,12 +34,16 @@ namespace JiShe.CollectBus;
typeof(CollectBusApplicationContractsModule), typeof(CollectBusApplicationContractsModule),
typeof(AbpDddApplicationModule), typeof(AbpDddApplicationModule),
typeof(AbpAutoMapperModule), typeof(AbpAutoMapperModule),
typeof(AbpAutofacModule),
typeof(AbpBackgroundWorkersHangfireModule),
typeof(CollectBusFreeRedisModule), typeof(CollectBusFreeRedisModule),
typeof(CollectBusFreeSqlModule), typeof(CollectBusFreeSqlModule),
typeof(CollectBusKafkaModule), typeof(CollectBusKafkaModule),
typeof(CollectBusIoTDbModule), typeof(CollectBusIoTDbModule),
typeof(AbpAutofacModule),
typeof(CollectBusDomainSharedModule), typeof(CollectBusDomainSharedModule),
typeof(AbpAuditLoggingDomainModule),
typeof(AbpBackgroundJobsDomainModule),
typeof(CollectBusCassandraModule),
typeof(CollectBusProtocolModule) typeof(CollectBusProtocolModule)
)] )]
public class CollectBusApplicationModule : AbpModule public class CollectBusApplicationModule : AbpModule
@ -44,8 +55,8 @@ public class CollectBusApplicationModule : AbpModule
context.Services.AddAutoMapperObjectMapper<CollectBusApplicationModule>(); context.Services.AddAutoMapperObjectMapper<CollectBusApplicationModule>();
Configure<AbpAutoMapperOptions>(options => { options.AddMaps<CollectBusApplicationModule>(true); }); Configure<AbpAutoMapperOptions>(options => { options.AddMaps<CollectBusApplicationModule>(true); });
//context.Services.AddSingleton(new MappingConfiguration() context.Services.AddSingleton(new MappingConfiguration()
// .Define(new CollectBusMapping())); .Define(new CollectBusMapping()));
// 注册拦截器 // 注册拦截器
context.Services.OnRegistered(ctx => context.Services.OnRegistered(ctx =>
@ -59,17 +70,33 @@ public class CollectBusApplicationModule : AbpModule
public override async Task OnApplicationInitializationAsync( public override async Task OnApplicationInitializationAsync(
ApplicationInitializationContext context) ApplicationInitializationContext context)
{ {
//var assembly = Assembly.GetExecutingAssembly(); var assembly = Assembly.GetExecutingAssembly();
//var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface) var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface)
// .ToList(); .ToList();
//foreach (var type in types) foreach (var type in types) await context.AddBackgroundWorkerAsync(type);
//Task.Run(() =>
//{ //{
// await context.AddBackgroundWorkerAsync(type); // //默认初始化表计信息
//} // var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
// dbContext.InitAmmeterCacheData();
// //await dbContext.InitWatermeterCacheData();
//}).ConfigureAwait(false);
//下发任务通道构建 //下发任务通道构建
DataChannelManage.TaskDataChannel = Channel.CreateUnbounded<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>>(); DataChannelManage.TaskDataChannel = Channel.CreateUnbounded<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>>();
// 日志存储通道构建
DataChannelManage.LogSaveChannel = Channel.CreateUnbounded<object>();
// 日志刷新通道构建
DataChannelManage.LogRefreshChannel = Channel.CreateUnbounded<object>();
// 启动通道任务
var _dataChannelManage = context.ServiceProvider.GetRequiredService<DataChannelManageService>();
_ = _dataChannelManage.LogSaveAsync(DataChannelManage.LogSaveChannel.Reader);
//默认初始化表计信息 //默认初始化表计信息
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>(); var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
await dbContext.InitAmmeterCacheData("V4-Gather-8890"); await dbContext.InitAmmeterCacheData("V4-Gather-8890");

View File

@ -1,4 +1,6 @@
using JiShe.CollectBus.Common; using Amazon.Runtime.Internal.Transform;
using DnsClient.Protocol;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Interface;
@ -8,6 +10,7 @@ using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Dto; using JiShe.CollectBus.Protocol.Dto;
using JiShe.CollectBus.Protocol.Models; using JiShe.CollectBus.Protocol.Models;
using JiShe.CollectBus.Repository.LogRecord;
using Mapster; using Mapster;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
@ -33,19 +36,22 @@ namespace JiShe.CollectBus.DataChannels
private readonly IProducerService _producerService; private readonly IProducerService _producerService;
private readonly KafkaOptionConfig _kafkaOptions; private readonly KafkaOptionConfig _kafkaOptions;
private readonly ServerApplicationOptions _applicationOptions; private readonly ServerApplicationOptions _applicationOptions;
private readonly ILogRecordRepository _logRecordRepository;
public DataChannelManageService( public DataChannelManageService(
ILogger<DataChannelManageService> logger, ILogger<DataChannelManageService> logger,
IIoTDbProvider dbProvider, IIoTDbProvider dbProvider,
IProducerService producerService, IProducerService producerService,
IOptions<KafkaOptionConfig> kafkaOptions, IOptions<KafkaOptionConfig> kafkaOptions,
IOptions<ServerApplicationOptions> applicationOptions) IOptions<ServerApplicationOptions> applicationOptions,
ILogRecordRepository logRecordRepository)
{ {
_logger = logger; _logger = logger;
_dbProvider = dbProvider; _dbProvider = dbProvider;
_producerService = producerService; _producerService = producerService;
_kafkaOptions = kafkaOptions.Value; _kafkaOptions = kafkaOptions.Value;
_applicationOptions = applicationOptions.Value; _applicationOptions = applicationOptions.Value;
_logRecordRepository= logRecordRepository;
} }
/// <summary> /// <summary>
@ -201,126 +207,126 @@ namespace JiShe.CollectBus.DataChannels
///// <summary> /// <summary>
///// 日志保存 /// 日志保存
///// </summary> /// </summary>
///// <param name="channelReader"></param> /// <param name="channelReader"></param>
///// <returns></returns> /// <returns></returns>
//public async Task LogSaveAsync(ChannelReader<object> channelReader) public async Task LogSaveAsync(ChannelReader<object> channelReader)
//{ {
// const int BatchSize = 1000; const int BatchSize = 1000;
// const int EmptyWaitMilliseconds = 1000; const int EmptyWaitMilliseconds = 1000;
// var timeout = TimeSpan.FromSeconds(2); var timeout = TimeSpan.FromSeconds(2);
// var timer = Stopwatch.StartNew(); var timer = Stopwatch.StartNew();
// long timeoutMilliseconds = 0; long timeoutMilliseconds = 0;
// try try
// { {
// while (true) while (true)
// { {
// var batch = new List<object>(); var batch = new List<object>();
// var canRead = channelReader.Count; var canRead = channelReader.Count;
// if (canRead <= 0) if (canRead <= 0)
// { {
// if (timeoutMilliseconds > 0) if (timeoutMilliseconds > 0)
// { {
// _logger.LogError($"{nameof(LogSaveAsync)} 通道处理数据耗时{timeoutMilliseconds}毫秒"); _logger.LogError($"{nameof(LogSaveAsync)} 通道处理数据耗时{timeoutMilliseconds}毫秒");
// } }
// timeoutMilliseconds = 0; timeoutMilliseconds = 0;
// //无消息时短等待1秒 //无消息时短等待1秒
// await Task.Delay(EmptyWaitMilliseconds); await Task.Delay(EmptyWaitMilliseconds);
// continue; continue;
// } }
// timer.Restart(); timer.Restart();
// var startTime = DateTime.Now; var startTime = DateTime.Now;
// try try
// { {
// // 异步批量读取数据 // 异步批量读取数据
// while (batch != null && batch.Count < BatchSize && (DateTime.Now - startTime) < timeout) while (batch != null && batch.Count < BatchSize && (DateTime.Now - startTime) < timeout)
// { {
// try try
// { {
// if (channelReader.TryRead(out var dataItem)) if (channelReader.TryRead(out var dataItem))
// { {
// batch.Add(dataItem); batch.Add(dataItem);
// } }
// } }
// catch (Exception) catch (Exception)
// { {
// throw; throw;
// } }
// } }
// } }
// catch (Exception) catch (Exception)
// { {
// throw; throw;
// } }
// if (batch == null || batch.Count == 0) if (batch == null || batch.Count == 0)
// { {
// await Task.Delay(EmptyWaitMilliseconds); await Task.Delay(EmptyWaitMilliseconds);
// continue; continue;
// } }
// try try
// { {
// // 按小时分组 // 按小时分组
// var hourGroups = new Dictionary<DateTime, List<LogRecords>>(); var hourGroups = new Dictionary<DateTime, List<LogRecords>>();
// DateTime? dateTime = null; DateTime? dateTime = null;
// List<LogRecords> batchList = new List<LogRecords>(); List<LogRecords> batchList = new List<LogRecords>();
// int index = 1; int index = 1;
// foreach (var item in batch) foreach (var item in batch)
// { {
// var records = item.Adapt<LogRecords>(); var records = item.Adapt<LogRecords>();
// if (!records.ReceivedTime.HasValue) if (!records.ReceivedTime.HasValue)
// records.ReceivedTime = DateTime.Now; records.ReceivedTime = DateTime.Now;
// var curDateTime = new DateTime(records.ReceivedTime.Value.Year, records.ReceivedTime.Value.Month, records.ReceivedTime.Value.Hour, records.ReceivedTime.Value.Day, records.ReceivedTime.Value.Hour, 0, 0); var curDateTime = new DateTime(records.ReceivedTime.Value.Year, records.ReceivedTime.Value.Month, records.ReceivedTime.Value.Hour, records.ReceivedTime.Value.Day, records.ReceivedTime.Value.Hour, 0, 0);
// if (!dateTime.HasValue || curDateTime != dateTime) if (!dateTime.HasValue || curDateTime != dateTime)
// { {
// dateTime = curDateTime; dateTime = curDateTime;
// if (batchList.Count > 0) if (batchList.Count > 0)
// { {
// var immutableList = ImmutableList.CreateRange(batchList); var immutableList = ImmutableList.CreateRange(batchList);
// hourGroups.Add(dateTime.Value, immutableList.ToList()); hourGroups.Add(dateTime.Value, immutableList.ToList());
// batchList.Clear(); batchList.Clear();
// } }
// } }
// batchList.Add(records); batchList.Add(records);
// // 最后一批 // 最后一批
// if(index== batch.Count) if(index== batch.Count)
// { {
// var immutableList = ImmutableList.CreateRange(batchList); var immutableList = ImmutableList.CreateRange(batchList);
// hourGroups.Add(dateTime.Value, immutableList.ToList()); hourGroups.Add(dateTime.Value, immutableList.ToList());
// batchList.Clear(); batchList.Clear();
// } }
// index++; index++;
// } }
// foreach (var (time, records) in hourGroups) foreach (var (time, records) in hourGroups)
// { {
// // 批量写入数据库 // 批量写入数据库
// await _logRecordRepository.InsertManyAsync(records, time); await _logRecordRepository.InsertManyAsync(records, time);
// } }
// } }
// catch (Exception ex) catch (Exception ex)
// { {
// _logger.LogError(ex, "数据通道处理日志数据时发生异常"); _logger.LogError(ex, "数据通道处理日志数据时发生异常");
// } }
// batch.Clear(); batch.Clear();
// timer.Stop(); timer.Stop();
// timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds; timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds;
// startTime = DateTime.Now; startTime = DateTime.Now;
// } }
// } }
// catch (Exception ex) catch (Exception ex)
// { {
// _logger.LogCritical(ex, "日志处理发生致命错误"); _logger.LogCritical(ex, "日志处理发生致命错误");
// throw; throw;
// } }
//} }
} }
} }

View File

@ -17,22 +17,20 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Volo.Abp.AspNetCore.Mvc" Version="8.3.3" /> <PackageReference Include="Volo.Abp.AspNetCore.Mvc" Version="8.3.3" />
<PackageReference Include="Volo.Abp.AutoMapper" Version="8.3.3" /> <PackageReference Include="Volo.Abp.AutoMapper" Version="8.3.3" />
<PackageReference Include="Volo.Abp.BackgroundWorkers.Hangfire" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Ddd.Application" Version="8.3.3" /> <PackageReference Include="Volo.Abp.Ddd.Application" Version="8.3.3" />
<PackageReference Include="TouchSocket" Version="3.1.2" /> <PackageReference Include="TouchSocket" Version="3.1.2" />
<PackageReference Include="TouchSocket.Hosting" Version="3.1.2" /> <PackageReference Include="TouchSocket.Hosting" Version="3.1.2" />
<PackageReference Include="Volo.Abp.Autofac" Version="8.3.3" /> <PackageReference Include="Volo.Abp.EventBus.Kafka" Version="8.3.3" />
<ProjectReference Include="..\..\protocols\JiShe.CollectBus.Protocol\JiShe.CollectBus.Protocol.csproj" />
<ProjectReference Include="..\..\protocols\JiShe.CollectBus.Protocol\JiShe.CollectBus.Protocol.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
<ProjectReference Include="..\..\modules\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj" />
<ProjectReference Include="..\..\modules\JiShe.CollectBus.FreeRedis\JiShe.CollectBus.FreeRedis.csproj" /> <ProjectReference Include="..\..\modules\JiShe.CollectBus.FreeRedis\JiShe.CollectBus.FreeRedis.csproj" />
<ProjectReference Include="..\..\modules\JiShe.CollectBus.Kafka\JiShe.CollectBus.Kafka.csproj" /> <ProjectReference Include="..\..\modules\JiShe.CollectBus.Kafka\JiShe.CollectBus.Kafka.csproj" />
</ItemGroup> <ProjectReference Include="..\..\modules\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
<ItemGroup>
<Folder Include="Mappers\" />
<Folder Include="Workers\" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -0,0 +1,17 @@
using Cassandra.Mapping;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds;
namespace JiShe.CollectBus.Mappers
{
public class CollectBusMapping: Mappings
{
public CollectBusMapping()
{
For<MessageIssued>()
.Column(e => e.Type, cm => cm.WithName("type").WithDbType<int>());
For<Device>()
.Column(e => e.Status, cm => cm.WithName("status").WithDbType<int>());
}
}
}

View File

@ -1,4 +1,6 @@
using JiShe.CollectBus.Common.Enums; using Cassandra;
using JiShe.CollectBus.Cassandra;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.DynamicModule; using JiShe.CollectBus.DynamicModule;
using JiShe.CollectBus.Interceptors; using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;
@ -20,8 +22,8 @@ namespace JiShe.CollectBus.Samples;
public class TestAppService : CollectBusAppService public class TestAppService : CollectBusAppService
{ {
private readonly ILogger<TestAppService> _logger; private readonly ILogger<TestAppService> _logger;
//private readonly ICassandraRepository<MessageIssued, string> _messageReceivedCassandraRepository; private readonly ICassandraRepository<MessageIssued, string> _messageReceivedCassandraRepository;
//private readonly ICassandraProvider _cassandraProvider; private readonly ICassandraProvider _cassandraProvider;
private readonly IProtocolService _protocolService; private readonly IProtocolService _protocolService;
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
private readonly IDynamicModuleManager _dynamicModuleManager; private readonly IDynamicModuleManager _dynamicModuleManager;
@ -29,94 +31,93 @@ public class TestAppService : CollectBusAppService
public TestAppService( public TestAppService(
ILogger<TestAppService> logger, ILogger<TestAppService> logger,
//ICassandraRepository<MessageIssued, string> messageReceivedCassandraRepository, ICassandraRepository<MessageIssued, string> messageReceivedCassandraRepository,
//ICassandraProvider cassandraProvider, ICassandraProvider cassandraProvider, IProtocolService protocolService,IServiceProvider serviceProvider, IDynamicModuleManager dynamicModuleManager)
IProtocolService protocolService,IServiceProvider serviceProvider, IDynamicModuleManager dynamicModuleManager)
{ {
_logger = logger; _logger = logger;
//_messageReceivedCassandraRepository = messageReceivedCassandraRepository; _messageReceivedCassandraRepository = messageReceivedCassandraRepository;
//_cassandraProvider = cassandraProvider; _cassandraProvider = cassandraProvider;
_protocolService = protocolService; _protocolService = protocolService;
_serviceProvider = serviceProvider; _serviceProvider = serviceProvider;
_dynamicModuleManager = dynamicModuleManager; _dynamicModuleManager = dynamicModuleManager;
} }
public async Task AddMessageOfCassandra() public async Task AddMessageOfCassandra()
{ {
//var stopwatch = Stopwatch.StartNew(); var stopwatch = Stopwatch.StartNew();
//for (int i = 1; i <= 10000; i++) for (int i = 1; i <= 10000; i++)
//{ {
// var str = Guid.NewGuid().ToString(); var str = Guid.NewGuid().ToString();
// await _messageReceivedCassandraRepository.InsertAsync(new MessageIssued await _messageReceivedCassandraRepository.InsertAsync(new MessageIssued
// { {
// ClientId = str, ClientId = str,
// DeviceNo = i.ToString(), DeviceNo = i.ToString(),
// MessageId = str, MessageId = str,
// Type = IssuedEventType.Data, Type = IssuedEventType.Data,
// Id = str, Id = str,
// Message = str.GetBytes() Message = str.GetBytes()
// }); });
//} }
//stopwatch.Stop(); stopwatch.Stop();
//_logger.LogWarning($"插入 {10000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒"); _logger.LogWarning($"插入 {10000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒");
} }
public async Task AddMessageOfBulkInsertCassandra() public async Task AddMessageOfBulkInsertCassandra()
{ {
//var records = new List<MessageIssued>(); var records = new List<MessageIssued>();
//var prepared = await _cassandraProvider.Session.PrepareAsync( var prepared = await _cassandraProvider.Session.PrepareAsync(
// $"INSERT INTO {_cassandraProvider.CassandraConfig.Keyspace}.{nameof(MessageIssued)} (id, clientid, message, deviceno,type,messageid) VALUES (?, ?, ?, ?, ?, ?)"); $"INSERT INTO {_cassandraProvider.CassandraConfig.Keyspace}.{nameof(MessageIssued)} (id, clientid, message, deviceno,type,messageid) VALUES (?, ?, ?, ?, ?, ?)");
//for (int i = 1; i <= 100000; i++) for (int i = 1; i <= 100000; i++)
//{ {
// var str = Guid.NewGuid().ToString(); var str = Guid.NewGuid().ToString();
// records.Add(new MessageIssued records.Add(new MessageIssued
// { {
// ClientId = str, ClientId = str,
// DeviceNo = i.ToString(), DeviceNo = i.ToString(),
// MessageId = str, MessageId = str,
// Type = IssuedEventType.Data, Type = IssuedEventType.Data,
// Id = str, Id = str,
// Message = str.GetBytes() Message = str.GetBytes()
// }); });
//} }
//var stopwatch = Stopwatch.StartNew(); var stopwatch = Stopwatch.StartNew();
//await BulkInsertAsync(_cassandraProvider.Session, prepared, records); await BulkInsertAsync(_cassandraProvider.Session, prepared, records);
//stopwatch.Stop(); stopwatch.Stop();
//_logger.LogWarning($"插入 {100000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒"); _logger.LogWarning($"插入 {100000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒");
} }
//private static async Task BulkInsertAsync(ISession session, PreparedStatement prepared, List<MessageIssued> records) private static async Task BulkInsertAsync(ISession session, PreparedStatement prepared, List<MessageIssued> records)
//{ {
// var tasks = new List<Task>(); var tasks = new List<Task>();
// var batch = new BatchStatement(); var batch = new BatchStatement();
// for (int i = 0; i < records.Count; i++) for (int i = 0; i < records.Count; i++)
// { {
// var record = records[i]; var record = records[i];
// var boundStatement = prepared.Bind( var boundStatement = prepared.Bind(
// record.Id, record.Id,
// record.ClientId, record.ClientId,
// record.Message, record.Message,
// record.DeviceNo, record.DeviceNo,
// (int)record.Type, (int)record.Type,
// record.MessageId); record.MessageId);
// // 设置一致性级别为ONE以提高性能 // 设置一致性级别为ONE以提高性能
// boundStatement.SetConsistencyLevel(ConsistencyLevel.One); boundStatement.SetConsistencyLevel(ConsistencyLevel.One);
// batch.Add(boundStatement); batch.Add(boundStatement);
// // 当达到批处理大小时执行 // 当达到批处理大小时执行
// if (batch.Statements.Count() >= 1000 || i == records.Count - 1) if (batch.Statements.Count() >= 1000 || i == records.Count - 1)
// { {
// tasks.Add(session.ExecuteAsync(batch)); tasks.Add(session.ExecuteAsync(batch));
// batch = new BatchStatement(); batch = new BatchStatement();
// } }
// } }
// // 等待所有批处理完成 // 等待所有批处理完成
// await Task.WhenAll(tasks); await Task.WhenAll(tasks);
//} }
[LogIntercept] [LogIntercept]
public virtual Task<string> LogInterceptorTest(string str) public virtual Task<string> LogInterceptorTest(string str)

View File

@ -334,6 +334,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// 创建取消令牌源 // 创建取消令牌源
//var cts = new CancellationTokenSource(); //var cts = new CancellationTokenSource();
await _dbProvider.GetSessionPool(true).InitTableSessionModelAsync();
_ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader); _ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader);
// //此处代码不要删除 // //此处代码不要删除

View File

@ -7,6 +7,7 @@ using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Interfaces; using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol3761; using JiShe.CollectBus.Protocol3761;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System; using System;
@ -23,6 +24,7 @@ namespace JiShe.CollectBus.Subscribers
private readonly ILogger<SubscriberAnalysisAppService> _logger; private readonly ILogger<SubscriberAnalysisAppService> _logger;
private readonly ITcpService _tcpService; private readonly ITcpService _tcpService;
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
private readonly IIoTDbProvider _dbProvider; private readonly IIoTDbProvider _dbProvider;
private readonly IProtocolService _protocolService; private readonly IProtocolService _protocolService;
@ -30,11 +32,12 @@ namespace JiShe.CollectBus.Subscribers
ITcpService tcpService, ITcpService tcpService,
IServiceProvider serviceProvider, IServiceProvider serviceProvider,
IIoTDbProvider dbProvider, IIoTDbProvider dbProvider,
IProtocolService protocolService) IMeterReadingRecordRepository meterReadingRecordsRepository, IProtocolService protocolService)
{ {
_logger = logger; _logger = logger;
_tcpService = tcpService; _tcpService = tcpService;
_serviceProvider = serviceProvider; _serviceProvider = serviceProvider;
_meterReadingRecordsRepository = meterReadingRecordsRepository;
_dbProvider = dbProvider; _dbProvider = dbProvider;
_protocolService = protocolService; _protocolService = protocolService;
} }

View File

@ -0,0 +1,172 @@
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol3761;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using TouchSocket.Sockets;
using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Subscribers
{
public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, IKafkaSubscribe
{
private readonly ILogger<SubscriberAppService> _logger;
private readonly ITcpService _tcpService;
private readonly IServiceProvider _serviceProvider;
private readonly IRepository<MessageReceivedLogin, Guid> _messageReceivedLoginEventRepository;
private readonly IRepository<MessageReceivedHeartbeat, Guid> _messageReceivedHeartbeatEventRepository;
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
private readonly IIoTDbProvider _dbProvider;
private readonly IProtocolService _protocolService;
/// <summary>
/// Initializes a new instance of the <see cref="SubscriberAppService"/> class.
/// </summary>
/// <param name="logger">The logger.</param>
/// <param name="tcpService">The TCP service.</param>
/// <param name="serviceProvider">The service provider.</param>
/// <param name="messageReceivedLoginEventRepository">The message received login event repository.</param>
/// <param name="messageReceivedHeartbeatEventRepository">The message received heartbeat event repository.</param>
/// <param name="meterReadingRecordsRepository">The device repository.</param>
public SubscriberAppService(ILogger<SubscriberAppService> logger,
ITcpService tcpService,
IServiceProvider serviceProvider,
IRepository<MessageReceivedLogin, Guid> messageReceivedLoginEventRepository,
IRepository<MessageReceivedHeartbeat, Guid> messageReceivedHeartbeatEventRepository,
IIoTDbProvider dbProvider,
IMeterReadingRecordRepository meterReadingRecordsRepository, IProtocolService protocolService)
{
_logger = logger;
_tcpService = tcpService;
_serviceProvider = serviceProvider;
_messageReceivedLoginEventRepository = messageReceivedLoginEventRepository;
_messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository;
_meterReadingRecordsRepository = meterReadingRecordsRepository;
_dbProvider = dbProvider;
_protocolService = protocolService;
}
[LogIntercept]
[KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName, EnableBatch = true)]
public async Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
{
bool isAck = true;
foreach (var issuedEventMessage in issuedEventMessages)
{
//var loginEntity = await _messageReceivedLoginEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId);
//if (loginEntity == null)
//{
// isAck=false;
// break;
//}
//loginEntity.AckTime = Clock.Now;
//loginEntity.IsAck = true;
//await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
}
// TODO:暂时ACK等后续处理是否放到私信队列中
return isAck? SubscribeAck.Success(): SubscribeAck.Fail();
}
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)]
public async Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
{
bool isAck = true;
//foreach (var issuedEventMessage in issuedEventMessages)
//{
// var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId);
// if (heartbeatEntity == null)
// {
// isAck = false;
// break;
// }
// heartbeatEntity.AckTime = Clock.Now;
// heartbeatEntity.IsAck = true;
// await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
// }
// TODO:暂时ACK等后续处理是否放到私信队列中
return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
}
[KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)]
public async Task<ISubscribeAck> ReceivedEvent(MessageProtocolAnalysis<TB3761> receivedMessage)
{
var currentTime = Clock.Now;
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null)
{
_logger.LogError("协议不存在!");
}
else
{
//todo 会根据不同的协议进行解析,然后做业务处理
//TB3761? tB3761 = protocolPlugin.Analysis3761(receivedMessage.MessageHexString);
//if (tB3761 == null)
//{
// Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
// return SubscribeAck.Success();
//}
//if (tB3761.DT == null || tB3761.AFN_FC == null)
//{
// Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
// return SubscribeAck.Success();
//}
//报文入库
var entity = new MeterReadingRecords()
{
ReceivedMessageHexString = receivedMessage.MessageHexString,
AFN = (AFN)receivedMessage.Data?.AFN_FC.AFN!,
Fn = receivedMessage.Data.DT.Fn,
Pn = 0,
FocusAddress = "",
MeterAddress = "",
};
//如果没数据,则插入,有数据则更新
var updateEntity = await _meterReadingRecordsRepository.FirOrDefaultAsync(entity, currentTime);
if (updateEntity == null)
{
await _meterReadingRecordsRepository.InsertAsync(entity, currentTime);
}
//_dbProvider.InsertAsync();
//todo 查找是否有下发任务
//await _messageReceivedEventRepository.InsertAsync(receivedMessage);
}
return SubscribeAck.Success();
}
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName, EnableBatch = true)]
public async Task<ISubscribeAck> ReceivedHeartbeatEvent(List<MessageReceivedHeartbeat> receivedHeartbeatMessages)
{
await _messageReceivedHeartbeatEventRepository.InsertManyAsync(receivedHeartbeatMessages);
return SubscribeAck.Success();
}
[KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName,EnableBatch =true)]
public async Task<ISubscribeAck> ReceivedLoginEvent(List<MessageReceivedLogin> receivedLoginMessages)
{
await _messageReceivedLoginEventRepository.InsertManyAsync(receivedLoginMessages);
return SubscribeAck.Success();
}
}
}

View File

@ -0,0 +1,43 @@
//using System;
//using System.Threading;
//using System.Threading.Tasks;
//using Hangfire;
//using JiShe.CollectBus.Common.Consts;
//using JiShe.CollectBus.ScheduledMeterReading;
//using Microsoft.Extensions.Logging;
//using Volo.Abp.BackgroundWorkers.Hangfire;
//using Volo.Abp.DependencyInjection;
//using Volo.Abp.Uow;
//namespace JiShe.CollectBus.Workers
//{
// /// <summary>
// /// 构建待处理的下发指令任务处理
// /// </summary>
// public class CreateToBeIssueTaskWorker : HangfireBackgroundWorkerBase, ITransientDependency, ICollectWorker
// {
// private readonly ILogger<CreateToBeIssueTaskWorker> _logger;
// private readonly IScheduledMeterReadingService _scheduledMeterReadingService;
// /// <summary>
// /// Initializes a new instance of the <see cref="CreateToBeIssueTaskWorker"/> class.
// /// </summary>
// /// <param name="logger">The logger.</param>
// /// <param name="scheduledMeterReadingService">定时任务</param>
// public CreateToBeIssueTaskWorker(ILogger<CreateToBeIssueTaskWorker> logger, IScheduledMeterReadingService scheduledMeterReadingService)
// {
// _logger = logger;
// RecurringJobId = nameof(CreateToBeIssueTaskWorker);
// CronExpression = "0 0/1 * * * *";
// TimeZone = TimeZoneInfo.Local;
// this._scheduledMeterReadingService = scheduledMeterReadingService;
// }
// public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken())
// {
// _logger.LogError($"{DateTime.Now}");
// // await _scheduledMeterReadingService.CreateToBeIssueTasks();
// }
// }
//}

View File

@ -0,0 +1,39 @@
//using JiShe.CollectBus.ScheduledMeterReading;
//using Microsoft.Extensions.Logging;
//using System;
//using System.Collections.Generic;
//using System.Linq;
//using System.Text;
//using System.Threading;
//using System.Threading.Tasks;
//using Volo.Abp.BackgroundWorkers.Hangfire;
//using Volo.Abp.DependencyInjection;
//namespace JiShe.CollectBus.Workers
//{
// /// <summary>
// /// 定时数据检测1小时一次
// /// </summary>
// public class DataDetectionFifteenMinuteWorker : HangfireBackgroundWorkerBase, ITransientDependency, ICollectWorker
// {
// private readonly ILogger<CreateToBeIssueTaskWorker> _logger;
// private readonly IScheduledMeterReadingService _scheduledMeterReadingService;
// public DataDetectionFifteenMinuteWorker(ILogger<CreateToBeIssueTaskWorker> logger, IScheduledMeterReadingService scheduledMeterReadingService)
// {
// _logger = logger;
// RecurringJobId = nameof(CreateToBeIssueTaskWorker);
// CronExpression = "0 0 0/1 * * ?";
// TimeZone = TimeZoneInfo.Local;
// this._scheduledMeterReadingService = scheduledMeterReadingService;
// }
// public override Task DoWorkAsync(CancellationToken cancellationToken = default)
// {
// //throw new NotImplementedException();
// return Task.CompletedTask;
// }
// }
//}

View File

@ -0,0 +1,39 @@
//using System;
//using System.Threading;
//using System.Threading.Tasks;
//using Hangfire;
//using JiShe.CollectBus.Common.Attributes;
//using Microsoft.Extensions.Logging;
//using Volo.Abp.BackgroundWorkers.Hangfire;
//using Volo.Abp.DependencyInjection;
//using Volo.Abp.Uow;
//namespace JiShe.CollectBus.Workers
//{
// public class EpiCollectWorker : HangfireBackgroundWorkerBase, ITransientDependency,ICollectWorker
// {
// private readonly ILogger<EpiCollectWorker> _logger;
// /// <summary>
// /// Initializes a new instance of the <see cref="EpiCollectWorker"/> class.
// /// </summary>
// /// <param name="logger">The logger.</param>
// public EpiCollectWorker(ILogger<EpiCollectWorker> logger)
// {
// _logger = logger;
// RecurringJobId = nameof(EpiCollectWorker);
// CronExpression = Cron.Daily();
// }
// public override Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken())
// {
// using (var uow = LazyServiceProvider.LazyGetRequiredService<IUnitOfWorkManager>().Begin())
// {
// Logger.LogInformation("Executed MyLogWorker..!");
// return Task.CompletedTask;
// }
// }
// }
//}

View File

@ -0,0 +1,48 @@
//using System;
//using System.Threading;
//using System.Threading.Tasks;
//using Hangfire;
//using JiShe.CollectBus.ScheduledMeterReading;
//using Microsoft.Extensions.Logging;
//using Volo.Abp.BackgroundWorkers.Hangfire;
//using Volo.Abp.DependencyInjection;
//using Volo.Abp.Uow;
//namespace JiShe.CollectBus.Workers
//{
// /// <summary>
// /// 15分钟采集数据
// /// </summary>
// public class SubscriberFifteenMinuteWorker : HangfireBackgroundWorkerBase, ITransientDependency, ICollectWorker
// {
// private readonly ILogger<SubscriberFifteenMinuteWorker> _logger;
// private readonly IScheduledMeterReadingService _scheduledMeterReadingService;
// /// <summary>
// /// Initializes a new instance of the <see cref="SubscriberFifteenMinuteWorker"/> class.
// /// </summary>
// /// <param name="logger">The logger.</param>
// /// <param name="scheduledMeterReadingService">定时任务</param>
// public SubscriberFifteenMinuteWorker(ILogger<SubscriberFifteenMinuteWorker> logger, IScheduledMeterReadingService scheduledMeterReadingService)
// {
// _logger = logger;
// RecurringJobId = nameof(SubscriberFifteenMinuteWorker);
// CronExpression = "0 0/15 * * * *";
// TimeZone = TimeZoneInfo.Local;
// this._scheduledMeterReadingService = scheduledMeterReadingService;
// }
// public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken())
// {
// //await _scheduledMeterReadingService.AmmeterScheduledMeterFifteenMinuteReading();
// //await _scheduledMeterReadingService.WatermeterScheduledMeterFifteenMinuteReading();
// //using (var uow = LazyServiceProvider.LazyGetRequiredService<IUnitOfWorkManager>().Begin())
// //{
// // Logger.LogInformation("Executed MyLogWorker..!");
// // return Task.CompletedTask;
// //}
// }
// }
//}

View File

@ -0,0 +1,42 @@
//using System;
//using System.Threading;
//using System.Threading.Tasks;
//using Hangfire;
//using JiShe.CollectBus.ScheduledMeterReading;
//using Microsoft.Extensions.Logging;
//using Volo.Abp.BackgroundWorkers.Hangfire;
//using Volo.Abp.DependencyInjection;
//using Volo.Abp.Uow;
//namespace JiShe.CollectBus.Workers
//{
// /// <summary>
// /// 5分钟采集数据
// /// </summary>
// public class SubscriberFiveMinuteWorker : HangfireBackgroundWorkerBase, ITransientDependency,ICollectWorker
// {
// private readonly ILogger<SubscriberFiveMinuteWorker> _logger;
// private readonly IScheduledMeterReadingService _scheduledMeterReadingService;
// /// <summary>
// /// Initializes a new instance of the <see cref="SubscriberFiveMinuteWorker"/> class.
// /// </summary>
// /// <param name="logger">The logger.</param>
// /// <param name="scheduledMeterReadingService">定时任务</param>
// public SubscriberFiveMinuteWorker(ILogger<SubscriberFiveMinuteWorker> logger, IScheduledMeterReadingService scheduledMeterReadingService)
// {
// _logger = logger;
// RecurringJobId = nameof(SubscriberFiveMinuteWorker);
// CronExpression = "0 0/5 * * * *";
// TimeZone = TimeZoneInfo.Local;
// this._scheduledMeterReadingService = scheduledMeterReadingService;
// }
// public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken())
// {
// //await _scheduledMeterReadingService.AmmeterScheduledMeterFiveMinuteReading();
// //await _scheduledMeterReadingService.WatermeterScheduledMeterFiveMinuteReading();
// }
// }
//}

View File

@ -0,0 +1,44 @@
//using System;
//using System.Threading;
//using System.Threading.Tasks;
//using Hangfire;
//using JiShe.CollectBus.ScheduledMeterReading;
//using Microsoft.Extensions.Logging;
//using Volo.Abp.BackgroundWorkers.Hangfire;
//using Volo.Abp.DependencyInjection;
//using Volo.Abp.Uow;
//namespace JiShe.CollectBus.Workers
//{
// /// <summary>
// /// 1分钟采集数据
// /// </summary>
// public class SubscriberOneMinuteWorker : HangfireBackgroundWorkerBase, ITransientDependency,ICollectWorker
// {
// private readonly ILogger<SubscriberOneMinuteWorker> _logger;
// private readonly IScheduledMeterReadingService _scheduledMeterReadingService;
// /// <summary>
// /// Initializes a new instance of the <see cref="SubscriberOneMinuteWorker"/> class.
// /// </summary>
// /// <param name="logger">The logger.</param>
// /// <param name="scheduledMeterReadingService">定时任务</param>
// public SubscriberOneMinuteWorker(ILogger<SubscriberOneMinuteWorker> logger, IScheduledMeterReadingService scheduledMeterReadingService)
// {
// _logger = logger;
// RecurringJobId = nameof(SubscriberOneMinuteWorker);
// CronExpression = "0 0/1 * * * *";
// TimeZone = TimeZoneInfo.Local;
// this._scheduledMeterReadingService = scheduledMeterReadingService;
// }
// public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken())
// {
// //await _scheduledMeterReadingService.AmmeterScheduledMeterOneMinuteReading();
// //await _scheduledMeterReadingService.WatermeterScheduledMeterOneMinuteReading();
// }
// }
//}

View File

@ -1,4 +1,4 @@
using JiShe.CollectBus.IoTDB; using JiShe.CollectBus.MongoDB;
using Volo.Abp.Autofac; using Volo.Abp.Autofac;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;
@ -6,7 +6,7 @@ namespace JiShe.CollectBus.DbMigrator;
[DependsOn( [DependsOn(
typeof(AbpAutofacModule), typeof(AbpAutofacModule),
typeof(CollectBusIoTDbModule), typeof(CollectBusMongoDbModule),
typeof(CollectBusApplicationContractsModule) typeof(CollectBusApplicationContractsModule)
)] )]
public class CollectBusDbMigratorModule : AbpModule public class CollectBusDbMigratorModule : AbpModule

View File

@ -7,7 +7,6 @@ using JiShe.CollectBus.Data;
using Serilog; using Serilog;
using Volo.Abp; using Volo.Abp;
using Volo.Abp.Data; using Volo.Abp.Data;
using JiShe.CollectBus.IoTDB.Interface;
namespace JiShe.CollectBus.DbMigrator; namespace JiShe.CollectBus.DbMigrator;
@ -34,16 +33,10 @@ public class DbMigratorHostedService : IHostedService
{ {
await application.InitializeAsync(); await application.InitializeAsync();
//await application
// .ServiceProvider
// .GetRequiredService<CollectBusDbMigrationService>()
// .MigrateAsync();
//初始化IoTDB表模型
await application await application
.ServiceProvider .ServiceProvider
.GetRequiredService<IIoTDbProvider>().GetSessionPool(true) .GetRequiredService<CollectBusDbMigrationService>()
.InitTableSessionModelAsync(); .MigrateAsync();
await application.ShutdownAsync(); await application.ShutdownAsync();

View File

@ -18,6 +18,7 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Volo.Abp.Autofac" Version="8.3.3" /> <PackageReference Include="Volo.Abp.Autofac" Version="8.3.3" />
<ProjectReference Include="..\..\modules\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
</ItemGroup> </ItemGroup>

View File

@ -1,14 +1,5 @@
{ {
"ConnectionStrings": { "ConnectionStrings": {
"Default": "mongodb://admin:collectbus_mongodb_jishe@118.190.144.92:37017/JiSheCollectBus?authSource=admin" "Default": "mongodb://admin:collectbus_mongodb_jishe@118.190.144.92:37017/JiSheCollectBus?authSource=admin"
},
"IoTDBOptions": {
"UserName": "root",
"Password": "root",
"ClusterList": [ "192.168.5.9:6667" ],
"PoolSize": 32,
"DataBaseName": "energy",
"OpenDebugMode": true,
"UseTableSessionPoolByDefault": false
} }
} }

View File

@ -18,16 +18,12 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="JetBrains.Annotations" Version="2024.2.0" /> <PackageReference Include="JetBrains.Annotations" Version="2024.2.0" />
<PackageReference Include="Mapster" Version="7.4.0" /> <PackageReference Include="Mapster" Version="7.4.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="8.0.1" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.2" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.2" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.1" /> <PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="8.0.1" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" /> <PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="Serilog" Version="4.1.0" /> <PackageReference Include="Serilog" Version="4.1.0" />
<PackageReference Include="System.Linq.Dynamic.Core" Version="1.6.4" />
<PackageReference Include="System.Text.Json" Version="8.0.5" />
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" /> <PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Ddd.Domain" Version="8.3.3" /> <PackageReference Include="Volo.Abp.Ddd.Domain" Version="8.3.3" />
</ItemGroup> </ItemGroup>

View File

@ -1,82 +0,0 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.WorkService
{
/// <summary>
/// 系统后台定时服务
/// </summary>
public abstract class SystemBackGroundWorkService : BackgroundService
{
/// <summary>
/// 日志记录
/// </summary>
public ILogger<SystemBackGroundWorkService> Logger { get; set; }
/// <summary>
/// 创建一个取消标记源
/// </summary>
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
/// <summary>
/// 任务执行间隔时间
/// </summary>
private TimeSpan interval;
protected SystemBackGroundWorkService(ILogger<SystemBackGroundWorkService> logger)
{
Logger = logger;
interval = GetInterval();
}
/// <summary>
/// 执行时间间隔
/// </summary>
/// <returns></returns>
protected abstract TimeSpan GetInterval();
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await Task.CompletedTask;// 等待其他任务执行完成,避免阻塞应用程序启动
Logger.LogInformation($"任务每隔{interval.TotalSeconds}秒执行一次");
await InitAsync(cancellationTokenSource.Token);
}
protected virtual async Task InitAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await DoWorkAsync(cancellationToken);
}
catch (Exception ex)
{
Logger.LogError(ex, "后台任务执行发生异常");
}
await Task.Delay(interval, cancellationToken);
}
}
public override Task StopAsync(CancellationToken cancellationToken)
{
Logger.LogInformation("后台服务停止……");
cancellationTokenSource.Cancel();
return base.StopAsync(cancellationToken);
}
/// <summary>
/// 抛出方法入口以便于其服务实现
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
protected abstract Task DoWorkAsync(CancellationToken cancellationToken);
}
}

View File

@ -19,6 +19,7 @@ using TouchSocket.Sockets;
using JiShe.CollectBus.Plugins; using JiShe.CollectBus.Plugins;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Diagnostics.HealthChecks;
using JiShe.CollectBus.Cassandra;
namespace JiShe.CollectBus.Host namespace JiShe.CollectBus.Host
@ -38,18 +39,18 @@ namespace JiShe.CollectBus.Host
Configure<AbpBackgroundJobOptions>(options => { options.IsJobExecutionEnabled = false; }); Configure<AbpBackgroundJobOptions>(options => { options.IsJobExecutionEnabled = false; });
//context.Services.AddHangfire(config => context.Services.AddHangfire(config =>
//{ {
// config.UseRedisStorage( config.UseRedisStorage(
// context.Services.GetConfiguration().GetValue<string>("Redis:Configuration"), redisStorageOptions) context.Services.GetConfiguration().GetValue<string>("Redis:Configuration"), redisStorageOptions)
// .WithJobExpirationTimeout(TimeSpan.FromDays(7)); .WithJobExpirationTimeout(TimeSpan.FromDays(7));
// var delaysInSeconds = new[] { 10, 60, 60 * 3 }; // 重试时间间隔 var delaysInSeconds = new[] { 10, 60, 60 * 3 }; // 重试时间间隔
// const int Attempts = 3; // 重试次数 const int Attempts = 3; // 重试次数
// config.UseFilter(new AutomaticRetryAttribute() { Attempts = Attempts, DelaysInSeconds = delaysInSeconds }); config.UseFilter(new AutomaticRetryAttribute() { Attempts = Attempts, DelaysInSeconds = delaysInSeconds });
// //config.UseFilter(new AutoDeleteAfterSuccessAttribute(TimeSpan.FromDays(7))); //config.UseFilter(new AutoDeleteAfterSuccessAttribute(TimeSpan.FromDays(7)));
// config.UseFilter(new JobRetryLastFilter(Attempts)); config.UseFilter(new JobRetryLastFilter(Attempts));
//}); });
//context.Services.AddHangfireServer(); context.Services.AddHangfireServer();
} }
/// <summary> /// <summary>
@ -193,9 +194,9 @@ namespace JiShe.CollectBus.Host
options => options =>
{ {
options.IgnoredUrls.Add("/AuditLogs/page"); options.IgnoredUrls.Add("/AuditLogs/page");
//options.IgnoredUrls.Add("/hangfire/stats"); options.IgnoredUrls.Add("/hangfire/stats");
options.IgnoredUrls.Add("/hangfire/recurring/trigger"); options.IgnoredUrls.Add("/hangfire/recurring/trigger");
//options.IgnoredUrls.Add("/cap"); options.IgnoredUrls.Add("/cap");
options.IgnoredUrls.Add("/"); options.IgnoredUrls.Add("/");
}); });
} }
@ -252,8 +253,10 @@ namespace JiShe.CollectBus.Host
private void ConfigureHealthChecks(ServiceConfigurationContext context, IConfiguration configuration) private void ConfigureHealthChecks(ServiceConfigurationContext context, IConfiguration configuration)
{ {
if (!configuration.GetValue<bool>("HealthChecks:IsEnable")) return; if (!configuration.GetValue<bool>("HealthChecks:IsEnable")) return;
var cassandraConfig = new CassandraConfig();
configuration.GetSection("Cassandra").Bind(cassandraConfig);
context.Services.AddHealthChecks() context.Services.AddHealthChecks()
//.AddMongoDb(configuration.GetConnectionString("Default"), "MongoDB", HealthStatus.Unhealthy) .AddMongoDb(configuration.GetConnectionString("Default"), "MongoDB", HealthStatus.Unhealthy)
.AddRedis(configuration.GetValue<string>("Redis:Configuration") ?? string.Empty, "Redis", .AddRedis(configuration.GetValue<string>("Redis:Configuration") ?? string.Empty, "Redis",
HealthStatus.Unhealthy) HealthStatus.Unhealthy)
//.AddKafka(new Confluent.Kafka.ProducerConfig //.AddKafka(new Confluent.Kafka.ProducerConfig

View File

@ -5,12 +5,14 @@ using JiShe.CollectBus.Host.Extensions;
using JiShe.CollectBus.Host.HealthChecks; using JiShe.CollectBus.Host.HealthChecks;
using JiShe.CollectBus.Host.Swaggers; using JiShe.CollectBus.Host.Swaggers;
using JiShe.CollectBus.IoTDB.Options; using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.MongoDB;
using Microsoft.AspNetCore.Diagnostics.HealthChecks; using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Swashbuckle.AspNetCore.SwaggerUI; using Swashbuckle.AspNetCore.SwaggerUI;
using Volo.Abp; using Volo.Abp;
using Volo.Abp.AspNetCore.Authentication.JwtBearer; using Volo.Abp.AspNetCore.Authentication.JwtBearer;
using Volo.Abp.AspNetCore.Serilog; using Volo.Abp.AspNetCore.Serilog;
using Volo.Abp.Autofac; using Volo.Abp.Autofac;
using Volo.Abp.BackgroundWorkers.Hangfire;
using Volo.Abp.Caching.StackExchangeRedis; using Volo.Abp.Caching.StackExchangeRedis;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;
using Volo.Abp.Swashbuckle; using Volo.Abp.Swashbuckle;
@ -28,7 +30,9 @@ namespace JiShe.CollectBus.Host
typeof(AbpSwashbuckleModule), typeof(AbpSwashbuckleModule),
typeof(AbpTimingModule), typeof(AbpTimingModule),
typeof(CollectBusApplicationModule), typeof(CollectBusApplicationModule),
typeof(AbpCachingStackExchangeRedisModule) typeof(CollectBusMongoDbModule),
typeof(AbpCachingStackExchangeRedisModule),
typeof(AbpBackgroundWorkersHangfireModule)
)] )]
public partial class CollectBusHostModule : AbpModule public partial class CollectBusHostModule : AbpModule
{ {
@ -43,8 +47,8 @@ namespace JiShe.CollectBus.Host
ConfigureSwaggerServices(context, configuration); ConfigureSwaggerServices(context, configuration);
ConfigureNetwork(context, configuration); ConfigureNetwork(context, configuration);
ConfigureJwtAuthentication(context, configuration); ConfigureJwtAuthentication(context, configuration);
//ConfigureHangfire(context); ConfigureHangfire(context);
//ConfigureAuditLog(context); ConfigureAuditLog(context);
ConfigureCustom(context, configuration); ConfigureCustom(context, configuration);
ConfigureHealthChecks(context, configuration); ConfigureHealthChecks(context, configuration);
Configure<AbpClockOptions>(options => { options.Kind = DateTimeKind.Local; }); Configure<AbpClockOptions>(options => { options.Kind = DateTimeKind.Local; });
@ -88,10 +92,10 @@ namespace JiShe.CollectBus.Host
app.UseAuditing(); app.UseAuditing();
app.UseAbpSerilogEnrichers(); app.UseAbpSerilogEnrichers();
app.UseUnitOfWork(); app.UseUnitOfWork();
//app.UseHangfireDashboard("/hangfire", new DashboardOptions app.UseHangfireDashboard("/hangfire", new DashboardOptions
//{ {
// IgnoreAntiforgeryToken = true IgnoreAntiforgeryToken = true
//}); });
app.UseConfiguredEndpoints(endpoints => app.UseConfiguredEndpoints(endpoints =>
{ {
if (!configuration.GetValue<bool>("HealthChecks:IsEnable")) return; if (!configuration.GetValue<bool>("HealthChecks:IsEnable")) return;

View File

@ -1,5 +1,5 @@
//using Cassandra; using Cassandra;
//using JiShe.CollectBus.Cassandra; using JiShe.CollectBus.Cassandra;
using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Diagnostics.HealthChecks;
namespace JiShe.CollectBus.Host.HealthChecks namespace JiShe.CollectBus.Host.HealthChecks
@ -31,28 +31,27 @@ namespace JiShe.CollectBus.Host.HealthChecks
/// </returns> /// </returns>
public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default) public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
{ {
return HealthCheckResult.Healthy("Cassandra is unhealthy."); var cassandraConfig = new CassandraConfig();
//var cassandraConfig = new CassandraConfig(); _configuration.GetSection("Cassandra").Bind(cassandraConfig);
//_configuration.GetSection("Cassandra").Bind(cassandraConfig); try
//try {
//{ var clusterBuilder = Cluster.Builder();
// var clusterBuilder = Cluster.Builder(); foreach (var node in cassandraConfig.Nodes)
// foreach (var node in cassandraConfig.Nodes) {
// { clusterBuilder.AddContactPoint(node.Host)
// clusterBuilder.AddContactPoint(node.Host) .WithPort(node.Port);
// .WithPort(node.Port); }
// } clusterBuilder.WithCredentials(cassandraConfig.Username, cassandraConfig.Password);
// clusterBuilder.WithCredentials(cassandraConfig.Username, cassandraConfig.Password); var cluster = clusterBuilder.Build();
// var cluster = clusterBuilder.Build(); using var session = await cluster.ConnectAsync();
// using var session = await cluster.ConnectAsync(); var result = await Task.FromResult(session.Execute("SELECT release_version FROM system.local"));
// var result = await Task.FromResult(session.Execute("SELECT release_version FROM system.local")); var version = result.First().GetValue<string>("release_version");
// var version = result.First().GetValue<string>("release_version"); return HealthCheckResult.Healthy($"Cassandra is healthy. Version: {version}");
// return HealthCheckResult.Healthy($"Cassandra is healthy. Version: {version}"); }
//} catch (Exception ex)
//catch (Exception ex) {
//{ return new HealthCheckResult(context.Registration.FailureStatus, $"Cassandra is unhealthy: {ex.Message}", ex);
// return new HealthCheckResult(context.Registration.FailureStatus, $"Cassandra is unhealthy: {ex.Message}", ex); }
//}
} }
} }
} }

View File

@ -1,4 +1,5 @@
using System.Net.Sockets; using System.Net.Sockets;
using JiShe.CollectBus.Cassandra;
using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Options; using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider; using JiShe.CollectBus.IoTDB.Provider;

View File

@ -25,6 +25,7 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.Kafka" Version="9.0.0" /> <PackageReference Include="AspNetCore.HealthChecks.Kafka" Version="9.0.0" />
<PackageReference Include="AspNetCore.HealthChecks.MongoDb" Version="8.0.0" />
<PackageReference Include="AspNetCore.HealthChecks.Redis" Version="9.0.0" /> <PackageReference Include="AspNetCore.HealthChecks.Redis" Version="9.0.0" />
<PackageReference Include="AspNetCore.HealthChecks.UI" Version="9.0.0" /> <PackageReference Include="AspNetCore.HealthChecks.UI" Version="9.0.0" />
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="9.0.0" /> <PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="9.0.0" />
@ -46,9 +47,13 @@
<PackageReference Include="Volo.Abp.AspNetCore.Authentication.JwtBearer" Version="8.3.3" /> <PackageReference Include="Volo.Abp.AspNetCore.Authentication.JwtBearer" Version="8.3.3" />
<PackageReference Include="Volo.Abp.AspNetCore.Serilog" Version="8.3.3" /> <PackageReference Include="Volo.Abp.AspNetCore.Serilog" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Autofac" Version="8.3.3" /> <PackageReference Include="Volo.Abp.Autofac" Version="8.3.3" />
<PackageReference Include="Volo.Abp.BackgroundWorkers.Hangfire" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Caching.StackExchangeRedis" Version="8.3.3" /> <PackageReference Include="Volo.Abp.Caching.StackExchangeRedis" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Swashbuckle" Version="8.3.3" /> <PackageReference Include="Volo.Abp.Swashbuckle" Version="8.3.3" />
<!--<PackageReference Include="Hangfire.HttpJob" Version="3.8.5" />
<PackageReference Include="Hangfire.MySqlStorage" Version="2.0.3" />
<PackageReference Include="Hangfire.Dashboard.BasicAuthorization" Version="1.0.2" />-->
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
@ -56,6 +61,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\modules\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
<ProjectReference Include="..\..\services\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" /> <ProjectReference Include="..\..\services\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
<ProjectReference Include="..\..\services\JiShe.CollectBus.Application\JiShe.CollectBus.Application.csproj" /> <ProjectReference Include="..\..\services\JiShe.CollectBus.Application\JiShe.CollectBus.Application.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.HttpApi\JiShe.CollectBus.HttpApi.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.HttpApi\JiShe.CollectBus.HttpApi.csproj" />

View File

@ -140,7 +140,7 @@
} }
], ],
"ServerApplicationOptions": { "ServerApplicationOptions": {
"ServerTagName": "JiSheCollectBus77", "ServerTagName": "JiSheCollectBus99",
"SystemType": "Energy", "SystemType": "Energy",
"FirstCollectionTime": "2025-04-28 15:07:00", "FirstCollectionTime": "2025-04-28 15:07:00",
"AutomaticVerificationTime": "16:07:00", "AutomaticVerificationTime": "16:07:00",
@ -153,8 +153,5 @@
"PlugInFolder": "", "PlugInFolder": "",
"TCP": { "TCP": {
"ClientPort": 10500 "ClientPort": 10500
},
"BackgroundJobs": {
"IsJobExecutionEnabled": false //
} }
} }

View File

@ -2,6 +2,7 @@
using JiShe.CollectBus.Common; using JiShe.CollectBus.Common;
using JiShe.CollectBus.Migration.Host.HealthChecks; using JiShe.CollectBus.Migration.Host.HealthChecks;
using JiShe.CollectBus.Migration.Host.Swaggers; using JiShe.CollectBus.Migration.Host.Swaggers;
using JiShe.CollectBus.MongoDB;
using Microsoft.AspNetCore.Diagnostics.HealthChecks; using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Swashbuckle.AspNetCore.SwaggerUI; using Swashbuckle.AspNetCore.SwaggerUI;
using Volo.Abp; using Volo.Abp;
@ -26,6 +27,7 @@ namespace JiShe.CollectBus.Migration.Host
typeof(AbpAspNetCoreSerilogModule), typeof(AbpAspNetCoreSerilogModule),
typeof(AbpSwashbuckleModule), typeof(AbpSwashbuckleModule),
typeof(AbpTimingModule), typeof(AbpTimingModule),
typeof(CollectBusMongoDbModule),
typeof(CollectBusMigrationApplicationModule), typeof(CollectBusMigrationApplicationModule),
typeof(AbpCachingStackExchangeRedisModule) typeof(AbpCachingStackExchangeRedisModule)
)] )]

View File

@ -21,6 +21,7 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.Kafka" Version="9.0.0" /> <PackageReference Include="AspNetCore.HealthChecks.Kafka" Version="9.0.0" />
<PackageReference Include="AspNetCore.HealthChecks.MongoDb" Version="8.0.0" />
<PackageReference Include="AspNetCore.HealthChecks.Redis" Version="9.0.0" /> <PackageReference Include="AspNetCore.HealthChecks.Redis" Version="9.0.0" />
<PackageReference Include="AspNetCore.HealthChecks.UI" Version="9.0.0" /> <PackageReference Include="AspNetCore.HealthChecks.UI" Version="9.0.0" />
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="9.0.0" /> <PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="9.0.0" />
@ -42,14 +43,19 @@
<PackageReference Include="Volo.Abp.AspNetCore.Authentication.JwtBearer" Version="8.3.3" /> <PackageReference Include="Volo.Abp.AspNetCore.Authentication.JwtBearer" Version="8.3.3" />
<PackageReference Include="Volo.Abp.AspNetCore.Serilog" Version="8.3.3" /> <PackageReference Include="Volo.Abp.AspNetCore.Serilog" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Autofac" Version="8.3.3" /> <PackageReference Include="Volo.Abp.Autofac" Version="8.3.3" />
<!--<PackageReference Include="Volo.Abp.BackgroundWorkers.Hangfire" Version="8.3.3" />-->
<PackageReference Include="Volo.Abp.Caching.StackExchangeRedis" Version="8.3.3" /> <PackageReference Include="Volo.Abp.Caching.StackExchangeRedis" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" /> <PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Swashbuckle" Version="8.3.3" /> <PackageReference Include="Volo.Abp.Swashbuckle" Version="8.3.3" />
<!--<PackageReference Include="Hangfire.HttpJob" Version="3.8.5" />
<PackageReference Include="Hangfire.MySqlStorage" Version="2.0.3" />
<PackageReference Include="Hangfire.Dashboard.BasicAuthorization" Version="1.0.2" />-->
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\modules\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
<ProjectReference Include="..\..\services\JiShe.CollectBus.Migration.Application.Contracts\JiShe.CollectBus.Migration.Application.Contracts.csproj" /> <ProjectReference Include="..\..\services\JiShe.CollectBus.Migration.Application.Contracts\JiShe.CollectBus.Migration.Application.Contracts.csproj" />
<ProjectReference Include="..\..\services\JiShe.CollectBus.Migration.Application\JiShe.CollectBus.Migration.Application.csproj" /> <ProjectReference Include="..\..\services\JiShe.CollectBus.Migration.Application\JiShe.CollectBus.Migration.Application.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Migration.HttpApi\JiShe.CollectBus.Migration.HttpApi.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Migration.HttpApi\JiShe.CollectBus.Migration.HttpApi.csproj" />