diff --git a/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs b/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs index a997274..ea11af8 100644 --- a/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs +++ b/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs @@ -1,4 +1,5 @@ -using FreeRedis; +using Confluent.Kafka; +using FreeRedis; using JiShe.CollectBus.Common; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Encrypt; @@ -29,6 +30,7 @@ using Volo.Abp.DependencyInjection; using Volo.Abp.Guids; using YamlDotNet.Core.Tokens; using static FreeSql.Internal.GlobalFilter; +using static IClientRPCService; using static JiShe.CollectBus.Common.Consts.T37612012PacketItemCodeConst; namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData @@ -211,6 +213,31 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData if (!string.IsNullOrWhiteSpace(data.FiledName)) { await _dbProvider.GetSessionPool(false).InsertAsync(meter); + // 增加标识字段 + var meterIsSync = new TreeModelSingleMeasuringEntity() + { + SystemName = _applicationOptions.SystemType, + DeviceId = $"{data.DeviceId}", + DeviceType = $"{data.DeviceType}", + ProjectId = $"{data.ProjectId}", + DataType = analysisBaseDto.DataType, + Timestamps = meter.Timestamps, + SingleMeasuring = (IotDbFieldConst.IsSync, false) + }; + await _dbProvider.GetSessionPool(false).InsertAsync(meterIsSync); + + // 数据库业务ID + var meterIsDatabaseBusiID = new TreeModelSingleMeasuringEntity() + { + SystemName = _applicationOptions.SystemType, + DeviceId = $"{data.DeviceId}", + DeviceType = $"{data.DeviceType}", + ProjectId = $"{data.ProjectId}", + DataType = analysisBaseDto.DataType, + Timestamps = meter.Timestamps, + SingleMeasuring = (IotDbFieldConst.DatabaseBusiID, data.DatabaseBusiID) + }; + await _dbProvider.GetSessionPool(false).InsertAsync(meterIsDatabaseBusiID); } return await Task.FromResult(true); } @@ -228,6 +255,9 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData var data = analysisBaseDto.Data!; List meterReadingTelemetryPacketInfos = new List(); List> treeModelSingleMeasuringEntities = new List>(); + List> meterIsSyncs = new List>(); + List> meterDataBaseIDs = new List>(); + foreach (var item in data) { if(!item.TimeSpan.HasValue) @@ -320,6 +350,35 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData if (!string.IsNullOrWhiteSpace(item.FiledName) && item.ProjectId>0) { treeModelSingleMeasuringEntities.Add(meter); + + // 增加标识字段 + var meterIsSync = new TreeModelSingleMeasuringEntity() + { + SystemName = _applicationOptions.SystemType, + DeviceId = $"{item.DeviceId}", + DeviceType = $"{item.DeviceType}", + ProjectId = $"{item.ProjectId}", + DataType = analysisBaseDto.DataType, + Timestamps = meter.Timestamps, + SingleMeasuring = (IotDbFieldConst.IsSync, false) + }; + if(!meterIsSyncs.Any(a=> a.DataType == meterIsSync.DataType && a.ProjectId == meterIsSync.ProjectId && a.SystemName== meterIsSync.SystemName && a.DeviceId== meterIsSync.DeviceId && a.Timestamps== meterIsSync.Timestamps)) + meterIsSyncs.Add(meterIsSync); + + // 数据库业务ID + var meterIsDatabaseBusiID = new TreeModelSingleMeasuringEntity() + { + SystemName = _applicationOptions.SystemType, + DeviceId = $"{item.DeviceId}", + DeviceType = $"{item.DeviceType}", + ProjectId = $"{item.ProjectId}", + DataType = analysisBaseDto.DataType, + Timestamps = meter.Timestamps, + SingleMeasuring = (IotDbFieldConst.DatabaseBusiID, item.DatabaseBusiID) + }; + if (!meterDataBaseIDs.Any(a => a.DataType == meterIsSync.DataType && a.ProjectId == meterIsSync.ProjectId && a.SystemName == meterIsSync.SystemName && a.DeviceId == meterIsSync.DeviceId && a.Timestamps == meterIsSync.Timestamps)) + meterDataBaseIDs.Add(meterIsDatabaseBusiID); + } } // 批量保存数据 @@ -327,6 +386,11 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData if (treeModelSingleMeasuringEntities.Count > 0) { await _dbProvider.GetSessionPool(false).BatchInsertAsync(treeModelSingleMeasuringEntities); + + // 报存标识字段 + await _dbProvider.GetSessionPool(false).BatchInsertAsync(meterIsSyncs); + await _dbProvider.GetSessionPool(false).BatchInsertAsync(meterDataBaseIDs); + } return await Task.FromResult(true); } @@ -371,7 +435,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData ProjectId = $"{data.ProjectId}", DataType = IOTDBDataTypeConst.Status, Timestamps = timestamps, - SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty) + SingleMeasuring = (IotDbFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty) }; await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData); @@ -384,7 +448,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData ProjectId = $"{data.ProjectId}", Timestamps = timestamps, DataType = IOTDBDataTypeConst.Status, - SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now) + SingleMeasuring = (IotDbFieldConst.RecordingTime, data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now) }; await _dbProvider.GetSessionPool(false).InsertAsync(treeRecordingTimeData); } @@ -472,7 +536,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData ProjectId = $"{item.ProjectId}", DataType = IOTDBDataTypeConst.Status, Timestamps = timestamps, - SingleMeasuring = (ConcentratorStatusFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty) + SingleMeasuring = (IotDbFieldConst.FrameData, analysisBaseDto.ReceivedHexMessage ?? string.Empty) }; await _dbProvider.GetSessionPool(false).InsertAsync(treeFrameData); @@ -485,7 +549,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData ProjectId = $"{item.ProjectId}", DataType = IOTDBDataTypeConst.Status, Timestamps = timestamps, - SingleMeasuring = (ConcentratorStatusFieldConst.RecordingTime, item.TimeSpan.HasValue ? item.TimeSpan.Value : DateTime.Now) + SingleMeasuring = (IotDbFieldConst.RecordingTime, item.TimeSpan.HasValue ? item.TimeSpan.Value : DateTime.Now) }; await _dbProvider.GetSessionPool(false).InsertAsync(treeRecordingTimeData); diff --git a/services/JiShe.CollectBus.Migration.Application/CollectBusMigrationApplicationModule.cs b/services/JiShe.CollectBus.Migration.Application/CollectBusMigrationApplicationModule.cs index 8a82b7e..3a89171 100644 --- a/services/JiShe.CollectBus.Migration.Application/CollectBusMigrationApplicationModule.cs +++ b/services/JiShe.CollectBus.Migration.Application/CollectBusMigrationApplicationModule.cs @@ -14,7 +14,7 @@ using Volo.Abp.Modularity; namespace JiShe.CollectBus.Migration; [DependsOn( - typeof(CollectBusDomainModule), + //typeof(CollectBusDomainModule), typeof(CollectBusMigrationApplicationContractsModule), typeof(AbpDddApplicationModule), typeof(AbpAutoMapperModule), diff --git a/shared/JiShe.CollectBus.Common/Consts/T37612012PacketItemCodeConst.cs b/shared/JiShe.CollectBus.Common/Consts/T37612012PacketItemCodeConst.cs index 69e33e5..b556524 100644 --- a/shared/JiShe.CollectBus.Common/Consts/T37612012PacketItemCodeConst.cs +++ b/shared/JiShe.CollectBus.Common/Consts/T37612012PacketItemCodeConst.cs @@ -170,9 +170,9 @@ namespace JiShe.CollectBus.Common.Consts /// - /// 集中器状态字段 + /// IotDB存储字段字段 /// - public class ConcentratorStatusFieldConst + public class IotDbFieldConst { /// @@ -185,6 +185,16 @@ namespace JiShe.CollectBus.Common.Consts /// public const string FrameData = "FrameData"; + /// + /// 是否同步 + /// + public const string IsSync = "IsSync"; + + /// + /// 数据库业务ID + /// + public const string DatabaseBusiID= "DatabaseBusiID"; + } #endregion diff --git a/web/JiShe.CollectBus.Migration.Host/CollectBusMigrationHostModule.Configure.cs b/web/JiShe.CollectBus.Migration.Host/CollectBusMigrationHostModule.Configure.cs index 33c2b3a..ebe0541 100644 --- a/web/JiShe.CollectBus.Migration.Host/CollectBusMigrationHostModule.Configure.cs +++ b/web/JiShe.CollectBus.Migration.Host/CollectBusMigrationHostModule.Configure.cs @@ -1,7 +1,4 @@ -//using Hangfire; -//using Hangfire.Redis.StackExchange; -using JiShe.CollectBus.Migration.Host.Hangfire; -using JiShe.CollectBus.Migration.Host.HealthChecks; +using JiShe.CollectBus.Migration.Host.HealthChecks; using JiShe.CollectBus.Migration.Host.Swaggers; using Microsoft.AspNetCore.Authentication.JwtBearer; using Microsoft.AspNetCore.DataProtection; @@ -12,7 +9,6 @@ using StackExchange.Redis; using System.Text; using Volo.Abp.AspNetCore.Auditing; using Volo.Abp.Auditing; -using Volo.Abp.BackgroundJobs; using Volo.Abp.Caching; using Volo.Abp.Modularity; @@ -21,33 +17,6 @@ namespace JiShe.CollectBus.Migration.Host { public partial class CollectBusMigrationHostModule { - /// - /// Configures the hangfire. - /// - /// The context. - //private void ConfigureHangfire(ServiceConfigurationContext context) - //{ - // var redisStorageOptions = new RedisStorageOptions() - // { - // Db = context.Services.GetConfiguration().GetValue("Redis:HangfireDB") - // }; - - // Configure(options => { options.IsJobExecutionEnabled = false; }); - - // context.Services.AddHangfire(config => - // { - // config.UseRedisStorage( - // context.Services.GetConfiguration().GetValue("Redis:Configuration"), redisStorageOptions) - // .WithJobExpirationTimeout(TimeSpan.FromDays(7)); - // var delaysInSeconds = new[] { 10, 60, 60 * 3 }; // 重试时间间隔 - // const int Attempts = 3; // 重试次数 - // config.UseFilter(new AutomaticRetryAttribute() { Attempts = Attempts, DelaysInSeconds = delaysInSeconds }); - // //config.UseFilter(new AutoDeleteAfterSuccessAttribute(TimeSpan.FromDays(7))); - // config.UseFilter(new JobRetryLastFilter(Attempts)); - // }); - // context.Services.AddHangfireServer(); - //} - /// /// Configures the JWT authentication. /// diff --git a/web/JiShe.CollectBus.Migration.Host/CollectBusMigrationHostModule.cs b/web/JiShe.CollectBus.Migration.Host/CollectBusMigrationHostModule.cs index 0f90aa7..c181f60 100644 --- a/web/JiShe.CollectBus.Migration.Host/CollectBusMigrationHostModule.cs +++ b/web/JiShe.CollectBus.Migration.Host/CollectBusMigrationHostModule.cs @@ -42,7 +42,6 @@ namespace JiShe.CollectBus.Migration.Host ConfigureSwaggerServices(context, configuration); //ConfigureNetwork(context, configuration); ConfigureJwtAuthentication(context, configuration); - //ConfigureHangfire(context); ConfigureAuditLog(context); ConfigureCustom(context, configuration); ConfigureHealthChecks(context, configuration); @@ -84,9 +83,9 @@ namespace JiShe.CollectBus.Migration.Host options.DefaultModelsExpandDepth(-1); }); } - //app.UseAuditing(); + app.UseAuditing(); app.UseAbpSerilogEnrichers(); - app.UseUnitOfWork(); + //app.UseUnitOfWork(); //app.UseHangfireDashboard("/hangfire", new DashboardOptions //{ // IgnoreAntiforgeryToken = true diff --git a/web/JiShe.CollectBus.Migration.Host/Hangfire/JobRetryLastFilter.cs b/web/JiShe.CollectBus.Migration.Host/Hangfire/JobRetryLastFilter.cs deleted file mode 100644 index 83d68ae..0000000 --- a/web/JiShe.CollectBus.Migration.Host/Hangfire/JobRetryLastFilter.cs +++ /dev/null @@ -1,29 +0,0 @@ -using Hangfire.Common; -using Hangfire.States; -using Serilog; - -namespace JiShe.CollectBus.Migration.Host.Hangfire -{ - /// - /// 重试最后一次 - /// - public class JobRetryLastFilter : JobFilterAttribute, IElectStateFilter - { - private int RetryCount { get; } - - public JobRetryLastFilter(int retryCount) - { - RetryCount = retryCount; - } - - - public void OnStateElection(ElectStateContext context) - { - var retryAttempt = context.GetJobParameter("RetryCount"); - if (RetryCount == retryAttempt) - { - Log.Error("最后一次重试"); - } - } - } -} diff --git a/web/JiShe.CollectBus.Migration.Host/JiShe.CollectBus.Migration.Host.csproj b/web/JiShe.CollectBus.Migration.Host/JiShe.CollectBus.Migration.Host.csproj index d201582..747a5c8 100644 --- a/web/JiShe.CollectBus.Migration.Host/JiShe.CollectBus.Migration.Host.csproj +++ b/web/JiShe.CollectBus.Migration.Host/JiShe.CollectBus.Migration.Host.csproj @@ -46,6 +46,8 @@ + +