完善数据通道数据处理,解决数据丢失的问题。

This commit is contained in:
ChenYi 2025-05-08 10:28:23 +08:00
parent ac226110cd
commit edecbc386e
9 changed files with 121 additions and 54 deletions

View File

@ -490,21 +490,20 @@ namespace JiShe.CollectBus.IncrementalGenerator
var initializerLines = new List<string>(); var initializerLines = new List<string>();
foreach (var prop in propList) foreach (var prop in propList)
{ {
// 主属性
AddPropertyInitializer(classSymbol, prop, initializerLines); AddPropertyInitializer(classSymbol, prop, initializerLines);
// 处理元组嵌套属性
if (prop.Type is INamedTypeSymbol { IsTupleType: true } tupleType) if (prop.Type is INamedTypeSymbol { IsTupleType: true } tupleType)
{ {
foreach (var element in tupleType.TupleElements) foreach (var element in tupleType.TupleElements)
{ {
// 生成形如typeof(ValueTuple<string,T>).GetProperty("Item1") //使用GetField代替GetProperty
var tupleTypeName = tupleType.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat); var tupleTypeName = tupleType.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat);
initializerLines.Add( initializerLines.Add(
$"typeof({tupleTypeName}).GetProperty(\"{element.Name}\") ?? " + $"typeof({tupleTypeName}).GetField(\"{element.Name}\") ?? " +
$"throw new InvalidOperationException(\"Tuple element {element.Name} not found\")"); $"throw new InvalidOperationException(\"Tuple field {element.Name} not found\")");
} }
} }
} }

View File

@ -179,14 +179,14 @@ namespace JiShe.CollectBus.IoTDB.Provider
{ {
var accessor = SourceEntityAccessorFactory.GetAccessor<T>(); var accessor = SourceEntityAccessorFactory.GetAccessor<T>();
var columns = CollectColumnMetadata(typeof(T)); var columns = CollectColumnMetadata<T>(accessor);
var metadata = BuildDeviceMetadata<T>(columns); var metadata = BuildDeviceMetadata<T>(columns);
var metaData = MetadataCache.AddOrUpdate( var metaData = MetadataCache.AddOrUpdate(
typeof(T), typeof(T),
addValueFactory: t => metadata, // 如果键不存在,用此值添加 addValueFactory: t => metadata, // 如果键不存在,用此值添加
updateValueFactory: (t, existingValue) => updateValueFactory: (t, existingValue) =>
{ {
var columns = CollectColumnMetadata(t); var columns = CollectColumnMetadata(accessor);
var metadata = BuildDeviceMetadata<T>(columns); var metadata = BuildDeviceMetadata<T>(columns);
//对现有值 existingValue 进行修改,返回新值 //对现有值 existingValue 进行修改,返回新值
@ -300,7 +300,23 @@ namespace JiShe.CollectBus.IoTDB.Provider
foreach (var measurement in tempColumnNames) foreach (var measurement in tempColumnNames)
{ {
rowValues.Add(accessor.GetPropertyValue(entity,measurement)); var value = accessor.GetPropertyValue(entity, measurement);
if (value != null)
{
Type tupleType = value.GetType();
var tempValue = tupleType.Name.ToUpper() switch
{
"DATETIME" => Convert.ToDateTime(value).GetDateTimeOffset().ToUnixTimeNanoseconds(),
_ => value
};
rowValues.Add(tempValue);
}
else
{
rowValues.Add(value);
}
} }
//foreach (var measurement in tempColumnNames) //foreach (var measurement in tempColumnNames)
@ -613,15 +629,14 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <summary> /// <summary>
/// 获取设备元数据的列 /// 获取设备元数据的列
/// </summary> /// </summary>
/// <param name="type"></param> /// <param name="accessor"></param>
/// <returns></returns> /// <returns></returns>
private List<ColumnInfo> CollectColumnMetadata(Type type) private List<ColumnInfo> CollectColumnMetadata<T>(ISourceEntityAccessor<T> accessor)
{ {
var columns = new List<ColumnInfo>(); var columns = new List<ColumnInfo>();
foreach (var prop in type.GetProperties()) foreach (var prop in accessor.PropertyInfoList)
{ {
string typeName = string.Empty; string typeName = string.Empty;
Type declaredType = prop.PropertyType; Type declaredType = prop.PropertyType;

View File

@ -21,14 +21,13 @@ namespace JiShe.CollectBus.DataChannels
/// 定时任务数据通道写入 /// 定时任务数据通道写入
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
Task ScheduledMeterTaskWriterAsync(ChannelWriter<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter, Tuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems); Task ScheduledMeterTaskWriterAsync(ChannelWriter<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter, ValueTuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems);
/// <summary> /// <summary>
/// 定时任务数据入库和Kafka推送通道 /// 定时任务数据入库和Kafka推送通道
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
Task ScheduledMeterTaskReadingAsync(ChannelReader<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoReader, Task ScheduledMeterTaskReadingAsync(ChannelReader<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoReader );
CancellationToken cancellationToken);
#endregion #endregion
} }
} }

View File

@ -81,7 +81,7 @@ public class CollectBusApplicationModule : AbpModule
//}).ConfigureAwait(false); //}).ConfigureAwait(false);
//下发任务通道构建 //下发任务通道构建
DataChannelManage.TaskDataChannel = Channel.CreateUnbounded<Tuple<string, List<MeterReadingTelemetryPacketInfo>>>(); DataChannelManage.TaskDataChannel = Channel.CreateUnbounded<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>>();
//默认初始化表计信息 //默认初始化表计信息
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>(); var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();

View File

@ -13,6 +13,6 @@ namespace JiShe.CollectBus.DataChannels
/// <summary> /// <summary>
/// 下发任务通道 /// 下发任务通道
/// </summary> /// </summary>
public static Channel<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> TaskDataChannel; public static Channel<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>> TaskDataChannel;
} }
} }

View File

@ -50,7 +50,7 @@ namespace JiShe.CollectBus.DataChannels
/// 定时任务数据通道写入 /// 定时任务数据通道写入
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
public async Task ScheduledMeterTaskWriterAsync(ChannelWriter<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter, Tuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems) public async Task ScheduledMeterTaskWriterAsync(ChannelWriter<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter, ValueTuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems)
{ {
await _telemetryPacketInfoWriter.WriteAsync(dataItems); await _telemetryPacketInfoWriter.WriteAsync(dataItems);
} }
@ -61,47 +61,62 @@ namespace JiShe.CollectBus.DataChannels
/// 定时任务数据入库和Kafka推送通道 /// 定时任务数据入库和Kafka推送通道
/// </summary> /// </summary>
public async Task ScheduledMeterTaskReadingAsync( public async Task ScheduledMeterTaskReadingAsync(
ChannelReader<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> telemetryPacketInfoReader, ChannelReader<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>> telemetryPacketInfoReader)
CancellationToken cancellationToken = default)
{ {
const int BatchSize = 20000; // 修正批次大小 const int BatchSize = 20000;
const int EmptyWaitMilliseconds = 1000; const int EmptyWaitMilliseconds = 1000;
var timeout = TimeSpan.FromSeconds(5); var timeout = TimeSpan.FromSeconds(5);
var timer = Stopwatch.StartNew();
long timeoutMilliseconds = 0;
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>(); var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
try try
{ {
while (!cancellationToken.IsCancellationRequested) while (true)
{ {
var batchStopwatch = Stopwatch.StartNew(); var batch = new List<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>>();
var batch = new List<Tuple<string, List<MeterReadingTelemetryPacketInfo>>>(); var canRead = telemetryPacketInfoReader.Count;
if (canRead <= 0)
{
if (timeoutMilliseconds > 0)
{
_logger.LogError($"{nameof(ScheduledMeterTaskReadingAsync)} 通道处理数据耗时{timeoutMilliseconds}毫秒");
}
timeoutMilliseconds = 0;
//无消息时短等待1秒
await Task.Delay(EmptyWaitMilliseconds);
continue;
}
timer.Restart();
var startTime = DateTime.Now;
try try
{ {
// 异步批量读取数据 // 异步批量读取数据
while (batch.Count < BatchSize && batchStopwatch.Elapsed < timeout) while (batch != null && batch.Count < BatchSize && (DateTime.Now - startTime) < timeout)
{ {
while (telemetryPacketInfoReader.TryRead(out var data)) try
{ {
batch.Add(data); if (telemetryPacketInfoReader.TryRead(out var dataItem))
if (batch.Count >= BatchSize) break; {
batch.Add(dataItem);
}
}
catch (Exception)
{
throw;
} }
if (batch.Count >= BatchSize) break;
// 无更多数据时等待
if (!await telemetryPacketInfoReader.WaitToReadAsync(cancellationToken))
break;
} }
} }
catch (OperationCanceledException) catch (Exception)
{ {
break; throw;
} }
if (batch.Count == 0) if (batch.Count == 0)
{ {
await Task.Delay(EmptyWaitMilliseconds, cancellationToken); await Task.Delay(EmptyWaitMilliseconds);
continue; continue;
} }
@ -135,12 +150,16 @@ namespace JiShe.CollectBus.DataChannels
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "处理主题 {TopicName} 数据时发生异常", topicName); _logger.LogError(ex, "数据通道处理主题 {TopicName} 数据时发生异常", topicName);
} }
} }
_logger.LogInformation("处理完成批次: {Count} 条, 耗时: {Elapsed}ms", batch.Clear();
batch.Count, batchStopwatch.ElapsedMilliseconds); timer.Stop();
timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds;
startTime = DateTime.Now;
} }
} }
catch (Exception ex) catch (Exception ex)

View File

@ -175,7 +175,8 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
public async Task TestTreeModelSingleMeasuringEntity(string measuring, string value, DateTime time) public async Task TestTreeModelSingleMeasuringEntity(string measuring, string value, DateTime time)
{ {
time = DateTime.Now; time = DateTime.Now;
//System.Reflection.PropertyInfo;
//System.Reflection.FieldInfo
var meter = new TreeModelSingleMeasuringEntity<string>() var meter = new TreeModelSingleMeasuringEntity<string>()
{ {
SystemName = "energy", SystemName = "energy",

View File

@ -160,7 +160,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"电表自动校时 {data.Name} 任务数据构建失败:{data.Serialize()}"); _logger.LogWarning($"电表自动校时 {data.Name} 任务数据构建失败:{data.Serialize()}");
return; return;
} }
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
}); });
} }
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取 else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取
@ -177,7 +177,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}"); _logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
return; return;
} }
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
}); });
} }
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取 else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取
@ -194,7 +194,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}"); _logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
return; return;
} }
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
}); });
} }
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//月冻结 else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//月冻结
@ -211,7 +211,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}"); _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return; return;
} }
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
}); });
} }
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结 else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结
@ -228,7 +228,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}"); _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return; return;
} }
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
}); });
} }
else else
@ -262,7 +262,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}"); //_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return; return;
} }
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempTask)); _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempTask));
}); });
} }
else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
@ -281,7 +281,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"水表 {data.Name} 任务数据构建失败:{data.Serialize()}"); _logger.LogWarning($"水表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return; return;
} }
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempTask)); _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempTask));
}); });
} }
else else
@ -307,7 +307,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"{nameof(AmmeterScheduledAutoValveControl)}电表定时阀控没有可操作的任务"); _logger.LogWarning($"{nameof(AmmeterScheduledAutoValveControl)}电表定时阀控没有可操作的任务");
return; return;
} }
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask)); _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask));
} }
#region #region
@ -329,10 +329,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns> /// <returns></returns>
public virtual async Task InitAmmeterCacheData(string gatherCode = "") public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{ {
// 创建取消令牌源 return;
var cts = new CancellationTokenSource();
_ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader, cts.Token); // 创建取消令牌源
//var cts = new CancellationTokenSource();
_ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader );
//此处代码不要删除 //此处代码不要删除
#if DEBUG #if DEBUG

View File

@ -0,0 +1,32 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace JiShe.CollectBus.Analyzers.Shared
{
/// <summary>
/// 实体成员信息
/// </summary>
public sealed class EntityMemberInfo
{
public string Path { get; set; }
public Type Type { get; set; }
private readonly Func<object, object> _getter;
private readonly Action<object, object> _setter;
public EntityMemberInfo(
string path,
Type type,
Func<object, object> getter,
Action<object, object> setter)
{
Path = path;
Type = type;
_getter = getter;
_setter = setter;
}
public object GetValue(object entity) => _getter(entity);
public void SetValue(object entity, object value) => _setter(entity, value);
}
}