This commit is contained in:
ChenYi 2025-05-21 17:11:32 +08:00
commit 71a5f7b89b
7 changed files with 87 additions and 72 deletions

View File

@ -1,4 +1,5 @@
using FreeRedis; using Confluent.Kafka;
using FreeRedis;
using JiShe.CollectBus.Common; using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Encrypt; using JiShe.CollectBus.Common.Encrypt;
@ -29,6 +30,7 @@ using Volo.Abp.DependencyInjection;
using Volo.Abp.Guids; using Volo.Abp.Guids;
using YamlDotNet.Core.Tokens; using YamlDotNet.Core.Tokens;
using static FreeSql.Internal.GlobalFilter; using static FreeSql.Internal.GlobalFilter;
using static IClientRPCService;
using static JiShe.CollectBus.Common.Consts.T37612012PacketItemCodeConst; using static JiShe.CollectBus.Common.Consts.T37612012PacketItemCodeConst;
namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
@ -211,6 +213,31 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
if (!string.IsNullOrWhiteSpace(data.FiledName)) if (!string.IsNullOrWhiteSpace(data.FiledName))
{ {
await _dbProvider.GetSessionPool(false).InsertAsync(meter); await _dbProvider.GetSessionPool(false).InsertAsync(meter);
// 增加标识字段
var meterIsSync = new TreeModelSingleMeasuringEntity<bool>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}",
ProjectId = $"{data.ProjectId}",
DataType = analysisBaseDto.DataType,
Timestamps = meter.Timestamps,
SingleMeasuring = (IotDbFieldConst.IsSync, false)
};
await _dbProvider.GetSessionPool(false).InsertAsync(meterIsSync);
// 数据库业务ID
var meterIsDatabaseBusiID = new TreeModelSingleMeasuringEntity<int>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{data.DeviceId}",
DeviceType = $"{data.DeviceType}",
ProjectId = $"{data.ProjectId}",
DataType = analysisBaseDto.DataType,
Timestamps = meter.Timestamps,
SingleMeasuring = (IotDbFieldConst.DatabaseBusiID, data.DatabaseBusiID)
};
await _dbProvider.GetSessionPool(false).InsertAsync(meterIsDatabaseBusiID);
} }
return await Task.FromResult(true); return await Task.FromResult(true);
} }
@ -228,6 +255,9 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
var data = analysisBaseDto.Data!; var data = analysisBaseDto.Data!;
List<MeterReadingTelemetryPacketInfo> meterReadingTelemetryPacketInfos = new List<MeterReadingTelemetryPacketInfo>(); List<MeterReadingTelemetryPacketInfo> meterReadingTelemetryPacketInfos = new List<MeterReadingTelemetryPacketInfo>();
List<TreeModelSingleMeasuringEntity<T>> treeModelSingleMeasuringEntities = new List<TreeModelSingleMeasuringEntity<T>>(); List<TreeModelSingleMeasuringEntity<T>> treeModelSingleMeasuringEntities = new List<TreeModelSingleMeasuringEntity<T>>();
List<TreeModelSingleMeasuringEntity<bool>> meterIsSyncs = new List<TreeModelSingleMeasuringEntity<bool>>();
List<TreeModelSingleMeasuringEntity<int>> meterDataBaseIDs = new List<TreeModelSingleMeasuringEntity<int>>();
foreach (var item in data) foreach (var item in data)
{ {
if(!item.TimeSpan.HasValue) if(!item.TimeSpan.HasValue)
@ -320,6 +350,35 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
if (!string.IsNullOrWhiteSpace(item.FiledName) && item.ProjectId>0) if (!string.IsNullOrWhiteSpace(item.FiledName) && item.ProjectId>0)
{ {
treeModelSingleMeasuringEntities.Add(meter); treeModelSingleMeasuringEntities.Add(meter);
// 增加标识字段
var meterIsSync = new TreeModelSingleMeasuringEntity<bool>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{item.DeviceId}",
DeviceType = $"{item.DeviceType}",
ProjectId = $"{item.ProjectId}",
DataType = analysisBaseDto.DataType,
Timestamps = meter.Timestamps,
SingleMeasuring = (IotDbFieldConst.IsSync, false)
};
if(!meterIsSyncs.Any(a=> a.DataType == meterIsSync.DataType && a.ProjectId == meterIsSync.ProjectId && a.SystemName== meterIsSync.SystemName && a.DeviceId== meterIsSync.DeviceId && a.Timestamps== meterIsSync.Timestamps))
meterIsSyncs.Add(meterIsSync);
// 数据库业务ID
var meterIsDatabaseBusiID = new TreeModelSingleMeasuringEntity<int>()
{
SystemName = _applicationOptions.SystemType,
DeviceId = $"{item.DeviceId}",
DeviceType = $"{item.DeviceType}",
ProjectId = $"{item.ProjectId}",
DataType = analysisBaseDto.DataType,
Timestamps = meter.Timestamps,
SingleMeasuring = (IotDbFieldConst.DatabaseBusiID, item.DatabaseBusiID)
};
if (!meterDataBaseIDs.Any(a => a.DataType == meterIsSync.DataType && a.ProjectId == meterIsSync.ProjectId && a.SystemName == meterIsSync.SystemName && a.DeviceId == meterIsSync.DeviceId && a.Timestamps == meterIsSync.Timestamps))
meterDataBaseIDs.Add(meterIsDatabaseBusiID);
} }
} }
// 批量保存数据 // 批量保存数据
@ -327,6 +386,11 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
if (treeModelSingleMeasuringEntities.Count > 0) if (treeModelSingleMeasuringEntities.Count > 0)
{ {
await _dbProvider.GetSessionPool(false).BatchInsertAsync(treeModelSingleMeasuringEntities); await _dbProvider.GetSessionPool(false).BatchInsertAsync(treeModelSingleMeasuringEntities);
// 报存标识字段
await _dbProvider.GetSessionPool(false).BatchInsertAsync(meterIsSyncs);
await _dbProvider.GetSessionPool(false).BatchInsertAsync(meterDataBaseIDs);
} }
return await Task.FromResult(true); return await Task.FromResult(true);
} }
@ -371,7 +435,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
ProjectId = $"{data.ProjectId}", ProjectId = $"{data.ProjectId}",
DataType = IOTDBDataTypeConst.Status, DataType = IOTDBDataTypeConst.Status,
Timestamps = timestamps, Timestamps = timestamps,
SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty) SingleMeasuring = (IotDbFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
}; };
await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData); await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData);
@ -384,7 +448,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
ProjectId = $"{data.ProjectId}", ProjectId = $"{data.ProjectId}",
Timestamps = timestamps, Timestamps = timestamps,
DataType = IOTDBDataTypeConst.Status, DataType = IOTDBDataTypeConst.Status,
SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now) SingleMeasuring = (IotDbFieldConst.RecordingTime, data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now)
}; };
await _dbProvider.GetSessionPool(false).InsertAsync(treeRecordingTimeData); await _dbProvider.GetSessionPool(false).InsertAsync(treeRecordingTimeData);
} }
@ -472,7 +536,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
ProjectId = $"{item.ProjectId}", ProjectId = $"{item.ProjectId}",
DataType = IOTDBDataTypeConst.Status, DataType = IOTDBDataTypeConst.Status,
Timestamps = timestamps, Timestamps = timestamps,
SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty) SingleMeasuring = (IotDbFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty)
}; };
await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData); await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData);
@ -485,7 +549,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
ProjectId = $"{item.ProjectId}", ProjectId = $"{item.ProjectId}",
DataType = IOTDBDataTypeConst.Status, DataType = IOTDBDataTypeConst.Status,
Timestamps = timestamps, Timestamps = timestamps,
SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, item.TimeSpan.HasValue ? item.TimeSpan.Value : DateTime.Now) SingleMeasuring = (IotDbFieldConst.RecordingTime, item.TimeSpan.HasValue ? item.TimeSpan.Value : DateTime.Now)
}; };
await _dbProvider.GetSessionPool(false).InsertAsync(treeRecordingTimeData); await _dbProvider.GetSessionPool(false).InsertAsync(treeRecordingTimeData);

View File

@ -14,7 +14,7 @@ using Volo.Abp.Modularity;
namespace JiShe.CollectBus.Migration; namespace JiShe.CollectBus.Migration;
[DependsOn( [DependsOn(
typeof(CollectBusDomainModule), //typeof(CollectBusDomainModule),
typeof(CollectBusMigrationApplicationContractsModule), typeof(CollectBusMigrationApplicationContractsModule),
typeof(AbpDddApplicationModule), typeof(AbpDddApplicationModule),
typeof(AbpAutoMapperModule), typeof(AbpAutoMapperModule),

View File

@ -170,9 +170,9 @@ namespace JiShe.CollectBus.Common.Consts
/// <summary> /// <summary>
/// 集中器状态字段 /// IotDB存储字段字段
/// </summary> /// </summary>
public class ConcentratorStatusFieldConst public class IotDbFieldConst
{ {
/// <summary> /// <summary>
@ -185,6 +185,16 @@ namespace JiShe.CollectBus.Common.Consts
/// </summary> /// </summary>
public const string FrameData = "FrameData"; public const string FrameData = "FrameData";
/// <summary>
/// 是否同步
/// </summary>
public const string IsSync = "IsSync";
/// <summary>
/// 数据库业务ID
/// </summary>
public const string DatabaseBusiID= "DatabaseBusiID";
} }
#endregion #endregion

View File

@ -1,7 +1,4 @@
//using Hangfire; using JiShe.CollectBus.Migration.Host.HealthChecks;
//using Hangfire.Redis.StackExchange;
using JiShe.CollectBus.Migration.Host.Hangfire;
using JiShe.CollectBus.Migration.Host.HealthChecks;
using JiShe.CollectBus.Migration.Host.Swaggers; using JiShe.CollectBus.Migration.Host.Swaggers;
using Microsoft.AspNetCore.Authentication.JwtBearer; using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.AspNetCore.DataProtection; using Microsoft.AspNetCore.DataProtection;
@ -12,7 +9,6 @@ using StackExchange.Redis;
using System.Text; using System.Text;
using Volo.Abp.AspNetCore.Auditing; using Volo.Abp.AspNetCore.Auditing;
using Volo.Abp.Auditing; using Volo.Abp.Auditing;
using Volo.Abp.BackgroundJobs;
using Volo.Abp.Caching; using Volo.Abp.Caching;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;
@ -21,33 +17,6 @@ namespace JiShe.CollectBus.Migration.Host
{ {
public partial class CollectBusMigrationHostModule public partial class CollectBusMigrationHostModule
{ {
/// <summary>
/// Configures the hangfire.
/// </summary>
/// <param name="context">The context.</param>
//private void ConfigureHangfire(ServiceConfigurationContext context)
//{
// var redisStorageOptions = new RedisStorageOptions()
// {
// Db = context.Services.GetConfiguration().GetValue<int>("Redis:HangfireDB")
// };
// Configure<AbpBackgroundJobOptions>(options => { options.IsJobExecutionEnabled = false; });
// context.Services.AddHangfire(config =>
// {
// config.UseRedisStorage(
// context.Services.GetConfiguration().GetValue<string>("Redis:Configuration"), redisStorageOptions)
// .WithJobExpirationTimeout(TimeSpan.FromDays(7));
// var delaysInSeconds = new[] { 10, 60, 60 * 3 }; // 重试时间间隔
// const int Attempts = 3; // 重试次数
// config.UseFilter(new AutomaticRetryAttribute() { Attempts = Attempts, DelaysInSeconds = delaysInSeconds });
// //config.UseFilter(new AutoDeleteAfterSuccessAttribute(TimeSpan.FromDays(7)));
// config.UseFilter(new JobRetryLastFilter(Attempts));
// });
// context.Services.AddHangfireServer();
//}
/// <summary> /// <summary>
/// Configures the JWT authentication. /// Configures the JWT authentication.
/// </summary> /// </summary>

View File

@ -42,7 +42,6 @@ namespace JiShe.CollectBus.Migration.Host
ConfigureSwaggerServices(context, configuration); ConfigureSwaggerServices(context, configuration);
//ConfigureNetwork(context, configuration); //ConfigureNetwork(context, configuration);
ConfigureJwtAuthentication(context, configuration); ConfigureJwtAuthentication(context, configuration);
//ConfigureHangfire(context);
ConfigureAuditLog(context); ConfigureAuditLog(context);
ConfigureCustom(context, configuration); ConfigureCustom(context, configuration);
ConfigureHealthChecks(context, configuration); ConfigureHealthChecks(context, configuration);
@ -84,9 +83,9 @@ namespace JiShe.CollectBus.Migration.Host
options.DefaultModelsExpandDepth(-1); options.DefaultModelsExpandDepth(-1);
}); });
} }
//app.UseAuditing(); app.UseAuditing();
app.UseAbpSerilogEnrichers(); app.UseAbpSerilogEnrichers();
app.UseUnitOfWork(); //app.UseUnitOfWork();
//app.UseHangfireDashboard("/hangfire", new DashboardOptions //app.UseHangfireDashboard("/hangfire", new DashboardOptions
//{ //{
// IgnoreAntiforgeryToken = true // IgnoreAntiforgeryToken = true

View File

@ -1,29 +0,0 @@
using Hangfire.Common;
using Hangfire.States;
using Serilog;
namespace JiShe.CollectBus.Migration.Host.Hangfire
{
/// <summary>
/// 重试最后一次
/// </summary>
public class JobRetryLastFilter : JobFilterAttribute, IElectStateFilter
{
private int RetryCount { get; }
public JobRetryLastFilter(int retryCount)
{
RetryCount = retryCount;
}
public void OnStateElection(ElectStateContext context)
{
var retryAttempt = context.GetJobParameter<int>("RetryCount");
if (RetryCount == retryAttempt)
{
Log.Error("最后一次重试");
}
}
}
}

View File

@ -46,6 +46,8 @@
<!--<PackageReference Include="Volo.Abp.BackgroundWorkers.Hangfire" Version="8.3.3" />--> <!--<PackageReference Include="Volo.Abp.BackgroundWorkers.Hangfire" Version="8.3.3" />-->
<PackageReference Include="Volo.Abp.Caching.StackExchangeRedis" Version="8.3.3" /> <PackageReference Include="Volo.Abp.Caching.StackExchangeRedis" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Swashbuckle" Version="8.3.3" /> <PackageReference Include="Volo.Abp.Swashbuckle" Version="8.3.3" />
<!--<PackageReference Include="Hangfire.HttpJob" Version="3.8.5" /> <!--<PackageReference Include="Hangfire.HttpJob" Version="3.8.5" />
<PackageReference Include="Hangfire.MySqlStorage" Version="2.0.3" /> <PackageReference Include="Hangfire.MySqlStorage" Version="2.0.3" />