Compare commits
No commits in common. "ea91622217c57ff0d9fb332fe40ef1e7e0b96625" and "04da31f4234a8bd4609cb9f1945578defcf92b70" have entirely different histories.
ea91622217
...
04da31f423
@ -9,8 +9,6 @@
|
|||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.2" />
|
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.2" />
|
||||||
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
|
||||||
<PackageReference Include="Volo.Abp.Autofac" Version="8.3.3" />
|
|
||||||
|
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
|||||||
@ -11,7 +11,7 @@ using JiShe.CollectBus.Protocol3761;
|
|||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Ammeter
|
namespace GatherService.WattMeter.AnalysisData.AFN_10H
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 5.16.1.2.1 F1:透明转发 读取SIM卡信息
|
/// 5.16.1.2.1 F1:透明转发 读取SIM卡信息
|
||||||
@ -64,7 +64,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Ammeter
|
|||||||
DataType = IOTDBDataTypeConst.Data,
|
DataType = IOTDBDataTypeConst.Data,
|
||||||
};
|
};
|
||||||
result?.Invoke(dto);
|
result?.Invoke(dto);
|
||||||
await _dataStorage.SaveDataToIotDbAsync(dto);
|
await _dataStorage.SaveDataToIotDbAsync<string?>(dto);
|
||||||
return await Task.FromResult(true);
|
return await Task.FromResult(true);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
@ -1,12 +1,11 @@
|
|||||||
using JiShe.CollectBus.Common.Consts;
|
using GatherService.WattMeter.AnalysisData.AFN_10H;
|
||||||
|
using JiShe.CollectBus.Common.Consts;
|
||||||
using JiShe.CollectBus.Common.Enums;
|
using JiShe.CollectBus.Common.Enums;
|
||||||
using JiShe.CollectBus.Common.Extensions;
|
using JiShe.CollectBus.Common.Extensions;
|
||||||
using JiShe.CollectBus.IotSystems.Devices;
|
using JiShe.CollectBus.IotSystems.Devices;
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Protocol.Dto;
|
using JiShe.CollectBus.Protocol.Contracts.Protocol.Dto;
|
||||||
using JiShe.CollectBus.Protocol.Dto;
|
using JiShe.CollectBus.Protocol.Dto;
|
||||||
using JiShe.CollectBus.Protocol.Interfaces;
|
using JiShe.CollectBus.Protocol.Interfaces;
|
||||||
using JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Ammeter;
|
|
||||||
using JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Watermeter;
|
|
||||||
using JiShe.CollectBus.Protocol3761;
|
using JiShe.CollectBus.Protocol3761;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using System;
|
using System;
|
||||||
@ -55,11 +54,6 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H
|
|||||||
{
|
{
|
||||||
result?.Invoke(dto);
|
result?.Invoke(dto);
|
||||||
});
|
});
|
||||||
else if (value.Contains(F10TranspondMatch.ES190_DC))
|
|
||||||
await _analysisStrategyContext.ExecuteAsync<TB3761>(nameof(ES190DC_Analysis), input, dto =>
|
|
||||||
{
|
|
||||||
result?.Invoke(dto);
|
|
||||||
});
|
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
//TODO: 写入1条日志
|
//TODO: 写入1条日志
|
||||||
|
|||||||
@ -1,4 +1,5 @@
|
|||||||
using DeviceDetectorNET.Parser.Device;
|
using DeviceDetectorNET.Parser.Device;
|
||||||
|
using GatherService.WattMeter.AnalysisData.AFN_10H;
|
||||||
using JiShe.CollectBus.Common.Consts;
|
using JiShe.CollectBus.Common.Consts;
|
||||||
using JiShe.CollectBus.Common.Enums;
|
using JiShe.CollectBus.Common.Enums;
|
||||||
using JiShe.CollectBus.Common.Extensions;
|
using JiShe.CollectBus.Common.Extensions;
|
||||||
@ -18,7 +19,7 @@ using System.Text;
|
|||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using static FreeSql.Internal.GlobalFilter;
|
using static FreeSql.Internal.GlobalFilter;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Ammeter
|
namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 透抄 电网频率
|
/// 透抄 电网频率
|
||||||
@ -73,7 +74,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Ammeter
|
|||||||
|
|
||||||
};
|
};
|
||||||
result?.Invoke(unitDataAnalysis);
|
result?.Invoke(unitDataAnalysis);
|
||||||
await _dataStorage.SaveDataToIotDbAsync(unitDataAnalysis);
|
await _dataStorage.SaveDataToIotDbAsync<decimal?>(unitDataAnalysis);
|
||||||
|
|
||||||
return await Task.FromResult(true);
|
return await Task.FromResult(true);
|
||||||
}
|
}
|
||||||
@ -133,7 +134,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Ammeter
|
|||||||
string dataMark = string.Join("",dataField.GetRange(0, 4).ReduceHex33(true));
|
string dataMark = string.Join("",dataField.GetRange(0, 4).ReduceHex33(true));
|
||||||
values.Add(dataMark);//数据标识
|
values.Add(dataMark);//数据标识
|
||||||
var readValue = dataField.GetRange(4, len - 4).ReduceHex33(true);//值
|
var readValue = dataField.GetRange(4, len - 4).ReduceHex33(true);//值
|
||||||
await _analysisStrategyContext.ExecuteAsync($"Appendix_{dataMark}", readValue, (value) =>
|
await _analysisStrategyContext.ExecuteAsync<List<string>>($"Appendix_{dataMark}", readValue, (value) =>
|
||||||
{
|
{
|
||||||
values.Add(value.ToString());
|
values.Add(value.ToString());
|
||||||
});
|
});
|
||||||
@ -11,7 +11,7 @@ using JiShe.CollectBus.Protocol.T37612012.AnalysisData;
|
|||||||
using JiShe.CollectBus.Protocol3761;
|
using JiShe.CollectBus.Protocol3761;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Ammeter
|
namespace GatherService.WattMeter.AnalysisData.AFN_10H
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 透明转发---跳合闸
|
/// 透明转发---跳合闸
|
||||||
@ -42,7 +42,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Ammeter
|
|||||||
var data = new AnalysisBaseDto<bool?>()
|
var data = new AnalysisBaseDto<bool?>()
|
||||||
{
|
{
|
||||||
FiledDesc = "跳合闸",
|
FiledDesc = "跳合闸",
|
||||||
DataValue = datas[2].Equals("9C") || datas[2].Equals("94") ? true : false,
|
DataValue = (datas[2].Equals("9C") || datas[2].Equals("94")) ? true : false,
|
||||||
ItemType= "10_98",
|
ItemType= "10_98",
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -1,130 +0,0 @@
|
|||||||
using JiShe.CollectBus.Common.Consts;
|
|
||||||
using JiShe.CollectBus.Common.Enums;
|
|
||||||
using JiShe.CollectBus.Common.Extensions;
|
|
||||||
using JiShe.CollectBus.Common.Helpers;
|
|
||||||
using JiShe.CollectBus.IotSystems.Devices;
|
|
||||||
using JiShe.CollectBus.Protocol.Contracts.Protocol.Dto;
|
|
||||||
using JiShe.CollectBus.Protocol.Dto;
|
|
||||||
using JiShe.CollectBus.Protocol.Interfaces;
|
|
||||||
using JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Ammeter;
|
|
||||||
using JiShe.CollectBus.Protocol3761;
|
|
||||||
using Microsoft.Extensions.Logging;
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData.AFN_10H.Watermeter
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// ES_190DC 4G水表读数解析
|
|
||||||
/// </summary>
|
|
||||||
public class ES190DC_Analysis : IAnalysisStrategy<TB3761>
|
|
||||||
{
|
|
||||||
private readonly ILogger<ES190DC_Analysis> _logger;
|
|
||||||
private readonly DataStorage _dataStorage;
|
|
||||||
public ES190DC_Analysis(ILogger<ES190DC_Analysis> logger, DataStorage dataStorage)
|
|
||||||
{
|
|
||||||
_logger = logger;
|
|
||||||
_dataStorage = dataStorage;
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task<bool> ExecuteAsync(TB3761 input, Action<dynamic>? result = null)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
ArgumentNullException.ThrowIfNull(input);
|
|
||||||
ArgumentNullException.ThrowIfNull(input.A.Code);
|
|
||||||
ArgumentNullException.ThrowIfNull(input.UnitData?.HexMessageList);
|
|
||||||
|
|
||||||
List<AnalysisBaseDto<decimal?>> list = GenerateFinalResult(input.UnitData.HexMessageList);
|
|
||||||
if (list.Count > 0)
|
|
||||||
{
|
|
||||||
// 查询设备信息
|
|
||||||
DeviceInfo? deviceInfo = await _dataStorage.GetDeviceInfoAsync(input.A.Code, input.DA.Pn, list[0].DeviceAddress);
|
|
||||||
if (deviceInfo != null)
|
|
||||||
{
|
|
||||||
list.ForEach(item =>
|
|
||||||
{
|
|
||||||
item.ProjectId = deviceInfo.ProjectID;
|
|
||||||
item.DeviceId = deviceInfo.MeterId;
|
|
||||||
item.DatabaseBusiID = deviceInfo.DatabaseBusiID;
|
|
||||||
item.DeviceAddress = deviceInfo.MeterAddress;
|
|
||||||
item.FocusId = deviceInfo.FocusId;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
UnitDataAnalysis<List<AnalysisBaseDto<decimal?>>> unitDataAnalysis = new UnitDataAnalysis<List<AnalysisBaseDto<decimal?>>>
|
|
||||||
{
|
|
||||||
Code = input.A.Code!,
|
|
||||||
AFN = input.AFN_FC.AFN,
|
|
||||||
Fn = input.DT.Fn,
|
|
||||||
Pn = input.DA.Pn,
|
|
||||||
MSA = input.A.A3!.D1_D7!,
|
|
||||||
PSEQ = input.SEQ.PSEQ,
|
|
||||||
Data = list,
|
|
||||||
ReceivedHexMessage = input.BaseHexMessage.HexMessageString,
|
|
||||||
MessageId = input.MessageId,
|
|
||||||
TimeDensity = 1,//密度-间隔,
|
|
||||||
DensityUnit = DensityUnit.Hour,
|
|
||||||
ReceivedTime = input.ReceivedTime,
|
|
||||||
DataType = IOTDBDataTypeConst.Data
|
|
||||||
|
|
||||||
};
|
|
||||||
result?.Invoke(unitDataAnalysis);
|
|
||||||
await _dataStorage.SaveMultipleDataToIotDbAsync(unitDataAnalysis);
|
|
||||||
|
|
||||||
return await Task.FromResult(true);
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, $"ES190DC 4G水表解析失败:{input.A.Code}-{input.DT.Fn}-{input.BaseHexMessage.HexMessageString},{ex.Message}");
|
|
||||||
return await Task.FromResult(false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<AnalysisBaseDto<decimal?>> GenerateFinalResult(List<string> hexMessageList)
|
|
||||||
{
|
|
||||||
List<AnalysisBaseDto<decimal?>> list = new List<AnalysisBaseDto<decimal?>>();
|
|
||||||
string address = string.Join("", hexMessageList.Skip(12).Take(7).Reverse());
|
|
||||||
if (hexMessageList[19].Contains("81"))
|
|
||||||
{
|
|
||||||
int count = Convert.ToInt32(hexMessageList[36].HexToDec());
|
|
||||||
DateTime startTime = Convert.ToDateTime($"{DateTime.Now.Year.ToString().Substring(0, 2)}{hexMessageList[41]}-{hexMessageList[40]}-{hexMessageList[39]} {hexMessageList[38]}:{hexMessageList[37]}:00");
|
|
||||||
|
|
||||||
List<string> valueArr = hexMessageList.GetRange(42, count * 4);
|
|
||||||
int nextIndex = 0;
|
|
||||||
for (int i = 1; i <= count; i++)
|
|
||||||
{
|
|
||||||
AnalysisBaseDto<decimal?> meter = new AnalysisBaseDto<decimal?>();
|
|
||||||
meter.DeviceType = MeterTypeEnum.WaterMeter;
|
|
||||||
meter.DeviceAddress = address;
|
|
||||||
var arr = valueArr.GetRange(nextIndex, 4);
|
|
||||||
var errorCode = arr[4].CheckErrorCode();
|
|
||||||
if (errorCode != null)
|
|
||||||
{
|
|
||||||
meter.ValidData = false;
|
|
||||||
meter.ErrorCodeMsg = errorCode.Item2;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
string val = $"{arr[3]}{arr[2]}{arr[1]}.{arr[0]}";
|
|
||||||
if (decimal.TryParse(val, out decimal value))
|
|
||||||
meter.DataValue = value;
|
|
||||||
}
|
|
||||||
string timeSpan = startTime.AddHours(i - 1).ToString("yyyy-MM-dd HH:mm:ss");
|
|
||||||
if (DateTime.TryParse(timeSpan, out DateTime readingDate))
|
|
||||||
{
|
|
||||||
meter.TimeSpan = readingDate;
|
|
||||||
}
|
|
||||||
meter.ItemType = "10_250";
|
|
||||||
meter.FiledDesc = "当前累积流量";
|
|
||||||
meter.FiledName = meter.ItemType.GetDataFieldByGatherDataType() ?? string.Empty;
|
|
||||||
nextIndex += 4;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return list;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -27,6 +27,7 @@ namespace JiShe.CollectBus.Protocol.T37612012
|
|||||||
|
|
||||||
private readonly IProducerService _producerService;
|
private readonly IProducerService _producerService;
|
||||||
|
|
||||||
|
private readonly IRepository<Device, Guid> _deviceRepository;
|
||||||
private readonly ITcpService _tcpService;
|
private readonly ITcpService _tcpService;
|
||||||
|
|
||||||
public readonly Dictionary<string, Telemetry3761PacketBuilder.T3761Delegate> T3761AFNHandlers;
|
public readonly Dictionary<string, Telemetry3761PacketBuilder.T3761Delegate> T3761AFNHandlers;
|
||||||
@ -40,6 +41,7 @@ namespace JiShe.CollectBus.Protocol.T37612012
|
|||||||
_logger = logger;
|
_logger = logger;
|
||||||
//_logger = serviceProvider.GetRequiredService<ILogger<StandardProtocolPlugin>>();
|
//_logger = serviceProvider.GetRequiredService<ILogger<StandardProtocolPlugin>>();
|
||||||
_producerService = serviceProvider.GetRequiredService<IProducerService>();
|
_producerService = serviceProvider.GetRequiredService<IProducerService>();
|
||||||
|
_deviceRepository = serviceProvider.GetRequiredService<IRepository<Device, Guid>>();
|
||||||
_tcpService = tcpService;
|
_tcpService = tcpService;
|
||||||
T3761AFNHandlers = Telemetry3761PacketBuilder.T3761AFNHandlers;
|
T3761AFNHandlers = Telemetry3761PacketBuilder.T3761AFNHandlers;
|
||||||
}
|
}
|
||||||
@ -131,6 +133,7 @@ namespace JiShe.CollectBus.Protocol.T37612012
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
_logger.LogError($"不支持的上报kafka主题:{topicName}");
|
_logger.LogError($"不支持的上报kafka主题:{topicName}");
|
||||||
|
await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, messageReceivedAnalysis);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -20,11 +20,13 @@ namespace JiShe.CollectBus.Protocol.Abstracts
|
|||||||
public const string errorData = "EE";
|
public const string errorData = "EE";
|
||||||
|
|
||||||
private readonly ILogger _logger;
|
private readonly ILogger _logger;
|
||||||
|
private readonly IRepository<ProtocolInfo, Guid> _protocolInfoRepository;
|
||||||
private readonly IFreeRedisProvider _redisProvider;
|
private readonly IFreeRedisProvider _redisProvider;
|
||||||
|
|
||||||
public ProtocolPlugin(IServiceProvider serviceProvider, ILogger logger)
|
public ProtocolPlugin(IServiceProvider serviceProvider, ILogger logger)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
|
_protocolInfoRepository = serviceProvider.GetRequiredService<IRepository<ProtocolInfo, Guid>>();
|
||||||
_redisProvider = serviceProvider.GetRequiredService<IFreeRedisProvider>();
|
_redisProvider = serviceProvider.GetRequiredService<IFreeRedisProvider>();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -38,8 +40,10 @@ namespace JiShe.CollectBus.Protocol.Abstracts
|
|||||||
if (Info == null)
|
if (Info == null)
|
||||||
{
|
{
|
||||||
throw new ArgumentNullException(nameof(Info));
|
throw new ArgumentNullException(nameof(Info));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
await _protocolInfoRepository.DeleteDirectAsync(a => a.Name == Info.Name);
|
||||||
|
await _protocolInfoRepository.InsertAsync(Info);
|
||||||
await _redisProvider.Instance.HDelAsync($"{RedisConst.ProtocolKey}", Info.Name);
|
await _redisProvider.Instance.HDelAsync($"{RedisConst.ProtocolKey}", Info.Name);
|
||||||
await _redisProvider.Instance.HSetAsync($"{RedisConst.ProtocolKey}", Info.Name, Info);
|
await _redisProvider.Instance.HSetAsync($"{RedisConst.ProtocolKey}", Info.Name, Info);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,6 +12,7 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
{
|
{
|
||||||
Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessage);
|
Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessage);
|
||||||
Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessage);
|
Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessage);
|
||||||
|
Task<ISubscribeAck> ReceivedEvent(MessageProtocolAnalysis<TB3761> receivedMessage);
|
||||||
Task<ISubscribeAck> ReceivedHeartbeatEvent(List<MessageReceivedHeartbeat> receivedHeartbeatMessage);
|
Task<ISubscribeAck> ReceivedHeartbeatEvent(List<MessageReceivedHeartbeat> receivedHeartbeatMessage);
|
||||||
Task<ISubscribeAck> ReceivedLoginEvent(List<MessageReceivedLogin> receivedLoginMessage);
|
Task<ISubscribeAck> ReceivedLoginEvent(List<MessageReceivedLogin> receivedLoginMessage);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,10 +1,13 @@
|
|||||||
using JiShe.CollectBus.DataChannels;
|
using Cassandra.Mapping;
|
||||||
|
using JiShe.CollectBus.Cassandra;
|
||||||
|
using JiShe.CollectBus.DataChannels;
|
||||||
using JiShe.CollectBus.FreeRedis;
|
using JiShe.CollectBus.FreeRedis;
|
||||||
using JiShe.CollectBus.FreeSql;
|
using JiShe.CollectBus.FreeSql;
|
||||||
using JiShe.CollectBus.Interceptors;
|
using JiShe.CollectBus.Interceptors;
|
||||||
using JiShe.CollectBus.IoTDB;
|
using JiShe.CollectBus.IoTDB;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||||
using JiShe.CollectBus.Kafka;
|
using JiShe.CollectBus.Kafka;
|
||||||
|
using JiShe.CollectBus.Mappers;
|
||||||
using JiShe.CollectBus.Protocol;
|
using JiShe.CollectBus.Protocol;
|
||||||
using JiShe.CollectBus.ScheduledMeterReading;
|
using JiShe.CollectBus.ScheduledMeterReading;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
@ -16,8 +19,12 @@ using System.Threading.Channels;
|
|||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using Volo.Abp;
|
using Volo.Abp;
|
||||||
using Volo.Abp.Application;
|
using Volo.Abp.Application;
|
||||||
|
using Volo.Abp.AuditLogging;
|
||||||
using Volo.Abp.Autofac;
|
using Volo.Abp.Autofac;
|
||||||
using Volo.Abp.AutoMapper;
|
using Volo.Abp.AutoMapper;
|
||||||
|
using Volo.Abp.BackgroundJobs;
|
||||||
|
using Volo.Abp.BackgroundWorkers;
|
||||||
|
using Volo.Abp.BackgroundWorkers.Hangfire;
|
||||||
using Volo.Abp.Modularity;
|
using Volo.Abp.Modularity;
|
||||||
|
|
||||||
namespace JiShe.CollectBus;
|
namespace JiShe.CollectBus;
|
||||||
@ -27,12 +34,16 @@ namespace JiShe.CollectBus;
|
|||||||
typeof(CollectBusApplicationContractsModule),
|
typeof(CollectBusApplicationContractsModule),
|
||||||
typeof(AbpDddApplicationModule),
|
typeof(AbpDddApplicationModule),
|
||||||
typeof(AbpAutoMapperModule),
|
typeof(AbpAutoMapperModule),
|
||||||
|
typeof(AbpAutofacModule),
|
||||||
|
typeof(AbpBackgroundWorkersHangfireModule),
|
||||||
typeof(CollectBusFreeRedisModule),
|
typeof(CollectBusFreeRedisModule),
|
||||||
typeof(CollectBusFreeSqlModule),
|
typeof(CollectBusFreeSqlModule),
|
||||||
typeof(CollectBusKafkaModule),
|
typeof(CollectBusKafkaModule),
|
||||||
typeof(CollectBusIoTDbModule),
|
typeof(CollectBusIoTDbModule),
|
||||||
typeof(AbpAutofacModule),
|
|
||||||
typeof(CollectBusDomainSharedModule),
|
typeof(CollectBusDomainSharedModule),
|
||||||
|
typeof(AbpAuditLoggingDomainModule),
|
||||||
|
typeof(AbpBackgroundJobsDomainModule),
|
||||||
|
typeof(CollectBusCassandraModule),
|
||||||
typeof(CollectBusProtocolModule)
|
typeof(CollectBusProtocolModule)
|
||||||
)]
|
)]
|
||||||
public class CollectBusApplicationModule : AbpModule
|
public class CollectBusApplicationModule : AbpModule
|
||||||
@ -44,8 +55,8 @@ public class CollectBusApplicationModule : AbpModule
|
|||||||
context.Services.AddAutoMapperObjectMapper<CollectBusApplicationModule>();
|
context.Services.AddAutoMapperObjectMapper<CollectBusApplicationModule>();
|
||||||
Configure<AbpAutoMapperOptions>(options => { options.AddMaps<CollectBusApplicationModule>(true); });
|
Configure<AbpAutoMapperOptions>(options => { options.AddMaps<CollectBusApplicationModule>(true); });
|
||||||
|
|
||||||
//context.Services.AddSingleton(new MappingConfiguration()
|
context.Services.AddSingleton(new MappingConfiguration()
|
||||||
// .Define(new CollectBusMapping()));
|
.Define(new CollectBusMapping()));
|
||||||
|
|
||||||
// 注册拦截器
|
// 注册拦截器
|
||||||
context.Services.OnRegistered(ctx =>
|
context.Services.OnRegistered(ctx =>
|
||||||
@ -59,17 +70,33 @@ public class CollectBusApplicationModule : AbpModule
|
|||||||
public override async Task OnApplicationInitializationAsync(
|
public override async Task OnApplicationInitializationAsync(
|
||||||
ApplicationInitializationContext context)
|
ApplicationInitializationContext context)
|
||||||
{
|
{
|
||||||
//var assembly = Assembly.GetExecutingAssembly();
|
var assembly = Assembly.GetExecutingAssembly();
|
||||||
//var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface)
|
var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface)
|
||||||
// .ToList();
|
.ToList();
|
||||||
//foreach (var type in types)
|
foreach (var type in types) await context.AddBackgroundWorkerAsync(type);
|
||||||
|
|
||||||
|
//Task.Run(() =>
|
||||||
//{
|
//{
|
||||||
// await context.AddBackgroundWorkerAsync(type);
|
// //默认初始化表计信息
|
||||||
//}
|
// var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
|
||||||
|
// dbContext.InitAmmeterCacheData();
|
||||||
|
// //await dbContext.InitWatermeterCacheData();
|
||||||
|
//}).ConfigureAwait(false);
|
||||||
|
|
||||||
//下发任务通道构建
|
//下发任务通道构建
|
||||||
DataChannelManage.TaskDataChannel = Channel.CreateUnbounded<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>>();
|
DataChannelManage.TaskDataChannel = Channel.CreateUnbounded<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>>();
|
||||||
|
|
||||||
|
|
||||||
|
// 日志存储通道构建
|
||||||
|
DataChannelManage.LogSaveChannel = Channel.CreateUnbounded<object>();
|
||||||
|
|
||||||
|
// 日志刷新通道构建
|
||||||
|
DataChannelManage.LogRefreshChannel = Channel.CreateUnbounded<object>();
|
||||||
|
|
||||||
|
// 启动通道任务
|
||||||
|
var _dataChannelManage = context.ServiceProvider.GetRequiredService<DataChannelManageService>();
|
||||||
|
_ = _dataChannelManage.LogSaveAsync(DataChannelManage.LogSaveChannel.Reader);
|
||||||
|
|
||||||
//默认初始化表计信息
|
//默认初始化表计信息
|
||||||
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
|
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
|
||||||
await dbContext.InitAmmeterCacheData("V4-Gather-8890");
|
await dbContext.InitAmmeterCacheData("V4-Gather-8890");
|
||||||
|
|||||||
@ -1,4 +1,6 @@
|
|||||||
using JiShe.CollectBus.Common;
|
using Amazon.Runtime.Internal.Transform;
|
||||||
|
using DnsClient.Protocol;
|
||||||
|
using JiShe.CollectBus.Common;
|
||||||
using JiShe.CollectBus.Common.DeviceBalanceControl;
|
using JiShe.CollectBus.Common.DeviceBalanceControl;
|
||||||
using JiShe.CollectBus.IoTDB.Context;
|
using JiShe.CollectBus.IoTDB.Context;
|
||||||
using JiShe.CollectBus.IoTDB.Interface;
|
using JiShe.CollectBus.IoTDB.Interface;
|
||||||
@ -8,6 +10,7 @@ using JiShe.CollectBus.Kafka.Internal;
|
|||||||
using JiShe.CollectBus.Kafka.Producer;
|
using JiShe.CollectBus.Kafka.Producer;
|
||||||
using JiShe.CollectBus.Protocol.Dto;
|
using JiShe.CollectBus.Protocol.Dto;
|
||||||
using JiShe.CollectBus.Protocol.Models;
|
using JiShe.CollectBus.Protocol.Models;
|
||||||
|
using JiShe.CollectBus.Repository.LogRecord;
|
||||||
using Mapster;
|
using Mapster;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
@ -19,7 +22,7 @@ using System.Linq;
|
|||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Channels;
|
using System.Threading.Channels;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using Volo.Abp.DependencyInjection;
|
using Volo.Abp.DependencyInjection;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.DataChannels
|
namespace JiShe.CollectBus.DataChannels
|
||||||
{
|
{
|
||||||
@ -32,20 +35,23 @@ namespace JiShe.CollectBus.DataChannels
|
|||||||
private readonly IIoTDbProvider _dbProvider;
|
private readonly IIoTDbProvider _dbProvider;
|
||||||
private readonly IProducerService _producerService;
|
private readonly IProducerService _producerService;
|
||||||
private readonly KafkaOptionConfig _kafkaOptions;
|
private readonly KafkaOptionConfig _kafkaOptions;
|
||||||
private readonly ServerApplicationOptions _applicationOptions;
|
private readonly ServerApplicationOptions _applicationOptions;
|
||||||
|
private readonly ILogRecordRepository _logRecordRepository;
|
||||||
|
|
||||||
public DataChannelManageService(
|
public DataChannelManageService(
|
||||||
ILogger<DataChannelManageService> logger,
|
ILogger<DataChannelManageService> logger,
|
||||||
IIoTDbProvider dbProvider,
|
IIoTDbProvider dbProvider,
|
||||||
IProducerService producerService,
|
IProducerService producerService,
|
||||||
IOptions<KafkaOptionConfig> kafkaOptions,
|
IOptions<KafkaOptionConfig> kafkaOptions,
|
||||||
IOptions<ServerApplicationOptions> applicationOptions)
|
IOptions<ServerApplicationOptions> applicationOptions,
|
||||||
|
ILogRecordRepository logRecordRepository)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_dbProvider = dbProvider;
|
_dbProvider = dbProvider;
|
||||||
_producerService = producerService;
|
_producerService = producerService;
|
||||||
_kafkaOptions = kafkaOptions.Value;
|
_kafkaOptions = kafkaOptions.Value;
|
||||||
_applicationOptions = applicationOptions.Value;
|
_applicationOptions = applicationOptions.Value;
|
||||||
|
_logRecordRepository= logRecordRepository;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -201,126 +207,126 @@ namespace JiShe.CollectBus.DataChannels
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
///// <summary>
|
/// <summary>
|
||||||
///// 日志保存
|
/// 日志保存
|
||||||
///// </summary>
|
/// </summary>
|
||||||
///// <param name="channelReader"></param>
|
/// <param name="channelReader"></param>
|
||||||
///// <returns></returns>
|
/// <returns></returns>
|
||||||
//public async Task LogSaveAsync(ChannelReader<object> channelReader)
|
public async Task LogSaveAsync(ChannelReader<object> channelReader)
|
||||||
//{
|
{
|
||||||
// const int BatchSize = 1000;
|
const int BatchSize = 1000;
|
||||||
// const int EmptyWaitMilliseconds = 1000;
|
const int EmptyWaitMilliseconds = 1000;
|
||||||
// var timeout = TimeSpan.FromSeconds(2);
|
var timeout = TimeSpan.FromSeconds(2);
|
||||||
// var timer = Stopwatch.StartNew();
|
var timer = Stopwatch.StartNew();
|
||||||
// long timeoutMilliseconds = 0;
|
long timeoutMilliseconds = 0;
|
||||||
// try
|
try
|
||||||
// {
|
{
|
||||||
// while (true)
|
while (true)
|
||||||
// {
|
{
|
||||||
// var batch = new List<object>();
|
var batch = new List<object>();
|
||||||
// var canRead = channelReader.Count;
|
var canRead = channelReader.Count;
|
||||||
// if (canRead <= 0)
|
if (canRead <= 0)
|
||||||
// {
|
{
|
||||||
// if (timeoutMilliseconds > 0)
|
if (timeoutMilliseconds > 0)
|
||||||
// {
|
{
|
||||||
// _logger.LogError($"{nameof(LogSaveAsync)} 通道处理数据耗时{timeoutMilliseconds}毫秒");
|
_logger.LogError($"{nameof(LogSaveAsync)} 通道处理数据耗时{timeoutMilliseconds}毫秒");
|
||||||
// }
|
}
|
||||||
// timeoutMilliseconds = 0;
|
timeoutMilliseconds = 0;
|
||||||
// //无消息时短等待1秒
|
//无消息时短等待1秒
|
||||||
// await Task.Delay(EmptyWaitMilliseconds);
|
await Task.Delay(EmptyWaitMilliseconds);
|
||||||
// continue;
|
continue;
|
||||||
// }
|
}
|
||||||
|
|
||||||
// timer.Restart();
|
timer.Restart();
|
||||||
// var startTime = DateTime.Now;
|
var startTime = DateTime.Now;
|
||||||
|
|
||||||
// try
|
try
|
||||||
// {
|
{
|
||||||
// // 异步批量读取数据
|
// 异步批量读取数据
|
||||||
// while (batch != null && batch.Count < BatchSize && (DateTime.Now - startTime) < timeout)
|
while (batch != null && batch.Count < BatchSize && (DateTime.Now - startTime) < timeout)
|
||||||
// {
|
{
|
||||||
// try
|
try
|
||||||
// {
|
{
|
||||||
// if (channelReader.TryRead(out var dataItem))
|
if (channelReader.TryRead(out var dataItem))
|
||||||
// {
|
{
|
||||||
// batch.Add(dataItem);
|
batch.Add(dataItem);
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
// catch (Exception)
|
catch (Exception)
|
||||||
// {
|
{
|
||||||
// throw;
|
throw;
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
// catch (Exception)
|
catch (Exception)
|
||||||
// {
|
{
|
||||||
// throw;
|
throw;
|
||||||
// }
|
}
|
||||||
|
|
||||||
// if (batch == null || batch.Count == 0)
|
if (batch == null || batch.Count == 0)
|
||||||
// {
|
{
|
||||||
// await Task.Delay(EmptyWaitMilliseconds);
|
await Task.Delay(EmptyWaitMilliseconds);
|
||||||
// continue;
|
continue;
|
||||||
// }
|
}
|
||||||
// try
|
try
|
||||||
// {
|
{
|
||||||
|
|
||||||
// // 按小时分组
|
// 按小时分组
|
||||||
// var hourGroups = new Dictionary<DateTime, List<LogRecords>>();
|
var hourGroups = new Dictionary<DateTime, List<LogRecords>>();
|
||||||
// DateTime? dateTime = null;
|
DateTime? dateTime = null;
|
||||||
// List<LogRecords> batchList = new List<LogRecords>();
|
List<LogRecords> batchList = new List<LogRecords>();
|
||||||
// int index = 1;
|
int index = 1;
|
||||||
// foreach (var item in batch)
|
foreach (var item in batch)
|
||||||
// {
|
{
|
||||||
// var records = item.Adapt<LogRecords>();
|
var records = item.Adapt<LogRecords>();
|
||||||
|
|
||||||
// if (!records.ReceivedTime.HasValue)
|
if (!records.ReceivedTime.HasValue)
|
||||||
// records.ReceivedTime = DateTime.Now;
|
records.ReceivedTime = DateTime.Now;
|
||||||
// var curDateTime = new DateTime(records.ReceivedTime.Value.Year, records.ReceivedTime.Value.Month, records.ReceivedTime.Value.Hour, records.ReceivedTime.Value.Day, records.ReceivedTime.Value.Hour, 0, 0);
|
var curDateTime = new DateTime(records.ReceivedTime.Value.Year, records.ReceivedTime.Value.Month, records.ReceivedTime.Value.Hour, records.ReceivedTime.Value.Day, records.ReceivedTime.Value.Hour, 0, 0);
|
||||||
// if (!dateTime.HasValue || curDateTime != dateTime)
|
if (!dateTime.HasValue || curDateTime != dateTime)
|
||||||
// {
|
{
|
||||||
// dateTime = curDateTime;
|
dateTime = curDateTime;
|
||||||
// if (batchList.Count > 0)
|
if (batchList.Count > 0)
|
||||||
// {
|
{
|
||||||
// var immutableList = ImmutableList.CreateRange(batchList);
|
var immutableList = ImmutableList.CreateRange(batchList);
|
||||||
// hourGroups.Add(dateTime.Value, immutableList.ToList());
|
hourGroups.Add(dateTime.Value, immutableList.ToList());
|
||||||
// batchList.Clear();
|
batchList.Clear();
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
// batchList.Add(records);
|
batchList.Add(records);
|
||||||
// // 最后一批
|
// 最后一批
|
||||||
// if(index== batch.Count)
|
if(index== batch.Count)
|
||||||
// {
|
{
|
||||||
// var immutableList = ImmutableList.CreateRange(batchList);
|
var immutableList = ImmutableList.CreateRange(batchList);
|
||||||
// hourGroups.Add(dateTime.Value, immutableList.ToList());
|
hourGroups.Add(dateTime.Value, immutableList.ToList());
|
||||||
// batchList.Clear();
|
batchList.Clear();
|
||||||
// }
|
}
|
||||||
// index++;
|
index++;
|
||||||
// }
|
}
|
||||||
// foreach (var (time, records) in hourGroups)
|
foreach (var (time, records) in hourGroups)
|
||||||
// {
|
{
|
||||||
// // 批量写入数据库
|
// 批量写入数据库
|
||||||
// await _logRecordRepository.InsertManyAsync(records, time);
|
await _logRecordRepository.InsertManyAsync(records, time);
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
// catch (Exception ex)
|
catch (Exception ex)
|
||||||
// {
|
{
|
||||||
// _logger.LogError(ex, "数据通道处理日志数据时发生异常");
|
_logger.LogError(ex, "数据通道处理日志数据时发生异常");
|
||||||
// }
|
}
|
||||||
// batch.Clear();
|
batch.Clear();
|
||||||
// timer.Stop();
|
timer.Stop();
|
||||||
|
|
||||||
// timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds;
|
timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds;
|
||||||
|
|
||||||
// startTime = DateTime.Now;
|
startTime = DateTime.Now;
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
// catch (Exception ex)
|
catch (Exception ex)
|
||||||
// {
|
{
|
||||||
// _logger.LogCritical(ex, "日志处理发生致命错误");
|
_logger.LogCritical(ex, "日志处理发生致命错误");
|
||||||
// throw;
|
throw;
|
||||||
// }
|
}
|
||||||
//}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,22 +17,20 @@
|
|||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="Volo.Abp.AspNetCore.Mvc" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.AspNetCore.Mvc" Version="8.3.3" />
|
||||||
<PackageReference Include="Volo.Abp.AutoMapper" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.AutoMapper" Version="8.3.3" />
|
||||||
|
<PackageReference Include="Volo.Abp.BackgroundWorkers.Hangfire" Version="8.3.3" />
|
||||||
<PackageReference Include="Volo.Abp.Ddd.Application" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.Ddd.Application" Version="8.3.3" />
|
||||||
<PackageReference Include="TouchSocket" Version="3.1.2" />
|
<PackageReference Include="TouchSocket" Version="3.1.2" />
|
||||||
<PackageReference Include="TouchSocket.Hosting" Version="3.1.2" />
|
<PackageReference Include="TouchSocket.Hosting" Version="3.1.2" />
|
||||||
<PackageReference Include="Volo.Abp.Autofac" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.EventBus.Kafka" Version="8.3.3" />
|
||||||
|
<ProjectReference Include="..\..\protocols\JiShe.CollectBus.Protocol\JiShe.CollectBus.Protocol.csproj" />
|
||||||
<ProjectReference Include="..\..\protocols\JiShe.CollectBus.Protocol\JiShe.CollectBus.Protocol.csproj" />
|
|
||||||
|
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
||||||
|
|
||||||
|
<ProjectReference Include="..\..\modules\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj" />
|
||||||
<ProjectReference Include="..\..\modules\JiShe.CollectBus.FreeRedis\JiShe.CollectBus.FreeRedis.csproj" />
|
<ProjectReference Include="..\..\modules\JiShe.CollectBus.FreeRedis\JiShe.CollectBus.FreeRedis.csproj" />
|
||||||
<ProjectReference Include="..\..\modules\JiShe.CollectBus.Kafka\JiShe.CollectBus.Kafka.csproj" />
|
<ProjectReference Include="..\..\modules\JiShe.CollectBus.Kafka\JiShe.CollectBus.Kafka.csproj" />
|
||||||
</ItemGroup>
|
<ProjectReference Include="..\..\modules\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
|
||||||
|
|
||||||
<ItemGroup>
|
|
||||||
<Folder Include="Mappers\" />
|
|
||||||
<Folder Include="Workers\" />
|
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
|
|||||||
@ -0,0 +1,17 @@
|
|||||||
|
using Cassandra.Mapping;
|
||||||
|
using JiShe.CollectBus.IotSystems.Devices;
|
||||||
|
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||||
|
|
||||||
|
namespace JiShe.CollectBus.Mappers
|
||||||
|
{
|
||||||
|
public class CollectBusMapping: Mappings
|
||||||
|
{
|
||||||
|
public CollectBusMapping()
|
||||||
|
{
|
||||||
|
For<MessageIssued>()
|
||||||
|
.Column(e => e.Type, cm => cm.WithName("type").WithDbType<int>());
|
||||||
|
For<Device>()
|
||||||
|
.Column(e => e.Status, cm => cm.WithName("status").WithDbType<int>());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,4 +1,6 @@
|
|||||||
using JiShe.CollectBus.Common.Enums;
|
using Cassandra;
|
||||||
|
using JiShe.CollectBus.Cassandra;
|
||||||
|
using JiShe.CollectBus.Common.Enums;
|
||||||
using JiShe.CollectBus.DynamicModule;
|
using JiShe.CollectBus.DynamicModule;
|
||||||
using JiShe.CollectBus.Interceptors;
|
using JiShe.CollectBus.Interceptors;
|
||||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||||
@ -20,8 +22,8 @@ namespace JiShe.CollectBus.Samples;
|
|||||||
public class TestAppService : CollectBusAppService
|
public class TestAppService : CollectBusAppService
|
||||||
{
|
{
|
||||||
private readonly ILogger<TestAppService> _logger;
|
private readonly ILogger<TestAppService> _logger;
|
||||||
//private readonly ICassandraRepository<MessageIssued, string> _messageReceivedCassandraRepository;
|
private readonly ICassandraRepository<MessageIssued, string> _messageReceivedCassandraRepository;
|
||||||
//private readonly ICassandraProvider _cassandraProvider;
|
private readonly ICassandraProvider _cassandraProvider;
|
||||||
private readonly IProtocolService _protocolService;
|
private readonly IProtocolService _protocolService;
|
||||||
private readonly IServiceProvider _serviceProvider;
|
private readonly IServiceProvider _serviceProvider;
|
||||||
private readonly IDynamicModuleManager _dynamicModuleManager;
|
private readonly IDynamicModuleManager _dynamicModuleManager;
|
||||||
@ -29,94 +31,93 @@ public class TestAppService : CollectBusAppService
|
|||||||
|
|
||||||
public TestAppService(
|
public TestAppService(
|
||||||
ILogger<TestAppService> logger,
|
ILogger<TestAppService> logger,
|
||||||
//ICassandraRepository<MessageIssued, string> messageReceivedCassandraRepository,
|
ICassandraRepository<MessageIssued, string> messageReceivedCassandraRepository,
|
||||||
//ICassandraProvider cassandraProvider,
|
ICassandraProvider cassandraProvider, IProtocolService protocolService,IServiceProvider serviceProvider, IDynamicModuleManager dynamicModuleManager)
|
||||||
IProtocolService protocolService,IServiceProvider serviceProvider, IDynamicModuleManager dynamicModuleManager)
|
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
//_messageReceivedCassandraRepository = messageReceivedCassandraRepository;
|
_messageReceivedCassandraRepository = messageReceivedCassandraRepository;
|
||||||
//_cassandraProvider = cassandraProvider;
|
_cassandraProvider = cassandraProvider;
|
||||||
_protocolService = protocolService;
|
_protocolService = protocolService;
|
||||||
_serviceProvider = serviceProvider;
|
_serviceProvider = serviceProvider;
|
||||||
_dynamicModuleManager = dynamicModuleManager;
|
_dynamicModuleManager = dynamicModuleManager;
|
||||||
}
|
}
|
||||||
public async Task AddMessageOfCassandra()
|
public async Task AddMessageOfCassandra()
|
||||||
{
|
{
|
||||||
//var stopwatch = Stopwatch.StartNew();
|
var stopwatch = Stopwatch.StartNew();
|
||||||
//for (int i = 1; i <= 10000; i++)
|
for (int i = 1; i <= 10000; i++)
|
||||||
//{
|
{
|
||||||
// var str = Guid.NewGuid().ToString();
|
var str = Guid.NewGuid().ToString();
|
||||||
// await _messageReceivedCassandraRepository.InsertAsync(new MessageIssued
|
await _messageReceivedCassandraRepository.InsertAsync(new MessageIssued
|
||||||
// {
|
{
|
||||||
// ClientId = str,
|
ClientId = str,
|
||||||
// DeviceNo = i.ToString(),
|
DeviceNo = i.ToString(),
|
||||||
// MessageId = str,
|
MessageId = str,
|
||||||
// Type = IssuedEventType.Data,
|
Type = IssuedEventType.Data,
|
||||||
// Id = str,
|
Id = str,
|
||||||
// Message = str.GetBytes()
|
Message = str.GetBytes()
|
||||||
// });
|
});
|
||||||
//}
|
}
|
||||||
//stopwatch.Stop();
|
stopwatch.Stop();
|
||||||
//_logger.LogWarning($"插入 {10000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒");
|
_logger.LogWarning($"插入 {10000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒");
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task AddMessageOfBulkInsertCassandra()
|
public async Task AddMessageOfBulkInsertCassandra()
|
||||||
{
|
{
|
||||||
//var records = new List<MessageIssued>();
|
var records = new List<MessageIssued>();
|
||||||
//var prepared = await _cassandraProvider.Session.PrepareAsync(
|
var prepared = await _cassandraProvider.Session.PrepareAsync(
|
||||||
// $"INSERT INTO {_cassandraProvider.CassandraConfig.Keyspace}.{nameof(MessageIssued)} (id, clientid, message, deviceno,type,messageid) VALUES (?, ?, ?, ?, ?, ?)");
|
$"INSERT INTO {_cassandraProvider.CassandraConfig.Keyspace}.{nameof(MessageIssued)} (id, clientid, message, deviceno,type,messageid) VALUES (?, ?, ?, ?, ?, ?)");
|
||||||
|
|
||||||
//for (int i = 1; i <= 100000; i++)
|
for (int i = 1; i <= 100000; i++)
|
||||||
//{
|
{
|
||||||
// var str = Guid.NewGuid().ToString();
|
var str = Guid.NewGuid().ToString();
|
||||||
// records.Add(new MessageIssued
|
records.Add(new MessageIssued
|
||||||
// {
|
{
|
||||||
// ClientId = str,
|
ClientId = str,
|
||||||
// DeviceNo = i.ToString(),
|
DeviceNo = i.ToString(),
|
||||||
// MessageId = str,
|
MessageId = str,
|
||||||
// Type = IssuedEventType.Data,
|
Type = IssuedEventType.Data,
|
||||||
// Id = str,
|
Id = str,
|
||||||
// Message = str.GetBytes()
|
Message = str.GetBytes()
|
||||||
// });
|
});
|
||||||
//}
|
}
|
||||||
//var stopwatch = Stopwatch.StartNew();
|
var stopwatch = Stopwatch.StartNew();
|
||||||
//await BulkInsertAsync(_cassandraProvider.Session, prepared, records);
|
await BulkInsertAsync(_cassandraProvider.Session, prepared, records);
|
||||||
//stopwatch.Stop();
|
stopwatch.Stop();
|
||||||
//_logger.LogWarning($"插入 {100000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒");
|
_logger.LogWarning($"插入 {100000} 条记录完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒");
|
||||||
}
|
}
|
||||||
|
|
||||||
//private static async Task BulkInsertAsync(ISession session, PreparedStatement prepared, List<MessageIssued> records)
|
private static async Task BulkInsertAsync(ISession session, PreparedStatement prepared, List<MessageIssued> records)
|
||||||
//{
|
{
|
||||||
// var tasks = new List<Task>();
|
var tasks = new List<Task>();
|
||||||
// var batch = new BatchStatement();
|
var batch = new BatchStatement();
|
||||||
|
|
||||||
// for (int i = 0; i < records.Count; i++)
|
for (int i = 0; i < records.Count; i++)
|
||||||
// {
|
{
|
||||||
// var record = records[i];
|
var record = records[i];
|
||||||
// var boundStatement = prepared.Bind(
|
var boundStatement = prepared.Bind(
|
||||||
// record.Id,
|
record.Id,
|
||||||
// record.ClientId,
|
record.ClientId,
|
||||||
// record.Message,
|
record.Message,
|
||||||
// record.DeviceNo,
|
record.DeviceNo,
|
||||||
// (int)record.Type,
|
(int)record.Type,
|
||||||
// record.MessageId);
|
record.MessageId);
|
||||||
|
|
||||||
// // 设置一致性级别为ONE以提高性能
|
// 设置一致性级别为ONE以提高性能
|
||||||
// boundStatement.SetConsistencyLevel(ConsistencyLevel.One);
|
boundStatement.SetConsistencyLevel(ConsistencyLevel.One);
|
||||||
|
|
||||||
// batch.Add(boundStatement);
|
batch.Add(boundStatement);
|
||||||
|
|
||||||
// // 当达到批处理大小时执行
|
// 当达到批处理大小时执行
|
||||||
// if (batch.Statements.Count() >= 1000 || i == records.Count - 1)
|
if (batch.Statements.Count() >= 1000 || i == records.Count - 1)
|
||||||
// {
|
{
|
||||||
// tasks.Add(session.ExecuteAsync(batch));
|
tasks.Add(session.ExecuteAsync(batch));
|
||||||
// batch = new BatchStatement();
|
batch = new BatchStatement();
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
|
|
||||||
// // 等待所有批处理完成
|
// 等待所有批处理完成
|
||||||
// await Task.WhenAll(tasks);
|
await Task.WhenAll(tasks);
|
||||||
//}
|
}
|
||||||
|
|
||||||
[LogIntercept]
|
[LogIntercept]
|
||||||
public virtual Task<string> LogInterceptorTest(string str)
|
public virtual Task<string> LogInterceptorTest(string str)
|
||||||
|
|||||||
@ -332,7 +332,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
// 创建取消令牌源
|
// 创建取消令牌源
|
||||||
//var cts = new CancellationTokenSource();
|
//var cts = new CancellationTokenSource();
|
||||||
|
|
||||||
|
await _dbProvider.GetSessionPool(true).InitTableSessionModelAsync();
|
||||||
|
|
||||||
_ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader);
|
_ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader);
|
||||||
|
|
||||||
|
|||||||
@ -7,6 +7,7 @@ using JiShe.CollectBus.Kafka.Internal;
|
|||||||
using JiShe.CollectBus.Protocol.Contracts;
|
using JiShe.CollectBus.Protocol.Contracts;
|
||||||
using JiShe.CollectBus.Protocol.Interfaces;
|
using JiShe.CollectBus.Protocol.Interfaces;
|
||||||
using JiShe.CollectBus.Protocol3761;
|
using JiShe.CollectBus.Protocol3761;
|
||||||
|
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using System;
|
using System;
|
||||||
@ -23,6 +24,7 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
private readonly ILogger<SubscriberAnalysisAppService> _logger;
|
private readonly ILogger<SubscriberAnalysisAppService> _logger;
|
||||||
private readonly ITcpService _tcpService;
|
private readonly ITcpService _tcpService;
|
||||||
private readonly IServiceProvider _serviceProvider;
|
private readonly IServiceProvider _serviceProvider;
|
||||||
|
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
|
||||||
private readonly IIoTDbProvider _dbProvider;
|
private readonly IIoTDbProvider _dbProvider;
|
||||||
private readonly IProtocolService _protocolService;
|
private readonly IProtocolService _protocolService;
|
||||||
|
|
||||||
@ -30,11 +32,12 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
ITcpService tcpService,
|
ITcpService tcpService,
|
||||||
IServiceProvider serviceProvider,
|
IServiceProvider serviceProvider,
|
||||||
IIoTDbProvider dbProvider,
|
IIoTDbProvider dbProvider,
|
||||||
IProtocolService protocolService)
|
IMeterReadingRecordRepository meterReadingRecordsRepository, IProtocolService protocolService)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_tcpService = tcpService;
|
_tcpService = tcpService;
|
||||||
_serviceProvider = serviceProvider;
|
_serviceProvider = serviceProvider;
|
||||||
|
_meterReadingRecordsRepository = meterReadingRecordsRepository;
|
||||||
_dbProvider = dbProvider;
|
_dbProvider = dbProvider;
|
||||||
_protocolService = protocolService;
|
_protocolService = protocolService;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,172 @@
|
|||||||
|
using JiShe.CollectBus.Common.Consts;
|
||||||
|
using JiShe.CollectBus.Common.Enums;
|
||||||
|
using JiShe.CollectBus.Common.Models;
|
||||||
|
using JiShe.CollectBus.Interceptors;
|
||||||
|
using JiShe.CollectBus.IoTDB.Interface;
|
||||||
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||||
|
using JiShe.CollectBus.Kafka.Attributes;
|
||||||
|
using JiShe.CollectBus.Kafka.Internal;
|
||||||
|
using JiShe.CollectBus.Protocol.Interfaces;
|
||||||
|
using JiShe.CollectBus.Protocol3761;
|
||||||
|
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using TouchSocket.Sockets;
|
||||||
|
using Volo.Abp.Domain.Repositories;
|
||||||
|
|
||||||
|
namespace JiShe.CollectBus.Subscribers
|
||||||
|
{
|
||||||
|
public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, IKafkaSubscribe
|
||||||
|
{
|
||||||
|
private readonly ILogger<SubscriberAppService> _logger;
|
||||||
|
private readonly ITcpService _tcpService;
|
||||||
|
private readonly IServiceProvider _serviceProvider;
|
||||||
|
private readonly IRepository<MessageReceivedLogin, Guid> _messageReceivedLoginEventRepository;
|
||||||
|
private readonly IRepository<MessageReceivedHeartbeat, Guid> _messageReceivedHeartbeatEventRepository;
|
||||||
|
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
|
||||||
|
private readonly IIoTDbProvider _dbProvider;
|
||||||
|
private readonly IProtocolService _protocolService;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Initializes a new instance of the <see cref="SubscriberAppService"/> class.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="logger">The logger.</param>
|
||||||
|
/// <param name="tcpService">The TCP service.</param>
|
||||||
|
/// <param name="serviceProvider">The service provider.</param>
|
||||||
|
/// <param name="messageReceivedLoginEventRepository">The message received login event repository.</param>
|
||||||
|
/// <param name="messageReceivedHeartbeatEventRepository">The message received heartbeat event repository.</param>
|
||||||
|
/// <param name="meterReadingRecordsRepository">The device repository.</param>
|
||||||
|
public SubscriberAppService(ILogger<SubscriberAppService> logger,
|
||||||
|
ITcpService tcpService,
|
||||||
|
IServiceProvider serviceProvider,
|
||||||
|
IRepository<MessageReceivedLogin, Guid> messageReceivedLoginEventRepository,
|
||||||
|
IRepository<MessageReceivedHeartbeat, Guid> messageReceivedHeartbeatEventRepository,
|
||||||
|
IIoTDbProvider dbProvider,
|
||||||
|
IMeterReadingRecordRepository meterReadingRecordsRepository, IProtocolService protocolService)
|
||||||
|
{
|
||||||
|
_logger = logger;
|
||||||
|
_tcpService = tcpService;
|
||||||
|
_serviceProvider = serviceProvider;
|
||||||
|
_messageReceivedLoginEventRepository = messageReceivedLoginEventRepository;
|
||||||
|
_messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository;
|
||||||
|
_meterReadingRecordsRepository = meterReadingRecordsRepository;
|
||||||
|
_dbProvider = dbProvider;
|
||||||
|
_protocolService = protocolService;
|
||||||
|
}
|
||||||
|
|
||||||
|
[LogIntercept]
|
||||||
|
[KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName, EnableBatch = true)]
|
||||||
|
public async Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
|
||||||
|
{
|
||||||
|
bool isAck = true;
|
||||||
|
foreach (var issuedEventMessage in issuedEventMessages)
|
||||||
|
{
|
||||||
|
//var loginEntity = await _messageReceivedLoginEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId);
|
||||||
|
//if (loginEntity == null)
|
||||||
|
//{
|
||||||
|
// isAck=false;
|
||||||
|
// break;
|
||||||
|
//}
|
||||||
|
|
||||||
|
//loginEntity.AckTime = Clock.Now;
|
||||||
|
//loginEntity.IsAck = true;
|
||||||
|
//await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
|
||||||
|
}
|
||||||
|
// TODO:暂时ACK,等后续处理是否放到私信队列中
|
||||||
|
return isAck? SubscribeAck.Success(): SubscribeAck.Fail();
|
||||||
|
}
|
||||||
|
|
||||||
|
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)]
|
||||||
|
public async Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
|
||||||
|
{
|
||||||
|
bool isAck = true;
|
||||||
|
//foreach (var issuedEventMessage in issuedEventMessages)
|
||||||
|
//{
|
||||||
|
// var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId);
|
||||||
|
// if (heartbeatEntity == null)
|
||||||
|
// {
|
||||||
|
// isAck = false;
|
||||||
|
// break;
|
||||||
|
// }
|
||||||
|
// heartbeatEntity.AckTime = Clock.Now;
|
||||||
|
// heartbeatEntity.IsAck = true;
|
||||||
|
// await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
|
||||||
|
// }
|
||||||
|
// TODO:暂时ACK,等后续处理是否放到私信队列中
|
||||||
|
return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
|
||||||
|
}
|
||||||
|
|
||||||
|
[KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)]
|
||||||
|
public async Task<ISubscribeAck> ReceivedEvent(MessageProtocolAnalysis<TB3761> receivedMessage)
|
||||||
|
{
|
||||||
|
var currentTime = Clock.Now;
|
||||||
|
|
||||||
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
|
if (protocolPlugin == null)
|
||||||
|
{
|
||||||
|
_logger.LogError("协议不存在!");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
//todo 会根据不同的协议进行解析,然后做业务处理
|
||||||
|
//TB3761? tB3761 = protocolPlugin.Analysis3761(receivedMessage.MessageHexString);
|
||||||
|
//if (tB3761 == null)
|
||||||
|
//{
|
||||||
|
// Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
|
||||||
|
// return SubscribeAck.Success();
|
||||||
|
//}
|
||||||
|
//if (tB3761.DT == null || tB3761.AFN_FC == null)
|
||||||
|
//{
|
||||||
|
// Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
|
||||||
|
// return SubscribeAck.Success();
|
||||||
|
//}
|
||||||
|
|
||||||
|
//报文入库
|
||||||
|
var entity = new MeterReadingRecords()
|
||||||
|
{
|
||||||
|
ReceivedMessageHexString = receivedMessage.MessageHexString,
|
||||||
|
AFN = (AFN)receivedMessage.Data?.AFN_FC.AFN!,
|
||||||
|
Fn = receivedMessage.Data.DT.Fn,
|
||||||
|
Pn = 0,
|
||||||
|
FocusAddress = "",
|
||||||
|
MeterAddress = "",
|
||||||
|
};
|
||||||
|
|
||||||
|
//如果没数据,则插入,有数据则更新
|
||||||
|
var updateEntity = await _meterReadingRecordsRepository.FirOrDefaultAsync(entity, currentTime);
|
||||||
|
if (updateEntity == null)
|
||||||
|
{
|
||||||
|
await _meterReadingRecordsRepository.InsertAsync(entity, currentTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
//_dbProvider.InsertAsync();
|
||||||
|
//todo 查找是否有下发任务
|
||||||
|
|
||||||
|
//await _messageReceivedEventRepository.InsertAsync(receivedMessage);
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
return SubscribeAck.Success();
|
||||||
|
}
|
||||||
|
|
||||||
|
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName, EnableBatch = true)]
|
||||||
|
public async Task<ISubscribeAck> ReceivedHeartbeatEvent(List<MessageReceivedHeartbeat> receivedHeartbeatMessages)
|
||||||
|
{
|
||||||
|
await _messageReceivedHeartbeatEventRepository.InsertManyAsync(receivedHeartbeatMessages);
|
||||||
|
return SubscribeAck.Success();
|
||||||
|
}
|
||||||
|
|
||||||
|
[KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName,EnableBatch =true)]
|
||||||
|
public async Task<ISubscribeAck> ReceivedLoginEvent(List<MessageReceivedLogin> receivedLoginMessages)
|
||||||
|
{
|
||||||
|
await _messageReceivedLoginEventRepository.InsertManyAsync(receivedLoginMessages);
|
||||||
|
return SubscribeAck.Success();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,43 @@
|
|||||||
|
//using System;
|
||||||
|
//using System.Threading;
|
||||||
|
//using System.Threading.Tasks;
|
||||||
|
//using Hangfire;
|
||||||
|
//using JiShe.CollectBus.Common.Consts;
|
||||||
|
//using JiShe.CollectBus.ScheduledMeterReading;
|
||||||
|
//using Microsoft.Extensions.Logging;
|
||||||
|
//using Volo.Abp.BackgroundWorkers.Hangfire;
|
||||||
|
//using Volo.Abp.DependencyInjection;
|
||||||
|
//using Volo.Abp.Uow;
|
||||||
|
|
||||||
|
//namespace JiShe.CollectBus.Workers
|
||||||
|
//{
|
||||||
|
// /// <summary>
|
||||||
|
// /// 构建待处理的下发指令任务处理
|
||||||
|
// /// </summary>
|
||||||
|
// public class CreateToBeIssueTaskWorker : HangfireBackgroundWorkerBase, ITransientDependency, ICollectWorker
|
||||||
|
// {
|
||||||
|
// private readonly ILogger<CreateToBeIssueTaskWorker> _logger;
|
||||||
|
// private readonly IScheduledMeterReadingService _scheduledMeterReadingService;
|
||||||
|
|
||||||
|
// /// <summary>
|
||||||
|
// /// Initializes a new instance of the <see cref="CreateToBeIssueTaskWorker"/> class.
|
||||||
|
// /// </summary>
|
||||||
|
// /// <param name="logger">The logger.</param>
|
||||||
|
// /// <param name="scheduledMeterReadingService">定时任务</param>
|
||||||
|
// public CreateToBeIssueTaskWorker(ILogger<CreateToBeIssueTaskWorker> logger, IScheduledMeterReadingService scheduledMeterReadingService)
|
||||||
|
// {
|
||||||
|
// _logger = logger;
|
||||||
|
// RecurringJobId = nameof(CreateToBeIssueTaskWorker);
|
||||||
|
// CronExpression = "0 0/1 * * * *";
|
||||||
|
// TimeZone = TimeZoneInfo.Local;
|
||||||
|
// this._scheduledMeterReadingService = scheduledMeterReadingService;
|
||||||
|
// }
|
||||||
|
|
||||||
|
|
||||||
|
// public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken())
|
||||||
|
// {
|
||||||
|
// _logger.LogError($"{DateTime.Now}");
|
||||||
|
// // await _scheduledMeterReadingService.CreateToBeIssueTasks();
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//}
|
||||||
@ -0,0 +1,39 @@
|
|||||||
|
//using JiShe.CollectBus.ScheduledMeterReading;
|
||||||
|
//using Microsoft.Extensions.Logging;
|
||||||
|
//using System;
|
||||||
|
//using System.Collections.Generic;
|
||||||
|
//using System.Linq;
|
||||||
|
//using System.Text;
|
||||||
|
//using System.Threading;
|
||||||
|
//using System.Threading.Tasks;
|
||||||
|
//using Volo.Abp.BackgroundWorkers.Hangfire;
|
||||||
|
//using Volo.Abp.DependencyInjection;
|
||||||
|
|
||||||
|
//namespace JiShe.CollectBus.Workers
|
||||||
|
//{
|
||||||
|
// /// <summary>
|
||||||
|
// /// 定时数据检测1小时一次
|
||||||
|
// /// </summary>
|
||||||
|
// public class DataDetectionFifteenMinuteWorker : HangfireBackgroundWorkerBase, ITransientDependency, ICollectWorker
|
||||||
|
// {
|
||||||
|
|
||||||
|
// private readonly ILogger<CreateToBeIssueTaskWorker> _logger;
|
||||||
|
// private readonly IScheduledMeterReadingService _scheduledMeterReadingService;
|
||||||
|
|
||||||
|
// public DataDetectionFifteenMinuteWorker(ILogger<CreateToBeIssueTaskWorker> logger, IScheduledMeterReadingService scheduledMeterReadingService)
|
||||||
|
// {
|
||||||
|
// _logger = logger;
|
||||||
|
// RecurringJobId = nameof(CreateToBeIssueTaskWorker);
|
||||||
|
// CronExpression = "0 0 0/1 * * ?";
|
||||||
|
// TimeZone = TimeZoneInfo.Local;
|
||||||
|
// this._scheduledMeterReadingService = scheduledMeterReadingService;
|
||||||
|
// }
|
||||||
|
|
||||||
|
|
||||||
|
// public override Task DoWorkAsync(CancellationToken cancellationToken = default)
|
||||||
|
// {
|
||||||
|
// //throw new NotImplementedException();
|
||||||
|
// return Task.CompletedTask;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//}
|
||||||
@ -0,0 +1,39 @@
|
|||||||
|
//using System;
|
||||||
|
//using System.Threading;
|
||||||
|
//using System.Threading.Tasks;
|
||||||
|
//using Hangfire;
|
||||||
|
//using JiShe.CollectBus.Common.Attributes;
|
||||||
|
//using Microsoft.Extensions.Logging;
|
||||||
|
//using Volo.Abp.BackgroundWorkers.Hangfire;
|
||||||
|
//using Volo.Abp.DependencyInjection;
|
||||||
|
//using Volo.Abp.Uow;
|
||||||
|
|
||||||
|
//namespace JiShe.CollectBus.Workers
|
||||||
|
//{
|
||||||
|
// public class EpiCollectWorker : HangfireBackgroundWorkerBase, ITransientDependency,ICollectWorker
|
||||||
|
// {
|
||||||
|
// private readonly ILogger<EpiCollectWorker> _logger;
|
||||||
|
|
||||||
|
// /// <summary>
|
||||||
|
// /// Initializes a new instance of the <see cref="EpiCollectWorker"/> class.
|
||||||
|
// /// </summary>
|
||||||
|
// /// <param name="logger">The logger.</param>
|
||||||
|
// public EpiCollectWorker(ILogger<EpiCollectWorker> logger)
|
||||||
|
// {
|
||||||
|
// _logger = logger;
|
||||||
|
// RecurringJobId = nameof(EpiCollectWorker);
|
||||||
|
// CronExpression = Cron.Daily();
|
||||||
|
|
||||||
|
// }
|
||||||
|
|
||||||
|
|
||||||
|
// public override Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken())
|
||||||
|
// {
|
||||||
|
// using (var uow = LazyServiceProvider.LazyGetRequiredService<IUnitOfWorkManager>().Begin())
|
||||||
|
// {
|
||||||
|
// Logger.LogInformation("Executed MyLogWorker..!");
|
||||||
|
// return Task.CompletedTask;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//}
|
||||||
@ -0,0 +1,48 @@
|
|||||||
|
//using System;
|
||||||
|
//using System.Threading;
|
||||||
|
//using System.Threading.Tasks;
|
||||||
|
//using Hangfire;
|
||||||
|
//using JiShe.CollectBus.ScheduledMeterReading;
|
||||||
|
//using Microsoft.Extensions.Logging;
|
||||||
|
//using Volo.Abp.BackgroundWorkers.Hangfire;
|
||||||
|
//using Volo.Abp.DependencyInjection;
|
||||||
|
//using Volo.Abp.Uow;
|
||||||
|
|
||||||
|
//namespace JiShe.CollectBus.Workers
|
||||||
|
//{
|
||||||
|
// /// <summary>
|
||||||
|
// /// 15分钟采集数据
|
||||||
|
// /// </summary>
|
||||||
|
// public class SubscriberFifteenMinuteWorker : HangfireBackgroundWorkerBase, ITransientDependency, ICollectWorker
|
||||||
|
// {
|
||||||
|
// private readonly ILogger<SubscriberFifteenMinuteWorker> _logger;
|
||||||
|
// private readonly IScheduledMeterReadingService _scheduledMeterReadingService;
|
||||||
|
|
||||||
|
// /// <summary>
|
||||||
|
// /// Initializes a new instance of the <see cref="SubscriberFifteenMinuteWorker"/> class.
|
||||||
|
// /// </summary>
|
||||||
|
// /// <param name="logger">The logger.</param>
|
||||||
|
// /// <param name="scheduledMeterReadingService">定时任务</param>
|
||||||
|
// public SubscriberFifteenMinuteWorker(ILogger<SubscriberFifteenMinuteWorker> logger, IScheduledMeterReadingService scheduledMeterReadingService)
|
||||||
|
// {
|
||||||
|
// _logger = logger;
|
||||||
|
// RecurringJobId = nameof(SubscriberFifteenMinuteWorker);
|
||||||
|
// CronExpression = "0 0/15 * * * *";
|
||||||
|
// TimeZone = TimeZoneInfo.Local;
|
||||||
|
// this._scheduledMeterReadingService = scheduledMeterReadingService;
|
||||||
|
// }
|
||||||
|
|
||||||
|
|
||||||
|
// public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken())
|
||||||
|
// {
|
||||||
|
// //await _scheduledMeterReadingService.AmmeterScheduledMeterFifteenMinuteReading();
|
||||||
|
// //await _scheduledMeterReadingService.WatermeterScheduledMeterFifteenMinuteReading();
|
||||||
|
|
||||||
|
// //using (var uow = LazyServiceProvider.LazyGetRequiredService<IUnitOfWorkManager>().Begin())
|
||||||
|
// //{
|
||||||
|
// // Logger.LogInformation("Executed MyLogWorker..!");
|
||||||
|
// // return Task.CompletedTask;
|
||||||
|
// //}
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//}
|
||||||
@ -0,0 +1,42 @@
|
|||||||
|
//using System;
|
||||||
|
//using System.Threading;
|
||||||
|
//using System.Threading.Tasks;
|
||||||
|
//using Hangfire;
|
||||||
|
//using JiShe.CollectBus.ScheduledMeterReading;
|
||||||
|
//using Microsoft.Extensions.Logging;
|
||||||
|
//using Volo.Abp.BackgroundWorkers.Hangfire;
|
||||||
|
//using Volo.Abp.DependencyInjection;
|
||||||
|
//using Volo.Abp.Uow;
|
||||||
|
|
||||||
|
//namespace JiShe.CollectBus.Workers
|
||||||
|
//{
|
||||||
|
// /// <summary>
|
||||||
|
// /// 5分钟采集数据
|
||||||
|
// /// </summary>
|
||||||
|
// public class SubscriberFiveMinuteWorker : HangfireBackgroundWorkerBase, ITransientDependency,ICollectWorker
|
||||||
|
// {
|
||||||
|
// private readonly ILogger<SubscriberFiveMinuteWorker> _logger;
|
||||||
|
// private readonly IScheduledMeterReadingService _scheduledMeterReadingService;
|
||||||
|
|
||||||
|
// /// <summary>
|
||||||
|
// /// Initializes a new instance of the <see cref="SubscriberFiveMinuteWorker"/> class.
|
||||||
|
// /// </summary>
|
||||||
|
// /// <param name="logger">The logger.</param>
|
||||||
|
// /// <param name="scheduledMeterReadingService">定时任务</param>
|
||||||
|
// public SubscriberFiveMinuteWorker(ILogger<SubscriberFiveMinuteWorker> logger, IScheduledMeterReadingService scheduledMeterReadingService)
|
||||||
|
// {
|
||||||
|
// _logger = logger;
|
||||||
|
// RecurringJobId = nameof(SubscriberFiveMinuteWorker);
|
||||||
|
// CronExpression = "0 0/5 * * * *";
|
||||||
|
// TimeZone = TimeZoneInfo.Local;
|
||||||
|
// this._scheduledMeterReadingService = scheduledMeterReadingService;
|
||||||
|
// }
|
||||||
|
|
||||||
|
|
||||||
|
// public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken())
|
||||||
|
// {
|
||||||
|
// //await _scheduledMeterReadingService.AmmeterScheduledMeterFiveMinuteReading();
|
||||||
|
// //await _scheduledMeterReadingService.WatermeterScheduledMeterFiveMinuteReading();
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//}
|
||||||
@ -0,0 +1,44 @@
|
|||||||
|
//using System;
|
||||||
|
//using System.Threading;
|
||||||
|
//using System.Threading.Tasks;
|
||||||
|
//using Hangfire;
|
||||||
|
//using JiShe.CollectBus.ScheduledMeterReading;
|
||||||
|
//using Microsoft.Extensions.Logging;
|
||||||
|
//using Volo.Abp.BackgroundWorkers.Hangfire;
|
||||||
|
//using Volo.Abp.DependencyInjection;
|
||||||
|
//using Volo.Abp.Uow;
|
||||||
|
|
||||||
|
//namespace JiShe.CollectBus.Workers
|
||||||
|
//{
|
||||||
|
// /// <summary>
|
||||||
|
// /// 1分钟采集数据
|
||||||
|
// /// </summary>
|
||||||
|
// public class SubscriberOneMinuteWorker : HangfireBackgroundWorkerBase, ITransientDependency,ICollectWorker
|
||||||
|
// {
|
||||||
|
// private readonly ILogger<SubscriberOneMinuteWorker> _logger;
|
||||||
|
// private readonly IScheduledMeterReadingService _scheduledMeterReadingService;
|
||||||
|
|
||||||
|
// /// <summary>
|
||||||
|
// /// Initializes a new instance of the <see cref="SubscriberOneMinuteWorker"/> class.
|
||||||
|
// /// </summary>
|
||||||
|
// /// <param name="logger">The logger.</param>
|
||||||
|
// /// <param name="scheduledMeterReadingService">定时任务</param>
|
||||||
|
// public SubscriberOneMinuteWorker(ILogger<SubscriberOneMinuteWorker> logger, IScheduledMeterReadingService scheduledMeterReadingService)
|
||||||
|
// {
|
||||||
|
// _logger = logger;
|
||||||
|
// RecurringJobId = nameof(SubscriberOneMinuteWorker);
|
||||||
|
// CronExpression = "0 0/1 * * * *";
|
||||||
|
// TimeZone = TimeZoneInfo.Local;
|
||||||
|
// this._scheduledMeterReadingService = scheduledMeterReadingService;
|
||||||
|
// }
|
||||||
|
|
||||||
|
|
||||||
|
// public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken())
|
||||||
|
// {
|
||||||
|
// //await _scheduledMeterReadingService.AmmeterScheduledMeterOneMinuteReading();
|
||||||
|
|
||||||
|
// //await _scheduledMeterReadingService.WatermeterScheduledMeterOneMinuteReading();
|
||||||
|
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//}
|
||||||
@ -1,4 +1,4 @@
|
|||||||
using JiShe.CollectBus.IoTDB;
|
using JiShe.CollectBus.MongoDB;
|
||||||
using Volo.Abp.Autofac;
|
using Volo.Abp.Autofac;
|
||||||
using Volo.Abp.Modularity;
|
using Volo.Abp.Modularity;
|
||||||
|
|
||||||
@ -6,7 +6,7 @@ namespace JiShe.CollectBus.DbMigrator;
|
|||||||
|
|
||||||
[DependsOn(
|
[DependsOn(
|
||||||
typeof(AbpAutofacModule),
|
typeof(AbpAutofacModule),
|
||||||
typeof(CollectBusIoTDbModule),
|
typeof(CollectBusMongoDbModule),
|
||||||
typeof(CollectBusApplicationContractsModule)
|
typeof(CollectBusApplicationContractsModule)
|
||||||
)]
|
)]
|
||||||
public class CollectBusDbMigratorModule : AbpModule
|
public class CollectBusDbMigratorModule : AbpModule
|
||||||
|
|||||||
@ -7,7 +7,6 @@ using JiShe.CollectBus.Data;
|
|||||||
using Serilog;
|
using Serilog;
|
||||||
using Volo.Abp;
|
using Volo.Abp;
|
||||||
using Volo.Abp.Data;
|
using Volo.Abp.Data;
|
||||||
using JiShe.CollectBus.IoTDB.Interface;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.DbMigrator;
|
namespace JiShe.CollectBus.DbMigrator;
|
||||||
|
|
||||||
@ -34,16 +33,10 @@ public class DbMigratorHostedService : IHostedService
|
|||||||
{
|
{
|
||||||
await application.InitializeAsync();
|
await application.InitializeAsync();
|
||||||
|
|
||||||
//await application
|
|
||||||
// .ServiceProvider
|
|
||||||
// .GetRequiredService<CollectBusDbMigrationService>()
|
|
||||||
// .MigrateAsync();
|
|
||||||
|
|
||||||
//初始化IoTDB表模型
|
|
||||||
await application
|
await application
|
||||||
.ServiceProvider
|
.ServiceProvider
|
||||||
.GetRequiredService<IIoTDbProvider>().GetSessionPool(true)
|
.GetRequiredService<CollectBusDbMigrationService>()
|
||||||
.InitTableSessionModelAsync();
|
.MigrateAsync();
|
||||||
|
|
||||||
await application.ShutdownAsync();
|
await application.ShutdownAsync();
|
||||||
|
|
||||||
|
|||||||
@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="Volo.Abp.Autofac" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.Autofac" Version="8.3.3" />
|
||||||
|
<ProjectReference Include="..\..\modules\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
|||||||
@ -1,14 +1,5 @@
|
|||||||
{
|
{
|
||||||
"ConnectionStrings": {
|
"ConnectionStrings": {
|
||||||
"Default": "mongodb://admin:collectbus_mongodb_jishe@118.190.144.92:37017/JiSheCollectBus?authSource=admin"
|
"Default": "mongodb://admin:collectbus_mongodb_jishe@118.190.144.92:37017/JiSheCollectBus?authSource=admin"
|
||||||
},
|
|
||||||
"IoTDBOptions": {
|
|
||||||
"UserName": "root",
|
|
||||||
"Password": "root",
|
|
||||||
"ClusterList": [ "192.168.5.9:6667" ],
|
|
||||||
"PoolSize": 32,
|
|
||||||
"DataBaseName": "energy",
|
|
||||||
"OpenDebugMode": true,
|
|
||||||
"UseTableSessionPoolByDefault": false
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,17 +17,13 @@
|
|||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="JetBrains.Annotations" Version="2024.2.0" />
|
<PackageReference Include="JetBrains.Annotations" Version="2024.2.0" />
|
||||||
<PackageReference Include="Mapster" Version="7.4.0" />
|
<PackageReference Include="Mapster" Version="7.4.0" />
|
||||||
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="8.0.1" />
|
|
||||||
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
|
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
|
||||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="8.0.1" />
|
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="8.0.1" />
|
||||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.2" />
|
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.2" />
|
||||||
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.1" />
|
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.1" />
|
||||||
<PackageReference Include="Microsoft.Extensions.Logging" Version="8.0.1" />
|
|
||||||
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
|
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
|
||||||
<PackageReference Include="Serilog" Version="4.1.0" />
|
<PackageReference Include="Serilog" Version="4.1.0" />
|
||||||
<PackageReference Include="System.Linq.Dynamic.Core" Version="1.6.4" />
|
|
||||||
<PackageReference Include="System.Text.Json" Version="8.0.5" />
|
|
||||||
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
|
||||||
<PackageReference Include="Volo.Abp.Ddd.Domain" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.Ddd.Domain" Version="8.3.3" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|||||||
@ -1,82 +0,0 @@
|
|||||||
using Microsoft.Extensions.Hosting;
|
|
||||||
using Microsoft.Extensions.Logging;
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Common.WorkService
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 系统后台定时服务
|
|
||||||
/// </summary>
|
|
||||||
public abstract class SystemBackGroundWorkService : BackgroundService
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 日志记录
|
|
||||||
/// </summary>
|
|
||||||
public ILogger<SystemBackGroundWorkService> Logger { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 创建一个取消标记源
|
|
||||||
/// </summary>
|
|
||||||
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 任务执行间隔时间
|
|
||||||
/// </summary>
|
|
||||||
private TimeSpan interval;
|
|
||||||
|
|
||||||
protected SystemBackGroundWorkService(ILogger<SystemBackGroundWorkService> logger)
|
|
||||||
{
|
|
||||||
Logger = logger;
|
|
||||||
interval = GetInterval();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 执行时间间隔
|
|
||||||
/// </summary>
|
|
||||||
/// <returns></returns>
|
|
||||||
protected abstract TimeSpan GetInterval();
|
|
||||||
|
|
||||||
|
|
||||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
|
||||||
{
|
|
||||||
await Task.CompletedTask;// 等待其他任务执行完成,避免阻塞应用程序启动
|
|
||||||
Logger.LogInformation($"任务每隔{interval.TotalSeconds}秒执行一次");
|
|
||||||
await InitAsync(cancellationTokenSource.Token);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected virtual async Task InitAsync(CancellationToken cancellationToken)
|
|
||||||
{
|
|
||||||
while (!cancellationToken.IsCancellationRequested)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
await DoWorkAsync(cancellationToken);
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
Logger.LogError(ex, "后台任务执行发生异常");
|
|
||||||
}
|
|
||||||
|
|
||||||
await Task.Delay(interval, cancellationToken);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public override Task StopAsync(CancellationToken cancellationToken)
|
|
||||||
{
|
|
||||||
Logger.LogInformation("后台服务停止……");
|
|
||||||
cancellationTokenSource.Cancel();
|
|
||||||
return base.StopAsync(cancellationToken);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 抛出方法入口以便于其服务实现
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="cancellationToken"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
protected abstract Task DoWorkAsync(CancellationToken cancellationToken);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -19,6 +19,7 @@ using TouchSocket.Sockets;
|
|||||||
using JiShe.CollectBus.Plugins;
|
using JiShe.CollectBus.Plugins;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Diagnostics.HealthChecks;
|
using Microsoft.Extensions.Diagnostics.HealthChecks;
|
||||||
|
using JiShe.CollectBus.Cassandra;
|
||||||
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Host
|
namespace JiShe.CollectBus.Host
|
||||||
@ -38,18 +39,18 @@ namespace JiShe.CollectBus.Host
|
|||||||
|
|
||||||
Configure<AbpBackgroundJobOptions>(options => { options.IsJobExecutionEnabled = false; });
|
Configure<AbpBackgroundJobOptions>(options => { options.IsJobExecutionEnabled = false; });
|
||||||
|
|
||||||
//context.Services.AddHangfire(config =>
|
context.Services.AddHangfire(config =>
|
||||||
//{
|
{
|
||||||
// config.UseRedisStorage(
|
config.UseRedisStorage(
|
||||||
// context.Services.GetConfiguration().GetValue<string>("Redis:Configuration"), redisStorageOptions)
|
context.Services.GetConfiguration().GetValue<string>("Redis:Configuration"), redisStorageOptions)
|
||||||
// .WithJobExpirationTimeout(TimeSpan.FromDays(7));
|
.WithJobExpirationTimeout(TimeSpan.FromDays(7));
|
||||||
// var delaysInSeconds = new[] { 10, 60, 60 * 3 }; // 重试时间间隔
|
var delaysInSeconds = new[] { 10, 60, 60 * 3 }; // 重试时间间隔
|
||||||
// const int Attempts = 3; // 重试次数
|
const int Attempts = 3; // 重试次数
|
||||||
// config.UseFilter(new AutomaticRetryAttribute() { Attempts = Attempts, DelaysInSeconds = delaysInSeconds });
|
config.UseFilter(new AutomaticRetryAttribute() { Attempts = Attempts, DelaysInSeconds = delaysInSeconds });
|
||||||
// //config.UseFilter(new AutoDeleteAfterSuccessAttribute(TimeSpan.FromDays(7)));
|
//config.UseFilter(new AutoDeleteAfterSuccessAttribute(TimeSpan.FromDays(7)));
|
||||||
// config.UseFilter(new JobRetryLastFilter(Attempts));
|
config.UseFilter(new JobRetryLastFilter(Attempts));
|
||||||
//});
|
});
|
||||||
//context.Services.AddHangfireServer();
|
context.Services.AddHangfireServer();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -193,9 +194,9 @@ namespace JiShe.CollectBus.Host
|
|||||||
options =>
|
options =>
|
||||||
{
|
{
|
||||||
options.IgnoredUrls.Add("/AuditLogs/page");
|
options.IgnoredUrls.Add("/AuditLogs/page");
|
||||||
//options.IgnoredUrls.Add("/hangfire/stats");
|
options.IgnoredUrls.Add("/hangfire/stats");
|
||||||
options.IgnoredUrls.Add("/hangfire/recurring/trigger");
|
options.IgnoredUrls.Add("/hangfire/recurring/trigger");
|
||||||
//options.IgnoredUrls.Add("/cap");
|
options.IgnoredUrls.Add("/cap");
|
||||||
options.IgnoredUrls.Add("/");
|
options.IgnoredUrls.Add("/");
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -251,9 +252,11 @@ namespace JiShe.CollectBus.Host
|
|||||||
/// <param name="configuration"></param>
|
/// <param name="configuration"></param>
|
||||||
private void ConfigureHealthChecks(ServiceConfigurationContext context, IConfiguration configuration)
|
private void ConfigureHealthChecks(ServiceConfigurationContext context, IConfiguration configuration)
|
||||||
{
|
{
|
||||||
if (!configuration.GetValue<bool>("HealthChecks:IsEnable")) return;
|
if (!configuration.GetValue<bool>("HealthChecks:IsEnable")) return;
|
||||||
|
var cassandraConfig = new CassandraConfig();
|
||||||
|
configuration.GetSection("Cassandra").Bind(cassandraConfig);
|
||||||
context.Services.AddHealthChecks()
|
context.Services.AddHealthChecks()
|
||||||
//.AddMongoDb(configuration.GetConnectionString("Default"), "MongoDB", HealthStatus.Unhealthy)
|
.AddMongoDb(configuration.GetConnectionString("Default"), "MongoDB", HealthStatus.Unhealthy)
|
||||||
.AddRedis(configuration.GetValue<string>("Redis:Configuration") ?? string.Empty, "Redis",
|
.AddRedis(configuration.GetValue<string>("Redis:Configuration") ?? string.Empty, "Redis",
|
||||||
HealthStatus.Unhealthy)
|
HealthStatus.Unhealthy)
|
||||||
//.AddKafka(new Confluent.Kafka.ProducerConfig
|
//.AddKafka(new Confluent.Kafka.ProducerConfig
|
||||||
|
|||||||
@ -5,12 +5,14 @@ using JiShe.CollectBus.Host.Extensions;
|
|||||||
using JiShe.CollectBus.Host.HealthChecks;
|
using JiShe.CollectBus.Host.HealthChecks;
|
||||||
using JiShe.CollectBus.Host.Swaggers;
|
using JiShe.CollectBus.Host.Swaggers;
|
||||||
using JiShe.CollectBus.IoTDB.Options;
|
using JiShe.CollectBus.IoTDB.Options;
|
||||||
|
using JiShe.CollectBus.MongoDB;
|
||||||
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
|
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
|
||||||
using Swashbuckle.AspNetCore.SwaggerUI;
|
using Swashbuckle.AspNetCore.SwaggerUI;
|
||||||
using Volo.Abp;
|
using Volo.Abp;
|
||||||
using Volo.Abp.AspNetCore.Authentication.JwtBearer;
|
using Volo.Abp.AspNetCore.Authentication.JwtBearer;
|
||||||
using Volo.Abp.AspNetCore.Serilog;
|
using Volo.Abp.AspNetCore.Serilog;
|
||||||
using Volo.Abp.Autofac;
|
using Volo.Abp.Autofac;
|
||||||
|
using Volo.Abp.BackgroundWorkers.Hangfire;
|
||||||
using Volo.Abp.Caching.StackExchangeRedis;
|
using Volo.Abp.Caching.StackExchangeRedis;
|
||||||
using Volo.Abp.Modularity;
|
using Volo.Abp.Modularity;
|
||||||
using Volo.Abp.Swashbuckle;
|
using Volo.Abp.Swashbuckle;
|
||||||
@ -28,7 +30,9 @@ namespace JiShe.CollectBus.Host
|
|||||||
typeof(AbpSwashbuckleModule),
|
typeof(AbpSwashbuckleModule),
|
||||||
typeof(AbpTimingModule),
|
typeof(AbpTimingModule),
|
||||||
typeof(CollectBusApplicationModule),
|
typeof(CollectBusApplicationModule),
|
||||||
typeof(AbpCachingStackExchangeRedisModule)
|
typeof(CollectBusMongoDbModule),
|
||||||
|
typeof(AbpCachingStackExchangeRedisModule),
|
||||||
|
typeof(AbpBackgroundWorkersHangfireModule)
|
||||||
)]
|
)]
|
||||||
public partial class CollectBusHostModule : AbpModule
|
public partial class CollectBusHostModule : AbpModule
|
||||||
{
|
{
|
||||||
@ -43,8 +47,8 @@ namespace JiShe.CollectBus.Host
|
|||||||
ConfigureSwaggerServices(context, configuration);
|
ConfigureSwaggerServices(context, configuration);
|
||||||
ConfigureNetwork(context, configuration);
|
ConfigureNetwork(context, configuration);
|
||||||
ConfigureJwtAuthentication(context, configuration);
|
ConfigureJwtAuthentication(context, configuration);
|
||||||
//ConfigureHangfire(context);
|
ConfigureHangfire(context);
|
||||||
//ConfigureAuditLog(context);
|
ConfigureAuditLog(context);
|
||||||
ConfigureCustom(context, configuration);
|
ConfigureCustom(context, configuration);
|
||||||
ConfigureHealthChecks(context, configuration);
|
ConfigureHealthChecks(context, configuration);
|
||||||
Configure<AbpClockOptions>(options => { options.Kind = DateTimeKind.Local; });
|
Configure<AbpClockOptions>(options => { options.Kind = DateTimeKind.Local; });
|
||||||
@ -88,10 +92,10 @@ namespace JiShe.CollectBus.Host
|
|||||||
app.UseAuditing();
|
app.UseAuditing();
|
||||||
app.UseAbpSerilogEnrichers();
|
app.UseAbpSerilogEnrichers();
|
||||||
app.UseUnitOfWork();
|
app.UseUnitOfWork();
|
||||||
//app.UseHangfireDashboard("/hangfire", new DashboardOptions
|
app.UseHangfireDashboard("/hangfire", new DashboardOptions
|
||||||
//{
|
{
|
||||||
// IgnoreAntiforgeryToken = true
|
IgnoreAntiforgeryToken = true
|
||||||
//});
|
});
|
||||||
app.UseConfiguredEndpoints(endpoints =>
|
app.UseConfiguredEndpoints(endpoints =>
|
||||||
{
|
{
|
||||||
if (!configuration.GetValue<bool>("HealthChecks:IsEnable")) return;
|
if (!configuration.GetValue<bool>("HealthChecks:IsEnable")) return;
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
//using Cassandra;
|
using Cassandra;
|
||||||
//using JiShe.CollectBus.Cassandra;
|
using JiShe.CollectBus.Cassandra;
|
||||||
using Microsoft.Extensions.Diagnostics.HealthChecks;
|
using Microsoft.Extensions.Diagnostics.HealthChecks;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Host.HealthChecks
|
namespace JiShe.CollectBus.Host.HealthChecks
|
||||||
@ -31,28 +31,27 @@ namespace JiShe.CollectBus.Host.HealthChecks
|
|||||||
/// </returns>
|
/// </returns>
|
||||||
public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
|
public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
return HealthCheckResult.Healthy("Cassandra is unhealthy.");
|
var cassandraConfig = new CassandraConfig();
|
||||||
//var cassandraConfig = new CassandraConfig();
|
_configuration.GetSection("Cassandra").Bind(cassandraConfig);
|
||||||
//_configuration.GetSection("Cassandra").Bind(cassandraConfig);
|
try
|
||||||
//try
|
{
|
||||||
//{
|
var clusterBuilder = Cluster.Builder();
|
||||||
// var clusterBuilder = Cluster.Builder();
|
foreach (var node in cassandraConfig.Nodes)
|
||||||
// foreach (var node in cassandraConfig.Nodes)
|
{
|
||||||
// {
|
clusterBuilder.AddContactPoint(node.Host)
|
||||||
// clusterBuilder.AddContactPoint(node.Host)
|
.WithPort(node.Port);
|
||||||
// .WithPort(node.Port);
|
}
|
||||||
// }
|
clusterBuilder.WithCredentials(cassandraConfig.Username, cassandraConfig.Password);
|
||||||
// clusterBuilder.WithCredentials(cassandraConfig.Username, cassandraConfig.Password);
|
var cluster = clusterBuilder.Build();
|
||||||
// var cluster = clusterBuilder.Build();
|
using var session = await cluster.ConnectAsync();
|
||||||
// using var session = await cluster.ConnectAsync();
|
var result = await Task.FromResult(session.Execute("SELECT release_version FROM system.local"));
|
||||||
// var result = await Task.FromResult(session.Execute("SELECT release_version FROM system.local"));
|
var version = result.First().GetValue<string>("release_version");
|
||||||
// var version = result.First().GetValue<string>("release_version");
|
return HealthCheckResult.Healthy($"Cassandra is healthy. Version: {version}");
|
||||||
// return HealthCheckResult.Healthy($"Cassandra is healthy. Version: {version}");
|
}
|
||||||
//}
|
catch (Exception ex)
|
||||||
//catch (Exception ex)
|
{
|
||||||
//{
|
return new HealthCheckResult(context.Registration.FailureStatus, $"Cassandra is unhealthy: {ex.Message}", ex);
|
||||||
// return new HealthCheckResult(context.Registration.FailureStatus, $"Cassandra is unhealthy: {ex.Message}", ex);
|
}
|
||||||
//}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1,4 +1,5 @@
|
|||||||
using System.Net.Sockets;
|
using System.Net.Sockets;
|
||||||
|
using JiShe.CollectBus.Cassandra;
|
||||||
using JiShe.CollectBus.IoTDB.Interface;
|
using JiShe.CollectBus.IoTDB.Interface;
|
||||||
using JiShe.CollectBus.IoTDB.Options;
|
using JiShe.CollectBus.IoTDB.Options;
|
||||||
using JiShe.CollectBus.IoTDB.Provider;
|
using JiShe.CollectBus.IoTDB.Provider;
|
||||||
|
|||||||
@ -25,6 +25,7 @@
|
|||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="AspNetCore.HealthChecks.Kafka" Version="9.0.0" />
|
<PackageReference Include="AspNetCore.HealthChecks.Kafka" Version="9.0.0" />
|
||||||
|
<PackageReference Include="AspNetCore.HealthChecks.MongoDb" Version="8.0.0" />
|
||||||
<PackageReference Include="AspNetCore.HealthChecks.Redis" Version="9.0.0" />
|
<PackageReference Include="AspNetCore.HealthChecks.Redis" Version="9.0.0" />
|
||||||
<PackageReference Include="AspNetCore.HealthChecks.UI" Version="9.0.0" />
|
<PackageReference Include="AspNetCore.HealthChecks.UI" Version="9.0.0" />
|
||||||
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="9.0.0" />
|
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="9.0.0" />
|
||||||
@ -46,16 +47,21 @@
|
|||||||
<PackageReference Include="Volo.Abp.AspNetCore.Authentication.JwtBearer" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.AspNetCore.Authentication.JwtBearer" Version="8.3.3" />
|
||||||
<PackageReference Include="Volo.Abp.AspNetCore.Serilog" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.AspNetCore.Serilog" Version="8.3.3" />
|
||||||
<PackageReference Include="Volo.Abp.Autofac" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.Autofac" Version="8.3.3" />
|
||||||
|
<PackageReference Include="Volo.Abp.BackgroundWorkers.Hangfire" Version="8.3.3" />
|
||||||
|
|
||||||
<PackageReference Include="Volo.Abp.Caching.StackExchangeRedis" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.Caching.StackExchangeRedis" Version="8.3.3" />
|
||||||
<PackageReference Include="Volo.Abp.Swashbuckle" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.Swashbuckle" Version="8.3.3" />
|
||||||
|
<!--<PackageReference Include="Hangfire.HttpJob" Version="3.8.5" />
|
||||||
|
<PackageReference Include="Hangfire.MySqlStorage" Version="2.0.3" />
|
||||||
|
<PackageReference Include="Hangfire.Dashboard.BasicAuthorization" Version="1.0.2" />-->
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<Page Include="Plugins\ignore.txt" />
|
<Page Include="Plugins\ignore.txt" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
<ProjectReference Include="..\..\modules\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
|
||||||
<ProjectReference Include="..\..\services\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
<ProjectReference Include="..\..\services\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
||||||
<ProjectReference Include="..\..\services\JiShe.CollectBus.Application\JiShe.CollectBus.Application.csproj" />
|
<ProjectReference Include="..\..\services\JiShe.CollectBus.Application\JiShe.CollectBus.Application.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.HttpApi\JiShe.CollectBus.HttpApi.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.HttpApi\JiShe.CollectBus.HttpApi.csproj" />
|
||||||
|
|||||||
@ -140,7 +140,7 @@
|
|||||||
}
|
}
|
||||||
],
|
],
|
||||||
"ServerApplicationOptions": {
|
"ServerApplicationOptions": {
|
||||||
"ServerTagName": "JiSheCollectBus77",
|
"ServerTagName": "JiSheCollectBus99",
|
||||||
"SystemType": "Energy",
|
"SystemType": "Energy",
|
||||||
"FirstCollectionTime": "2025-04-28 15:07:00",
|
"FirstCollectionTime": "2025-04-28 15:07:00",
|
||||||
"AutomaticVerificationTime": "16:07:00",
|
"AutomaticVerificationTime": "16:07:00",
|
||||||
@ -153,8 +153,5 @@
|
|||||||
"PlugInFolder": "",
|
"PlugInFolder": "",
|
||||||
"TCP": {
|
"TCP": {
|
||||||
"ClientPort": 10500
|
"ClientPort": 10500
|
||||||
},
|
|
||||||
"BackgroundJobs": {
|
|
||||||
"IsJobExecutionEnabled": false // 关闭任务执行
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2,6 +2,7 @@
|
|||||||
using JiShe.CollectBus.Common;
|
using JiShe.CollectBus.Common;
|
||||||
using JiShe.CollectBus.Migration.Host.HealthChecks;
|
using JiShe.CollectBus.Migration.Host.HealthChecks;
|
||||||
using JiShe.CollectBus.Migration.Host.Swaggers;
|
using JiShe.CollectBus.Migration.Host.Swaggers;
|
||||||
|
using JiShe.CollectBus.MongoDB;
|
||||||
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
|
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
|
||||||
using Swashbuckle.AspNetCore.SwaggerUI;
|
using Swashbuckle.AspNetCore.SwaggerUI;
|
||||||
using Volo.Abp;
|
using Volo.Abp;
|
||||||
@ -26,6 +27,7 @@ namespace JiShe.CollectBus.Migration.Host
|
|||||||
typeof(AbpAspNetCoreSerilogModule),
|
typeof(AbpAspNetCoreSerilogModule),
|
||||||
typeof(AbpSwashbuckleModule),
|
typeof(AbpSwashbuckleModule),
|
||||||
typeof(AbpTimingModule),
|
typeof(AbpTimingModule),
|
||||||
|
typeof(CollectBusMongoDbModule),
|
||||||
typeof(CollectBusMigrationApplicationModule),
|
typeof(CollectBusMigrationApplicationModule),
|
||||||
typeof(AbpCachingStackExchangeRedisModule)
|
typeof(AbpCachingStackExchangeRedisModule)
|
||||||
)]
|
)]
|
||||||
|
|||||||
@ -21,6 +21,7 @@
|
|||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="AspNetCore.HealthChecks.Kafka" Version="9.0.0" />
|
<PackageReference Include="AspNetCore.HealthChecks.Kafka" Version="9.0.0" />
|
||||||
|
<PackageReference Include="AspNetCore.HealthChecks.MongoDb" Version="8.0.0" />
|
||||||
<PackageReference Include="AspNetCore.HealthChecks.Redis" Version="9.0.0" />
|
<PackageReference Include="AspNetCore.HealthChecks.Redis" Version="9.0.0" />
|
||||||
<PackageReference Include="AspNetCore.HealthChecks.UI" Version="9.0.0" />
|
<PackageReference Include="AspNetCore.HealthChecks.UI" Version="9.0.0" />
|
||||||
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="9.0.0" />
|
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="9.0.0" />
|
||||||
@ -42,14 +43,19 @@
|
|||||||
<PackageReference Include="Volo.Abp.AspNetCore.Authentication.JwtBearer" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.AspNetCore.Authentication.JwtBearer" Version="8.3.3" />
|
||||||
<PackageReference Include="Volo.Abp.AspNetCore.Serilog" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.AspNetCore.Serilog" Version="8.3.3" />
|
||||||
<PackageReference Include="Volo.Abp.Autofac" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.Autofac" Version="8.3.3" />
|
||||||
|
<!--<PackageReference Include="Volo.Abp.BackgroundWorkers.Hangfire" Version="8.3.3" />-->
|
||||||
|
|
||||||
<PackageReference Include="Volo.Abp.Caching.StackExchangeRedis" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.Caching.StackExchangeRedis" Version="8.3.3" />
|
||||||
|
|
||||||
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
|
||||||
<PackageReference Include="Volo.Abp.Swashbuckle" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.Swashbuckle" Version="8.3.3" />
|
||||||
|
<!--<PackageReference Include="Hangfire.HttpJob" Version="3.8.5" />
|
||||||
|
<PackageReference Include="Hangfire.MySqlStorage" Version="2.0.3" />
|
||||||
|
<PackageReference Include="Hangfire.Dashboard.BasicAuthorization" Version="1.0.2" />-->
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
<ProjectReference Include="..\..\modules\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
|
||||||
<ProjectReference Include="..\..\services\JiShe.CollectBus.Migration.Application.Contracts\JiShe.CollectBus.Migration.Application.Contracts.csproj" />
|
<ProjectReference Include="..\..\services\JiShe.CollectBus.Migration.Application.Contracts\JiShe.CollectBus.Migration.Application.Contracts.csproj" />
|
||||||
<ProjectReference Include="..\..\services\JiShe.CollectBus.Migration.Application\JiShe.CollectBus.Migration.Application.csproj" />
|
<ProjectReference Include="..\..\services\JiShe.CollectBus.Migration.Application\JiShe.CollectBus.Migration.Application.csproj" />
|
||||||
<ProjectReference Include="..\JiShe.CollectBus.Migration.HttpApi\JiShe.CollectBus.Migration.HttpApi.csproj" />
|
<ProjectReference Include="..\JiShe.CollectBus.Migration.HttpApi\JiShe.CollectBus.Migration.HttpApi.csproj" />
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user