From edecbc386e178815b359675b761f24b152fd68dc Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Thu, 8 May 2025 10:28:23 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=95=B0=E6=8D=AE=E9=80=9A?= =?UTF-8?q?=E9=81=93=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86=EF=BC=8C=E8=A7=A3?= =?UTF-8?q?=E5=86=B3=E6=95=B0=E6=8D=AE=E4=B8=A2=E5=A4=B1=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ComplexTypeSourceAnalyzers.cs | 9 ++- .../Provider/IoTDBProvider.cs | 31 ++++++--- .../DataChannels/IDataChannelManageService.cs | 5 +- .../CollectBusApplicationModule.cs | 2 +- .../DataChannels/DataChannelManage.cs | 2 +- .../DataChannels/DataChannelManageService.cs | 67 ++++++++++++------- .../Samples/SampleAppService.cs | 3 +- .../BasicScheduledMeterReadingService.cs | 24 ++++--- .../EntityMemberInfo.cs | 32 +++++++++ 9 files changed, 121 insertions(+), 54 deletions(-) create mode 100644 shared/JiShe.CollectBus.Analyzers.Shared/EntityMemberInfo.cs diff --git a/modules/JiShe.CollectBus.Analyzers/ComplexTypeSourceAnalyzers.cs b/modules/JiShe.CollectBus.Analyzers/ComplexTypeSourceAnalyzers.cs index 3b82bf2..befb833 100644 --- a/modules/JiShe.CollectBus.Analyzers/ComplexTypeSourceAnalyzers.cs +++ b/modules/JiShe.CollectBus.Analyzers/ComplexTypeSourceAnalyzers.cs @@ -490,21 +490,20 @@ namespace JiShe.CollectBus.IncrementalGenerator var initializerLines = new List(); + foreach (var prop in propList) { - // 主属性 AddPropertyInitializer(classSymbol, prop, initializerLines); - // 处理元组嵌套属性 if (prop.Type is INamedTypeSymbol { IsTupleType: true } tupleType) { foreach (var element in tupleType.TupleElements) { - // 生成形如:typeof(ValueTuple).GetProperty("Item1") + //使用GetField代替GetProperty var tupleTypeName = tupleType.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat); initializerLines.Add( - $"typeof({tupleTypeName}).GetProperty(\"{element.Name}\") ?? " + - $"throw new InvalidOperationException(\"Tuple element {element.Name} not found\")"); + $"typeof({tupleTypeName}).GetField(\"{element.Name}\") ?? " + + $"throw new InvalidOperationException(\"Tuple field {element.Name} not found\")"); } } } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index 704363c..83040fe 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -179,14 +179,14 @@ namespace JiShe.CollectBus.IoTDB.Provider { var accessor = SourceEntityAccessorFactory.GetAccessor(); - var columns = CollectColumnMetadata(typeof(T)); + var columns = CollectColumnMetadata(accessor); var metadata = BuildDeviceMetadata(columns); var metaData = MetadataCache.AddOrUpdate( typeof(T), addValueFactory: t => metadata, // 如果键不存在,用此值添加 updateValueFactory: (t, existingValue) => { - var columns = CollectColumnMetadata(t); + var columns = CollectColumnMetadata(accessor); var metadata = BuildDeviceMetadata(columns); //对现有值 existingValue 进行修改,返回新值 @@ -300,7 +300,23 @@ namespace JiShe.CollectBus.IoTDB.Provider foreach (var measurement in tempColumnNames) { - rowValues.Add(accessor.GetPropertyValue(entity,measurement)); + var value = accessor.GetPropertyValue(entity, measurement); + + if (value != null) + { + Type tupleType = value.GetType(); + var tempValue = tupleType.Name.ToUpper() switch + { + "DATETIME" => Convert.ToDateTime(value).GetDateTimeOffset().ToUnixTimeNanoseconds(), + _ => value + }; + + rowValues.Add(tempValue); + } + else + { + rowValues.Add(value); + } } //foreach (var measurement in tempColumnNames) @@ -613,15 +629,14 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 获取设备元数据的列 /// - /// + /// /// - private List CollectColumnMetadata(Type type) + private List CollectColumnMetadata(ISourceEntityAccessor accessor) { var columns = new List(); - - foreach (var prop in type.GetProperties()) + + foreach (var prop in accessor.PropertyInfoList) { - string typeName = string.Empty; Type declaredType = prop.PropertyType; diff --git a/services/JiShe.CollectBus.Application.Contracts/DataChannels/IDataChannelManageService.cs b/services/JiShe.CollectBus.Application.Contracts/DataChannels/IDataChannelManageService.cs index 2b1d8b7..a126910 100644 --- a/services/JiShe.CollectBus.Application.Contracts/DataChannels/IDataChannelManageService.cs +++ b/services/JiShe.CollectBus.Application.Contracts/DataChannels/IDataChannelManageService.cs @@ -21,14 +21,13 @@ namespace JiShe.CollectBus.DataChannels /// 定时任务数据通道写入 /// /// - Task ScheduledMeterTaskWriterAsync(ChannelWriter>> _telemetryPacketInfoWriter, Tuple> dataItems); + Task ScheduledMeterTaskWriterAsync(ChannelWriter>> _telemetryPacketInfoWriter, ValueTuple> dataItems); /// /// 定时任务数据入库和Kafka推送通道 /// /// - Task ScheduledMeterTaskReadingAsync(ChannelReader>> _telemetryPacketInfoReader, - CancellationToken cancellationToken); + Task ScheduledMeterTaskReadingAsync(ChannelReader>> _telemetryPacketInfoReader ); #endregion } } diff --git a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index 592d7e5..ed86fd7 100644 --- a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -81,7 +81,7 @@ public class CollectBusApplicationModule : AbpModule //}).ConfigureAwait(false); //下发任务通道构建 - DataChannelManage.TaskDataChannel = Channel.CreateUnbounded>>(); + DataChannelManage.TaskDataChannel = Channel.CreateUnbounded>>(); //默认初始化表计信息 var dbContext = context.ServiceProvider.GetRequiredService(); diff --git a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManage.cs b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManage.cs index e1f3979..6a8eeed 100644 --- a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManage.cs +++ b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManage.cs @@ -13,6 +13,6 @@ namespace JiShe.CollectBus.DataChannels /// /// 下发任务通道 /// - public static Channel>> TaskDataChannel; + public static Channel>> TaskDataChannel; } } diff --git a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs index 26208a2..d6f9e04 100644 --- a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs +++ b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs @@ -50,7 +50,7 @@ namespace JiShe.CollectBus.DataChannels /// 定时任务数据通道写入 /// /// - public async Task ScheduledMeterTaskWriterAsync(ChannelWriter>> _telemetryPacketInfoWriter, Tuple> dataItems) + public async Task ScheduledMeterTaskWriterAsync(ChannelWriter>> _telemetryPacketInfoWriter, ValueTuple> dataItems) { await _telemetryPacketInfoWriter.WriteAsync(dataItems); } @@ -61,47 +61,62 @@ namespace JiShe.CollectBus.DataChannels /// 定时任务数据入库和Kafka推送通道 /// public async Task ScheduledMeterTaskReadingAsync( - ChannelReader>> telemetryPacketInfoReader, - CancellationToken cancellationToken = default) + ChannelReader>> telemetryPacketInfoReader) { - const int BatchSize = 20000; // 修正批次大小 + const int BatchSize = 20000; const int EmptyWaitMilliseconds = 1000; var timeout = TimeSpan.FromSeconds(5); + var timer = Stopwatch.StartNew(); + long timeoutMilliseconds = 0; var metadata = await _dbProvider.GetMetadata(); try { - while (!cancellationToken.IsCancellationRequested) + while (true) { - var batchStopwatch = Stopwatch.StartNew(); - var batch = new List>>(); + var batch = new List>>(); + var canRead = telemetryPacketInfoReader.Count; + if (canRead <= 0) + { + if (timeoutMilliseconds > 0) + { + _logger.LogError($"{nameof(ScheduledMeterTaskReadingAsync)} 通道处理数据耗时{timeoutMilliseconds}毫秒"); + } + timeoutMilliseconds = 0; + //无消息时短等待1秒 + await Task.Delay(EmptyWaitMilliseconds); + continue; + } + + timer.Restart(); + var startTime = DateTime.Now; try { // 异步批量读取数据 - while (batch.Count < BatchSize && batchStopwatch.Elapsed < timeout) + while (batch != null && batch.Count < BatchSize && (DateTime.Now - startTime) < timeout) { - while (telemetryPacketInfoReader.TryRead(out var data)) + try { - batch.Add(data); - if (batch.Count >= BatchSize) break; + if (telemetryPacketInfoReader.TryRead(out var dataItem)) + { + batch.Add(dataItem); + } + } + catch (Exception) + { + throw; } - - if (batch.Count >= BatchSize) break; - - // 无更多数据时等待 - if (!await telemetryPacketInfoReader.WaitToReadAsync(cancellationToken)) - break; } } - catch (OperationCanceledException) + catch (Exception) { - break; + throw; } if (batch.Count == 0) { - await Task.Delay(EmptyWaitMilliseconds, cancellationToken); + await Task.Delay(EmptyWaitMilliseconds); continue; } @@ -135,12 +150,16 @@ namespace JiShe.CollectBus.DataChannels } catch (Exception ex) { - _logger.LogError(ex, "处理主题 {TopicName} 数据时发生异常", topicName); + _logger.LogError(ex, "数据通道处理主题 {TopicName} 数据时发生异常", topicName); } } - _logger.LogInformation("处理完成批次: {Count} 条, 耗时: {Elapsed}ms", - batch.Count, batchStopwatch.ElapsedMilliseconds); + batch.Clear(); + timer.Stop(); + + timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds; + + startTime = DateTime.Now; } } catch (Exception ex) @@ -179,7 +198,7 @@ namespace JiShe.CollectBus.DataChannels } } } - + } } diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index ef3b894..17c8a6e 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -175,7 +175,8 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS public async Task TestTreeModelSingleMeasuringEntity(string measuring, string value, DateTime time) { time = DateTime.Now; - + //System.Reflection.PropertyInfo; + //System.Reflection.FieldInfo var meter = new TreeModelSingleMeasuringEntity() { SystemName = "energy", diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 47b8a99..32a1d7c 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -160,7 +160,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogWarning($"电表自动校时 {data.Name} 任务数据构建失败:{data.Serialize()}"); return; } - _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); + _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); }); } else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取 @@ -177,7 +177,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}"); return; } - _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); + _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); }); } else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取 @@ -194,7 +194,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}"); return; } - _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); + _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); }); } else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//月冻结 @@ -211,7 +211,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}"); return; } - _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); + _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); }); } else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结 @@ -228,7 +228,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}"); return; } - _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); + _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); }); } else @@ -262,7 +262,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading //_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}"); return; } - _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempTask)); + _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempTask)); }); } else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) @@ -281,7 +281,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogWarning($"水表 {data.Name} 任务数据构建失败:{data.Serialize()}"); return; } - _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempTask)); + _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempTask)); }); } else @@ -307,7 +307,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogWarning($"{nameof(AmmeterScheduledAutoValveControl)}电表定时阀控没有可操作的任务"); return; } - _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask)); + _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask)); } #region 电表采集处理 @@ -329,10 +329,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task InitAmmeterCacheData(string gatherCode = "") { - // 创建取消令牌源 - var cts = new CancellationTokenSource(); + return; - _ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader, cts.Token); + // 创建取消令牌源 + //var cts = new CancellationTokenSource(); + + _ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader ); //此处代码不要删除 #if DEBUG diff --git a/shared/JiShe.CollectBus.Analyzers.Shared/EntityMemberInfo.cs b/shared/JiShe.CollectBus.Analyzers.Shared/EntityMemberInfo.cs new file mode 100644 index 0000000..a45f4c2 --- /dev/null +++ b/shared/JiShe.CollectBus.Analyzers.Shared/EntityMemberInfo.cs @@ -0,0 +1,32 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JiShe.CollectBus.Analyzers.Shared +{ + /// + /// 实体成员信息 + /// + public sealed class EntityMemberInfo + { + public string Path { get; set; } + public Type Type { get; set; } + private readonly Func _getter; + private readonly Action _setter; + + public EntityMemberInfo( + string path, + Type type, + Func getter, + Action setter) + { + Path = path; + Type = type; + _getter = getter; + _setter = setter; + } + + public object GetValue(object entity) => _getter(entity); + public void SetValue(object entity, object value) => _setter(entity, value); + } +}