解决冲突

This commit is contained in:
zenghongyao 2025-05-16 09:25:38 +08:00
commit 99e19428a1
12 changed files with 226 additions and 122 deletions

View File

@ -577,19 +577,43 @@ namespace JiShe.CollectBus.IncrementalGenerator
var elementDeclaredName = element.Type.Name;//元组元素类型名称
initializerLines.Add(
$"new EntityMemberInfo(" +
$"\"{prop.Name}.{elementName}\", " +
$"typeof({elementType}), " +
$"typeof({elementType}).Name, " +//$"\"{elementDeclaredName}\", " +
$"(e) => Get{prop.Name}_{elementName}(({entityType})e), " +
$"(e, v) => Set{prop.Name}_{elementName}(({entityType})e, ({elementType})v))");
}
}
}
$"new EntityMemberInfo(" +
$"\"{prop.Name}.{elementName}\", " +
$"typeof({elementType}), " +
$"GetValueTupleElementName(typeof({elementType})), " +//$"\"{elementDeclaredName}\", " +
$"(e) => Get{prop.Name}_{elementName}(({entityType})e), " +
$"(e, v) => Set{prop.Name}_{elementName}(({entityType})e, ({elementType})v))");
}
}
}
code.AppendLine(string.Join(",\n", initializerLines));
code.AppendLine(" };");
}
code.AppendLine(string.Join(",\n", initializerLines));
code.AppendLine(" };");
code.AppendLine(GetValueTupleElementName());
}
private static string GetValueTupleElementName()
{
return """
public static string GetValueTupleElementName(Type declaredType)
{
string typeName;
// 处理可空类型
if (declaredType.IsGenericType && declaredType.GetGenericTypeDefinition() == typeof(Nullable<>))
{
Type underlyingType = Nullable.GetUnderlyingType(declaredType);
typeName = underlyingType.Name;
}
else
{
typeName = declaredType.Name;
}
return typeName;
}
""";
}
private static string GenerateAttributeInitializer(AttributeData attribute)

View File

@ -7,7 +7,7 @@ namespace JiShe.CollectBus.IoTDB.Context
/// <summary>
/// IoTDB SessionPool 运行时上下文
/// </summary>
public class IoTDBRuntimeContext: IScopedDependency
public class IoTDBRuntimeContext: IScopedDependency//ITransientDependency
{
private readonly bool _defaultValue;

View File

@ -16,6 +16,8 @@ namespace JiShe.CollectBus.IoTDB.Interface
///// <param name="useTableSession">是否使用表模型</param>
//void SwitchSessionPool(bool useTableSession);
IIoTDbProvider GetSessionPool(bool sessionpolType);
/// <summary>
/// 插入数据
/// </summary>

View File

@ -42,6 +42,16 @@ namespace JiShe.CollectBus.IoTDB.Provider
private IIoTDbSessionPool CurrentSession =>
_sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool);
//private IIoTDbSessionPool CurrentSession { get; set; }
public IIoTDbProvider GetSessionPool(bool sessionpolType)
{
//CurrentSession = _sessionFactory.GetSessionPool(sessionpolType);
return this;
}
/// <summary>
/// IoTDbProvider
/// </summary>
@ -96,6 +106,11 @@ namespace JiShe.CollectBus.IoTDB.Provider
{
try
{
if (entities == null || entities.Count() <= 0)
{
_logger.LogError($"{nameof(BatchInsertAsync)} 参数异常,-101");
return;
}
var metadata = await GetMetadata<T>();
var batchSize = 1000;
@ -320,10 +335,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
{
return null;
}
//var accessor = SourceEntityAccessorFactory.GetAccessor<T>();
//var memberCache = BuildMemberCache(accessor);
if (metadata.EntityType == null)
{
@ -795,7 +807,8 @@ namespace JiShe.CollectBus.IoTDB.Provider
{
return declaredTypeName switch
{
"DATETIME" => value => value != null ? ((DateTime)value).GetDateTimeOffset().ToUnixTimeNanoseconds() : null,
"DATETIME" => value => value != null ? Convert.ToDateTime(value).GetDateTimeOffset().ToUnixTimeNanoseconds() : null,
"DECIMAL" => value => value != null ? Convert.ToDouble( value) : null,
_ => value => value
};
}
@ -897,7 +910,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
["DATETIME"] = TSDataType.TIMESTAMP,
["DATE"] = TSDataType.DATE,
["BLOB"] = TSDataType.BLOB,
["DECIMAL"] = TSDataType.FLOAT,
["DECIMAL"] = TSDataType.DOUBLE,
["STRING"] = TSDataType.STRING
};
@ -933,7 +946,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
TSDataType.BOOLEAN => Convert.ToBoolean(value),
TSDataType.INT32 => Convert.ToInt32(value),
TSDataType.INT64 => Convert.ToInt64(value),
TSDataType.FLOAT => Convert.ToDouble(value),
TSDataType.FLOAT => Convert.ToSingle(value),
TSDataType.DOUBLE => Convert.ToDouble(value),
TSDataType.TEXT => Convert.ToString(value),
TSDataType.NONE => null,

View File

@ -98,6 +98,29 @@ namespace JiShe.CollectBus.Protocol.T6452007.SendData
var dataList = Build645SendData.Build645SendCommand(request.MeterAddress, c_data, dataUnit);
return new Telemetry6452007PacketResponse() { Data = dataList };
}
#endregion
#endregion
#region
/// <summary>
/// 变量数据标识编码处理
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public static Telemetry6452007PacketResponse C11_02_Send(Telemetry6452007PacketRequest request)
{
var itemCodeArr = request.ItemCode.Split('_');//11_02_80_00_02
var c_data = itemCodeArr[0];
var DI3 = itemCodeArr[1];
var DI2 = itemCodeArr[2];
var DI1 = itemCodeArr[3];
var DI0 = itemCodeArr[4];
var dataUnit = new List<string>() { DI3, DI2, DI1, DI0 };
var dataList = Build645SendData.Build645SendCommand(request.MeterAddress, c_data, dataUnit);
return new Telemetry6452007PacketResponse() { Data = dataList };
}
#endregion
}
}

View File

@ -55,8 +55,8 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
_dbContext = dbContext;
_logger = logger;
_redisDataCacheService = redisDataCacheService;
_producerService =producerService;
_tcpService=tcpService;
_producerService = producerService;
_tcpService = tcpService;
}
/// <summary>
@ -91,7 +91,8 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
//TableModelSingleMeasuringEntityExtension
//TableModelSingleMeasuringEntityAccessor.GetSystemName(meter);
//ElectricityMeterAccessor
await _iotDBProvider.InsertAsync(meter);
await _iotDBProvider.GetSessionPool(true).InsertAsync(meter);
await _iotDBProvider.InsertAsync(meter);
}
/// <summary>
@ -101,7 +102,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
[HttpGet]
public async Task UseTableSessionPool(DateTime time)
{
var testTime = time;
var testTime = time;
ElectricityMeterTreeModel meter2 = new ElectricityMeterTreeModel()
{
SystemName = "energy",
@ -177,7 +178,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
await _iotDBProvider.InsertAsync(meter2);
_dbContext.UseTableSessionPool = true;
ElectricityMeter meter3 = new ElectricityMeter()
{
@ -211,14 +212,15 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
time = DateTime.Now;
//System.Reflection.PropertyInfo;
//System.Reflection.FieldInfo
var meter = new TreeModelSingleMeasuringEntity<DateTime>()
//TreeModelSingleMeasuringEntityAccessor
var meter = new TreeModelSingleMeasuringEntity<decimal?>()
{
SystemName = "energy",
DeviceId = "402440506",
DeviceType = "1",
ProjectId = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleMeasuring = (measuring, time)
SingleMeasuring = (measuring, 34.534m)
};
await _iotDBProvider.InsertAsync(meter);
}
@ -264,7 +266,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleMeasuring = ("measuring", true)
};
QueryCondition conditions = new QueryCondition()
{
Field = "DeviceId",
@ -273,13 +275,13 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
};
var query = new IoTDBQueryOptions()
var query = new IoTDBQueryOptions()
{
TableNameOrTreePath = meter.DevicePath,
PageIndex = 1,
PageSize = 1,
Conditions = new List<QueryCondition>() { conditions },
};
Conditions = new List<QueryCondition>() { conditions },
};
var pageResult = await _iotDBProvider.QueryAsync<DeviceTreeModelDataInfo>(query);
@ -391,7 +393,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
//}
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
//// 打印分布统计
//DeviceGroupBalanceControl.PrintDistributionStats();
@ -405,13 +407,13 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
/// <returns></returns>
[HttpGet]
public async Task TestGetDeviceGroupBalanceControl(string deviceAddress)
{
{
var groupId = DeviceGroupBalanceControl.GetDeviceGroupId(deviceAddress);
Console.WriteLine(groupId);
await Task.CompletedTask;
}
/// <summary>
/// 测试Redis批量读取10万条数据性能
@ -474,7 +476,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
{
DateTime nextTaskTime = Convert.ToDateTime(time);
return await Task.FromResult(nextTaskTime.CalculateNextCollectionTime(timeDensity));
return await Task.FromResult(nextTaskTime.CalculateNextCollectionTime(timeDensity));
}
@ -510,7 +512,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
[AllowAnonymous]
public bool GetTestProtocol()
{
var aa = LazyServiceProvider.GetKeyedService<IProtocolPlugin>("TestProtocolPlugin");
var aa = LazyServiceProvider.GetKeyedService<IProtocolPlugin>("TestProtocolPlugin");
return aa == null;
}
@ -519,7 +521,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
public async Task<ISubscribeAck> KafkaSubscribeAsync(object obj)
{
_logger.LogWarning($"收到订阅消息: {obj}");
return SubscribeAck.Success();
return SubscribeAck.Success();
}
/// <summary>
@ -530,7 +532,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
[AllowAnonymous]
public async Task<bool> KafkaSendAsync(KafkaSendDto input)
{
ArgumentException.ThrowIfNullOrWhiteSpace(input.Address);
ArgumentException.ThrowIfNullOrWhiteSpace(input.Address);
ArgumentException.ThrowIfNullOrWhiteSpace(input.Frame);
input.Frame = input.Frame.Replace(" ", "");
await _producerService.ProduceAsync<KafkaSendDto>(ProtocolConst.TESTSENDTOPIC, input);
@ -557,7 +559,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
{
_logger.LogWarning($"{dto.Address}集中器未上线: {dto.Serialize()}");
}
// 测试不管是否上线都ACK
// 测试不管是否上线都ACK
return SubscribeAck.Success();
}

View File

@ -45,7 +45,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
private readonly DataMigrationOptions _dataMigrationOptions;
private readonly KafkaOptionConfig _kafkaOptions;
private readonly ServerApplicationOptions _applicationOptions;
private readonly IGuidGenerator _guidGenerator;
private readonly IGuidGenerator _guidGenerator;
int pageSize = 10000;
@ -328,7 +328,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns>
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{
//return;
//return;
try
{
@ -337,34 +337,34 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader);
// //此处代码不要删除
//#if DEBUG
// var redisCacheDeviceInfoHashKeyTemp = $"CollectBus:Energy:JiSheCollectBus2:DeviceInfo";
// //此处代码不要删除
//#if DEBUG
// var redisCacheDeviceInfoHashKeyTemp = $"CollectBus:Energy:JiSheCollectBus2:DeviceInfo";
// var timer1 = Stopwatch.StartNew();
// Dictionary<string, List<DeviceInfo>> keyValuePairsTemps = FreeRedisProvider.Instance.HGetAll<List<DeviceInfo>>(redisCacheDeviceInfoHashKeyTemp);
// List<DeviceInfo> meterInfos = new List<DeviceInfo>();
// List<string> focusAddressDataLista = new List<string>();
// foreach (var item in keyValuePairsTemps)
// {
// foreach (var subItem in item.Value)
// {
// if (subItem.MeterType == MeterTypeEnum.Ammeter && subItem.TimeDensity == 15)
// {
// meterInfos.Add(subItem);
// focusAddressDataLista.Add(subItem.MeterId.ToString());
// }
// }
// }
// var timer1 = Stopwatch.StartNew();
// Dictionary<string, List<DeviceInfo>> keyValuePairsTemps = FreeRedisProvider.Instance.HGetAll<List<DeviceInfo>>(redisCacheDeviceInfoHashKeyTemp);
// List<DeviceInfo> meterInfos = new List<DeviceInfo>();
// List<string> focusAddressDataLista = new List<string>();
// foreach (var item in keyValuePairsTemps)
// {
// foreach (var subItem in item.Value)
// {
// if (subItem.MeterType == MeterTypeEnum.Ammeter && subItem.TimeDensity == 15)
// {
// meterInfos.Add(subItem);
// focusAddressDataLista.Add(subItem.MeterId.ToString());
// }
// }
// }
// timer1.Stop();
// _logger.LogError($"电表初始化读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒");
// DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
// return;
//#else
// var meterInfos = await GetAmmeterInfoList(gatherCode);
//#endif
// timer1.Stop();
// _logger.LogError($"电表初始化读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒");
// DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
// return;
//#else
// var meterInfos = await GetAmmeterInfoList(gatherCode);
//#endif
var meterInfos = await GetAmmeterInfoList(gatherCode);
if (meterInfos == null || meterInfos.Count <= 0)
{
@ -728,17 +728,27 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//var aFN = (AFN)aFNStr.HexToDec();
//var fn = int.Parse(itemCodeArr[1]);
var itemCodeInfo = T37612012PacketItemCodeConst.MappingItemCodeTo645SubCodeRelationship(tempItem);
//TODO:特殊表
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = ammeterInfo.MeteringCode,
ItemCode = tempItem,
ItemCode = itemCodeInfo.Item1,
DataTimeMark = new Protocol.DataTimeMark()
{
Density = ammeterInfo.TimeDensity.GetFocusDensity(),//转换成协议的值
Point = 1,
DataTime = timestamps,
},
SubProtocolRequest = new SubProtocolBuildRequest()
{
MeterAddress = ammeterInfo.MeterAddress,
Password = ammeterInfo.Password,
MeteringPort = ammeterInfo.MeteringPort,
Baudrate = ammeterInfo.Baudrate,
ItemCode = itemCodeInfo.Item2, //10_97 => 11_02_80_00_02
}
});
if (builderResponse == null || builderResponse.Data.Length <= 0)
@ -816,7 +826,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
return null;
}
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
@ -1106,7 +1116,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
List<string> deviceIds = new List<string>();//用于处理Kafka主题分区数据的分发和处理。
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.Select(d=>d.TimeDensity).GroupBy(d => d);
var meterInfoGroupByTimeDensity = meterInfos.Select(d => d.TimeDensity).GroupBy(d => d);
var currentTime = DateTime.Now;
if (_applicationOptions.FirstCollectionTime.HasValue == false)
{
@ -1153,7 +1163,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
await _redisDataCacheService.BatchInsertDataAsync<DeviceInfo>(
await _redisDataCacheService.BatchInsertDataAsync<DeviceInfo>(
redisCacheDeviceGroupSetIndexKey,
redisCacheDeviceInfoHashKey,
keyValuePairs);
@ -1546,7 +1556,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//}
Dictionary<string, List<T>> keyValuePairs = FreeRedisProvider.Instance.HGetAll<List<T>>(redisCacheDeviceInfoHashKey);
timer.Stop();
_logger.LogError($"{nameof(CreateMeterPublishTask)} 构建采集待下发任务,缓存获取信息共花费{timer.ElapsedMilliseconds}毫秒");
@ -1557,7 +1567,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
foreach (var subItem in item.Value)
{
if (subItem.MeterType == meterType && subItem.TimeDensity == timeDensity)
if (subItem.MeterType == meterType && subItem.TimeDensity == timeDensity)
{
meterInfos.Add(subItem);
}
@ -1691,10 +1701,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
};
}
catch (Exception ex)
{
{
throw ex;
}
}
}
#endregion
}

View File

@ -222,7 +222,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
#if DEBUG
sql = $@"{sql} and c.Address in('542410000504','442405000040','442405000039','402410040506')";
// sql = $@"{sql} and c.Address in('542410000504','442405000040','442405000039','402410040506')";
sql = $@"{sql} and c.Address in('402410040506')";
#endif
if (!string.IsNullOrWhiteSpace(gatherCode))
@ -294,35 +295,49 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (settingInfos == null || settingInfos.Count <= 0)
{
settingInfos = new List<AmmeterAutoValveControlSetting>();
settingInfos.Add(new AmmeterAutoValveControlSetting()
{
MeterType = MeterTypeEnum.Ammeter,
AmmerterAddress = "442405000040",
FocusAddress = "442400040",
FocusId = 57683,
ProjectID = 1,
TripType = "on",
TripTime = $"{DateTime.Now:HH:mm}",
MeterId = 78971,
LoopType = "EachDay",
EachDayWithout = "周六,周日",
TimeDensity = 15,
});
settingInfos.Add(new AmmeterAutoValveControlSetting()
{
MeterType = MeterTypeEnum.Ammeter,
AmmerterAddress = "442405000039",
FocusAddress = "442400039",
FocusId = 57684,
ProjectID = 1,
TripType = "on",
TripTime = $"{DateTime.Now:HH:mm}",
MeterId = 78972,
LoopType = "EachDay",
EachDayWithout = "周六,周日",
TimeDensity = 15,
});
//settingInfos.Add(new AmmeterAutoValveControlSetting()
//{
// MeterType = MeterTypeEnum.Ammeter,
// AmmerterAddress = "442405000040",
// FocusAddress = "442400040",
// FocusId = 57683,
// ProjectID = 1,
// TripType = "on",
// TripTime = $"{DateTime.Now:HH:mm}",
// MeterId = 78971,
// LoopType = "EachDay",
// EachDayWithout = "周六,周日",
// TimeDensity = 15,
//});
//settingInfos.Add(new AmmeterAutoValveControlSetting()
//{
// MeterType = MeterTypeEnum.Ammeter,
// AmmerterAddress = "442405000039",
// FocusAddress = "442400039",
// FocusId = 57684,
// ProjectID = 1,
// TripType = "on",
// TripTime = $"{DateTime.Now:HH:mm}",
// MeterId = 78972,
// LoopType = "EachDay",
// EachDayWithout = "周六,周日",
// TimeDensity = 15,
//});
//settingInfos.Add(new AmmeterAutoValveControlSetting()
//{
// MeterType = MeterTypeEnum.Ammeter,
// AmmerterAddress = "542410000504",
// FocusAddress = "542400504",
// FocusId = 57686,
// ProjectID = 1,
// TripType = "on",
// TripTime = $"{DateTime.Now:HH:mm}",
// MeterId = 78974,
// LoopType = "EachDay",
// EachDayWithout = "周六,周日",
// TimeDensity = 15,
//});
settingInfos.Add(new AmmeterAutoValveControlSetting()
{
@ -339,21 +354,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
TimeDensity = 15,
});
settingInfos.Add(new AmmeterAutoValveControlSetting()
{
MeterType = MeterTypeEnum.Ammeter,
AmmerterAddress = "542410000504",
FocusAddress = "542400504",
FocusId = 57686,
ProjectID = 1,
TripType = "on",
TripTime = $"{DateTime.Now:HH:mm}",
MeterId = 78974,
LoopType = "EachDay",
EachDayWithout = "周六,周日",
TimeDensity = 15,
});
FreeRedisProvider.Instance.Set(redisCacheDeviceSettingInfoHashKey, settingInfos);
}

View File

@ -19,6 +19,12 @@ namespace JiShe.CollectBus.Common.Consts
/// </summary>
public const string AFN10HFN01H = $"10_01";
/// <summary>
/// 电网频率
/// </summary>
public const string AFN10HFN97H = $"10_97";
/// <summary>
/// 读取终端信息
/// </summary>
@ -181,5 +187,19 @@ namespace JiShe.CollectBus.Common.Consts
}
#endregion
/// <summary>
/// 特殊645编码关系映射
/// </summary>
/// <param name="itemCode"></param>
/// <returns></returns>
public static (string,string) MappingItemCodeTo645SubCodeRelationship(string itemCode)
{
return itemCode switch
{
AFN10HFN97H => (AFN10HFN01H,T6452007PacketItemCodeConst.C1102800002),
_=> (itemCode,""),
};
}
}
}

View File

@ -56,6 +56,16 @@ namespace JiShe.CollectBus.Common.Consts
/// </summary>
public const string C08 = "08";
#endregion
#region
/// <summary>
/// 电网频率
/// </summary>
public const string C1102800002 = "11_02_80_00_02";
#endregion
#endregion
}
}

View File

@ -6,7 +6,7 @@
@{
Layout = null;
}
<!DOCTYPE html>
<html lang="en">

View File

@ -141,7 +141,7 @@
}
},
"ServerApplicationOptions": {
"ServerTagName": "JiSheCollectBus99",
"ServerTagName": "JiSheCollectBus8",
"SystemType": "Energy",
"FirstCollectionTime": "2025-04-28 15:07:00",
"AutomaticVerificationTime": "16:07:00",